在 C++ 中将序列化的 Thrift 结构序列化为 Kafka
Serializing a serialized Thrift struct to Kafka in C++
我有一组structs
在Thrift
中定义,如下所示:
struct Foo {
1: i32 a,
2: i64 b
}
我需要在C++
执行以下操作:
(a( 将Foo
实例序列化为兼容 Thrift 的字节(使用 Binary
或 Compact
Thrift 协议(
(b( 将字节序列化实例发送到Kafka
主题
问题
如何将Thrift
序列化实例发送到Kafka
集群?
提前致谢
想出了我自己问题的答案。
序列化
下面的代码片段说明了如何将 Foo
实例序列化为Thrift
兼容的字节(使用 Thrift Compact
协议(。要使用Binary
协议,请将TCompactProtocol
替换为 TBinaryProtocol
。
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TCompactProtocol.h>
using apache::thrift::protocol::TCompactProtocol;
using apache::thrift::transport::TMemoryBuffer;
...
...
boost::shared_ptr<TMemoryBuffer> buffer(new TMemoryBuffer());
boost::shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(buffer));
uint8_t **serialized_bytes = reinterpret_cast<uint8_t **>(malloc(sizeof(uint8_t *)));
uint32_t num_bytes = 0;
// 'foo' is an instance of Foo
foo->write(protocol.get());
buffer->getBuffer(serialized_bytes, &num_bytes);
发送到 Kafka 集群
以下代码片段演示如何将与 Thrift 兼容的字节发送到 Kafka 群集。
注意:下面使用的 kafka 客户端库是 librdkafka。
#include "rdkafkacpp.h"
std::string errstr;
// Create global configuration
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", "localhost:9092", errstr);
conf->set("api.version.request", "true", errstr);
// Create kafka producer
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
// Create topic-specific configuration
RdKafka::Topic *topic = RdKafka::Topic::create(producer, "topic_name", nullptr, errstr);
auto partition = 1;
// Sending the serialized bytes to Kafka cluster
auto res = producer->produce(
topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
serialized_bytes, num_bytes,
NULL, NULL);
if (res != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to publish message" << RdKafka::err2str(res) << std::endl;
} else {
std::cout << "Published message of " << num_bytes << " bytes" << std::endl;
}
producer->flush(10000);
相关文章:
- 如何在C++中序列化结构数据
- 如何使用boost::具有嵌套结构和最小代码更改的序列化
- 如何在 boost::asio 中将打包的结构作为消息传递?(无序列化)
- 使用指向 struc 的指针序列化结构
- C++ 提升 - 包含类层次结构对象的类的序列化
- 如何从平面缓冲区中反序列化联合结构的 void* 值的大小
- 将载体和cv :: mat to Disk的存储结构 - C 中的数据序列化
- 序列化没有"save"方法的大型结构
- 在 C# 中序列化这些值以在C++中作为已知结构正确读取时遇到问题
- 无序列图会创建一个零初始化结构吗?
- 如何在结构中序列化unique_ptr<char[]>
- 我的数据结构的最佳序列化方法
- 分段错误:C++中的结构序列化和 MPI 数据传输
- 跳过层次结构中的中间类,并使用boost ::序列化
- Boost::序列化存储结构时的堆栈溢出错误
- C 使用谷物序列化结构的静态阵列
- 具有3字节块的序列化C 结构
- 序列化结构以归档,并用字符串再次对其进行启用
- 序列化结构
- 序列化结构的STL映射