找到你要的答案

Q:Does commitOffsets on high-level consumer block?

Q:对高层次消费commitoffsets块?

In the Java Client (http://kafka.apache.org/documentation.html#highlevelconsumerapi), does commitOffsets on the high-level consumer block until offsets are successfully commited, or is it fire-and-forget?

在java客户端(HTTP:/ /卡夫卡。Apache。org /文档,HTML # highlevelconsumerapi),并对高层次消费块commitoffsets直到偏移成功提交,或者是消防和忘记?

answer1: 回答1:

Does commitOffsets on the high-level consumer block until offsets are successfully committed?

It looks like commitOffsets() loops through each consumer and calls updatePersistentPath if its offset has changed, and if so writes data via zkClient.writeData(path, getBytes(data)). It appears is though commitOffsets() does block until all the offsets are committed.

Here is the source code for commitOffsets() (ref):

public void commitOffsets() {
    if (zkClient == null) {
        logger.error("zk client is null. Cannot commit offsets");
        return;
    }
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
        ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
        for (PartitionTopicInfo info : e.getValue().values()) {
            final long lastChanged = info.getConsumedOffsetChanged().get();
            if (lastChanged == 0) {
                logger.trace("consume offset not changed");
                continue;
            }
            final long newOffset = info.getConsumedOffset();
            //path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
            final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
            try {
                ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
            } catch (Throwable t) {
                logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
            } finally {
                info.resetComsumedOffsetChanged(lastChanged);
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed [" + path + "] for topic " + info);
                }
            }
        }
    }
}

and for updatePersistentPath(...) (ref):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
    try {
        zkClient.writeData(path, getBytes(data));
    } catch (ZkNoNodeException e) {
        createParentPath(zkClient, path);
        try {
            zkClient.createPersistent(path, getBytes(data));
        } catch (ZkNodeExistsException e2) {
            zkClient.writeData(path, getBytes(data));
        }
    }
}

对高层消费块commitoffsets直到偏移成功提交?

它看起来像commitoffsets()循环通过每个消费者和电话updatepersistentpath如果偏移了,如果这样写数据通过zkclient。WriteData(路径,getBytes(数据))。看来虽然是commitoffsets()会阻塞直到所有补偿承诺。

这是commitoffsets()源代码(参考):

public void commitOffsets() {
    if (zkClient == null) {
        logger.error("zk client is null. Cannot commit offsets");
        return;
    }
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
        ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
        for (PartitionTopicInfo info : e.getValue().values()) {
            final long lastChanged = info.getConsumedOffsetChanged().get();
            if (lastChanged == 0) {
                logger.trace("consume offset not changed");
                continue;
            }
            final long newOffset = info.getConsumedOffset();
            //path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
            final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
            try {
                ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
            } catch (Throwable t) {
                logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
            } finally {
                info.resetComsumedOffsetChanged(lastChanged);
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed [" + path + "] for topic " + info);
                }
            }
        }
    }
}

和updatepersistentpath(…)(参考):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
    try {
        zkClient.writeData(path, getBytes(data));
    } catch (ZkNoNodeException e) {
        createParentPath(zkClient, path);
        try {
            zkClient.createPersistent(path, getBytes(data));
        } catch (ZkNodeExistsException e2) {
            zkClient.writeData(path, getBytes(data));
        }
    }
}
apache-kafka  kafka-consumer-api