librdkafka 线程在失败时不清理
librdkafka threads not cleaning up on failure
在我的应用程序中,我正在创建一个生产者和一个消费者。
conf->set("dr_cb", delivery_cb, errstr);
conf->set("event_cb", event_cb, errstr);
RdKafka::Producer::create(conf.get(), errstr)
conf->set("log_level", "0", errstr);
conf->set("group.id", group_id, errstr);
conf->set("client.id", m_kafka_client_id, errstr);
conf->set("auto.offset.reset", "earliest", errstr);
conf->set("rebalance_cb", rebalance_cb, errstr);
conf->set("statistics.interval.ms", "3000", errstr);
conf->set("event_cb", event_cb, errstr);
RdKafka::KafkaConsumer::create(conf.get(), errstr)
然后我尝试获取元数据,如下所示
err = _consumer->metadata(false, nullptr, &metadata, METADATA_TIMEOUT);
std::unique_ptr<RdKafka::Metadata> metadata_uptr(metadata); // Handover the raw pointer to the unique_ptr now
如果由于某种原因代理通信不起作用,我收到错误消息,然后我生产者和消费者通过unique_ptr ->析构函数删除。 然后进入循环,直到应用程序成功连接到代理。
我观察到的是,我看到很多线程被创建并保留在那里。 在某个时间点,计数达到 2000 个线程。
清理卡夫卡的正确方法是什么?
线程卡在这里
#0 0x00007f6acc031cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00000000004b8e65 in cnd_timedwait_ms (cnd=cnd@entry=0xab41b8, mtx=mtx@entry=0xab4190, timeout_ms=<optimized out>) at tinycthread.c:501
#2 0x00000000004853aa in rd_kafka_q_serve (rkq=0xab4190, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK,
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:440
#3 0x000000000045a43c in rd_kafka_thread_main (arg=arg@entry=0xabf760) at rdkafka.c:1227
#4 0x00000000004b8c07 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#5 0x00007f6acc02de25 in start_thread () from /lib64/libpthread.so.0
#6 0x00007f6acaa7834d in clone () from /lib64/libc.so.6
我还尝试在失败时调用以下例程,但没有帮助......
RdKafka::wait_destroyed(5000);
RdKafka::KafkaConsumer 被包装在另一个容器中,"myobject_wrapper->close(("没有被重定向到"RdKafka::KafkaConsumer::close">
在我的应用程序中解决此问题后,现在一切正常。
相关文章:
- 非静态成员失败的线程调用函数
- MS 本机单元测试 - 断言::线程失败不起作用
- 两个线程一个使用流 Api,另一个线程创建文件失败并出现错误ERROR_SHARING_VIOLATION
- 使用提升线程时编译失败
- librdkafka 线程在失败时不清理
- 错误:静态断言失败:std ::线程参数必须在转换为rvalues后不可行
- 什么时候标准::线程::连接会因no_such_process而失败
- 更改线程实时调度策略失败:config_rt_group_sched = y
- C++ 提升 UDP 接收器在放入线程时失败
- 为线程构造函数传递引用以将其绑定到函数失败
- glewInit 从后台线程调用时失败
- 链接到boost ::线程失败
- 将函子传递给boost ::线程在Visual Studio 2010中失败
- Qt 窗口包含提升线程标头失败
- 在多个线程中使用libcurl失败
- 远程线程的DuplicateHandle失败,错误为error_INVALID_HANDLE
- 在C++std::streams中,失败后,如何获得失败原因?必需:线程安全,适用于Windows和Linux(或至少M
- 线程未正确结束:它忽略失败的循环条件
- C++ 线程获取引用参数编译失败
- 提升线程链接在 Netbeans 7.1 调试/测试会话中失败