如何在librdkafka中使用"group.id"?(卡夫卡版本"0.8.2.2")
How can I use "group.id" in librdkafka? (kafka version is "0.8.2.2")
- Kafka服务器版本=0.8.2.2
- librdkafka(Kafka c++客户端(版本=0.9.1
例如,一个组中有两个消费者。两个消费者必须从主题中获得不同的信息。
我用了如下的方式,但似乎不起作用。两个消费者从该主题获得所有信息。
std::string topic_str = "sample";
std::string errstr;
int32_t partition = 0;
int64_t start_offset = RdKafka::Topic::OFFSET_END;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
//...omit...
conf->set("broker.version.fallback", "0.8.2.2", errstr);
conf->set("group.id", "group_001", errstr);
//...omit...
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
//...omit...
while (1) {
//...omit...
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
//print message and offset
}
librdkafka仅支持在Kafka 0.9中添加的新的基于代理的平衡消费者组(KafkaConsumer类(。(Kafka 0.8上的消费者群体平衡基于Zookeeper,仅在Scala官方客户端中实现。(
您还使用了遗留的低级使用者(使用者类(,它没有任何形式的平衡使用者支持。
我建议将您的Kafka集群升级到0.9(或0.10!(,并更改代码以使用新的KafkaConsumer类。
此处的示例:https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.cpp
相关文章:
- 如何在选项卡视图Qt中设置一个新项目,并保存以前的项目
- 在卡萨布兰卡形成编码参数的列表
- 通过选项卡的文本设置QTabWidget顺序
- phytec phyBOARD iMX-6在从闪存而不是SD卡运行qt5 opengles应用程序时表现不佳(FPS减半
- C++卡验证问题
- C++ OpenCV 卡尔曼滤波器构造函数错误
- 神经网络不学习.卡在50%
- 使用专用显卡进行 OpenGL 渲染时帧速率较低
- 在不使用系统的情况下从C++应用程序格式化 Linux 中的 SD 卡
- 在 std::getline 和 std::cin 期间卡在循环中
- 如何在 macOS 卡塔琳娜上解决此错误?
- 按钮悬停在 QT 中垂直布局的选项卡小部件中不起作用
- Visual Studio C++ 它只构建选项卡中显示的文件吗?
- 比较两个字符串后卡在无限循环中
- 卡夫卡消费者投票最新消息
- Zookeeper中的卡夫卡消费者注册列表
- 卡斯卡包装器
- 为什么一些.exe文件缺少版本选项卡
- 如何在librdkafka中使用"group.id"?(卡夫卡版本"0.8.2.2")
- C++卡夫卡客户端(rdkafka)