LibRdKafka: commited_offset always at -1001

LibRdKafka: commited_offset always at -1001

本文关键字:always at -1001 offset commited LibRdKafka      更新时间:2023-10-16

当我管理我的消费者组时,我总是有这样的统计数据:

2016-10-15 13:56:17.925: "STATS": { "name": "debian-meox#producer-2", "type": "producer", "ts":16768436761, "time":1476532577, "replyq":0, "msg_cnt":75428, "msg_size":29314007, "msg_max":100000, "msg_size_max":4096000000, "simple_cnt":0, "brokers":{ "localhost:9092/bootstrap": { "name":"localhost:9092/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989561, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10064, "max":10064, "avg":10064, "sum":10064, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "localhost:9093/bootstrap": { "name":"localhost:9093/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989603, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10078, "max":10078, "avg":10078, "sum":10078, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "localhost:9094/bootstrap": { "name":"localhost:9094/bootstrap", "nodeid":-1, "state":"UP", "stateage":4989343, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":10075, "max":10075, "avg":10075, "sum":10075, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "debian-meox:9094/2": { "name":"debian-meox:9094/2", "nodeid":2, "state":"UP", "stateage":4959041, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":472, "max":472, "avg":472, "sum":472, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ } } , "debian-meox:9093/1": { "name":"debian-meox:9093/1", "nodeid":1, "state":"UP", "stateage":4958754, "outbuf_cnt":28, "outbuf_msg_cnt":63922, "waitresp_cnt":3, "waitresp_msg_cnt":7488, "tx":1121, "txbytes":587723174, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":633, "rxbytes":24173, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":1248, "max":205146, "avg":22323, "sum":14130815, "cnt":633 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":632 }, "toppars":{ "test_topic": { "topic":"test_topic", "partition":1} } } , "debian-meox:9092/0": { "name":"debian-meox:9092/0", "nodeid":0, "state":"UP", "stateage":4958760, "outbuf_cnt":0, "outbuf_msg_cnt":0, "waitresp_cnt":0, "waitresp_msg_cnt":0, "tx":1, "txbytes":41, "txerrs":0, "txretries":0, "req_timeouts":0, "rx":1, "rxbytes":157, "rxerrs":0, "rxcorriderrs":0, "rxpartial":0, "zbuf_grow":0, "buf_grow":0, "rtt": { "min":328, "max":328, "avg":328, "sum":328, "cnt":1 }, "throttle": { "min":0, "max":0, "avg":0, "sum":0, "cnt":0 }, "toppars":{ "test_topic": { "topic":"test_topic", "partition":0} } } }, "topics":{ "test_topic": { "topic":"test_topic", "metadata_age":4933, "partitions":{ "0": { "partition":0, "leader":0, "desired":false, "unknown":false, "msgq_cnt":0, "msgq_bytes":0, "xmit_msgq_cnt":0, "xmit_msgq_bytes":0, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":0, "txbytes":0, "msgs": 0, "rx_ver_drops": 0 } , "1": { "partition":1, "leader":1, "desired":false, "unknown":false, "msgq_cnt":1572, "msgq_bytes":840177, "xmit_msgq_cnt":2474, "xmit_msgq_bytes":1322717, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":1357083, "txbytes":613073020, "msgs": 1361129, "rx_ver_drops": 0 } , "-1": { "partition":-1, "leader":-1, "desired":false, "unknown":false, "msgq_cnt":0, "msgq_bytes":0, "xmit_msgq_cnt":0, "xmit_msgq_bytes":0, "fetchq_cnt":0, "fetchq_size":0, "fetch_state":"none", "query_offset":0, "next_offset":0, "app_offset":-1001, "stored_offset":-1001, "commited_offset":-1001, "committed_offset":-1001, "eof_offset":-1001, "lo_offset":-1001, "hi_offset":-1001, "consumer_lag":-1, "txmsgs":0, "txbytes":0, "msgs": 17604, "rx_ver_drops": 0 } } } } }

你可以看到所有的偏移量都被设置为-1001:

"committed_offset":-1001,
"eof_offset":-1001,
"lo_offset":-1001,
"hi_offset":-1001,

这是我的设置(也为default_topic):

"auto.commit.enable", "true"
"offset.store.method", "broker"

有时,即使主题中有很多消息,消费者组也无法获取它们。任何想法?

所有分区都在"fetch_state": "none"中,这通常意味着它们不是assign():ed,因此不适合使用(也可能是它们没有leader,但这里的情况不是这样:"leader": 1)。只有当从代理获取消息并由应用程序使用时,才更新和提交偏移量(如果enable.auto.commit为true(默认))。

您是否注册了RebalanceCb?在ERR__ASSIGN_PARTITIONS上调用assign(partitions...),在ERR__REVOKE_PARTITIONS上调用unassign()

的例子:

class ExampleRebalanceCb : public RdKafka::RebalanceCb {
public:
  void rebalance_cb (RdKafka::KafkaConsumer *consumer,
             RdKafka::ErrorCode err,
                     std::vector<RdKafka::TopicPartition*> &partitions) {
    if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
      consumer->assign(partitions);
    else
      consumer->unassign();
  }
};

完整示例:https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.cpp