如何在没有缓冲的情况下使用 parquet-cpp 写入面向流/行的数据?
How can I write streaming/row-oriented data using parquet-cpp without buffering?
我基本上有面向行/流的数据(Netflow(进入我的C++应用程序,我想将数据写入Parquet-gzip文件。
查看 parquet-cpp 项目中的示例 reader-writer.cc 程序,似乎我只能以列式方式将数据馈送到 parquet-cpp:
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
...
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
// Write the Bool column
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
...
// Write the ... column
这似乎意味着我需要自己缓冲NUM_ROWS_PER_ROW_GROUP行,然后循环访问它们,以便将它们一次一列地传输到 parquet-cpp 中。 我希望有更好的方法,因为这似乎效率低下,因为数据需要复制两次:一次复制到我的缓冲区中,然后在将数据一次一列地输入 parquet-cpp 时再次复制。
有没有办法将每一行的数据放入 parquet-cpp 中,而不必先缓冲一堆行? Apache Arrow 项目(parquet-cpp 使用(有一个教程,展示了如何将逐行数据转换为箭头表。 对于每行输入数据,代码将附加到每个列生成器:
for (const data_row& row : rows) {
ARROW_RETURN_NOT_OK(id_builder.Append(row.id));
ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost));
我想用镶木地板做类似的事情。 这可能吗?
您将永远无法完全没有缓冲,因为我们需要从逐行转换为列式表示。在撰写本文时,最好的路径是构建 Apache 箭头表,然后将其输入parquet-cpp
.
parquet-cpp
提供了特殊的 Arrow API,然后可以直接对这些表进行操作,大多数情况下无需任何其他数据副本。您可以在parquet/arrow/reader.h
和parquet/arrow/writer.h
中找到 API。
最佳但尚未实施的解决方案可以通过执行以下操作来节省一些字节:
- 在新的镶木地板 API 中逐行摄取
- 使用指定的编码和压缩设置直接对每列的这些值进行编码
- 仅在内存中缓冲此内容
- 在行组的末尾,写出一列又一列
虽然这个最佳解决方案可能会为您节省一些内存,但仍有一些步骤需要由某人实现(随意贡献它们或寻求实现这些步骤的帮助(,您可能擅长使用基于 Apache Arrow 的 API。
我按照@xhochy的建议,在数据到达时使用箭头API填充箭头表,然后使用parquet-cpp
的WriteTable()
方法写出表。 我将 GZIP 设置为默认压缩,但为第二个字段指定了 SNAPPY。
#include <iostream>
#include "arrow/builder.h"
#include "arrow/table.h"
#include "arrow/io/file.h"
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>
main() {
arrow::Int32Builder sip_builder(arrow::default_memory_pool());
arrow::Int32Builder dip_builder(arrow::default_memory_pool());
for(size_t i=0; i < 1000; i++) { // simulate row-oriented incoming data
sip_builder.Append(i*100);
dip_builder.Append(i*10 + i);
}
std::shared_ptr<arrow::Array> sip_array;
sip_builder.Finish(&sip_array);
std::shared_ptr<arrow::Array> dip_array;
dip_builder.Finish(&dip_array);
std::vector<std::shared_ptr<arrow::Field>> schema_definition = {
arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */),
arrow::field("dip", arrow::int32(), false)
};
auto schema = std::make_shared<arrow::Schema>(schema_definition);
std::shared_ptr<arrow::Table> arrow_table;
MakeTable(schema, {sip_array, dip_array}, &arrow_table);
std::shared_ptr<arrow::io::FileOutputStream> file_output_stream;
arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream);
parquet::WriterProperties::Builder props_builder;
props_builder.compression(parquet::Compression::GZIP);
props_builder.compression("dip", parquet::Compression::SNAPPY);
auto props = props_builder.build();
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
file_output_stream, sip_array->length(), props);
std::cout << "done" << std::endl;
}
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out
done
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 1000
Number of RowGroups: 1 <<----------
Number of Real Columns: 2
Number of Columns: 2
Number of Selected Columns: 2
Column 0: sip (INT32)
Column 1: dip (INT32)
--- Row Group 0 ---
--- Total Bytes 8425 ---
Rows: 1000---
Column 0
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 99900, Min: 0
Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 3109
Column 1
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 10989, Min: 0
Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 5316
上面的代码为整个表/文件写出一个行组。根据您拥有的数据行数,这可能并不理想,因为太多行会导致"回退到纯编码"(请参阅 Ryan Blue 演示文稿,幻灯片 31-34(。 要为每个表/文件写入多个行组,请将chunk_size
参数设置得更小(下面我除以 2 以获得每个表/文件的两个行组(:
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
fileOutputStream, sip_array->length()/2, props);
这仍然不理想。 在调用parquet::arrow::WriteTable()
之前,文件的所有数据都必须缓冲/存储在 Arrow 表中,因为该函数会打开和关闭文件。 我想为每个文件写入多个行组,但我只想在内存中一次缓冲/存储一个或两个行组的数据。 以下代码完成此操作。 它基于 parquet/arrow/writer.cc 中的代码:
#include <parquet/util/memory.h>
...
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream);
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(),
arrow_output_stream, props, parquet::arrow::default_arrow_writer_properties(),
&writer);
// write two row groups for the first table
writer->WriteTable(*arrow_table, sip_array->length()/2);
// ... code here would generate a new table ...
// for now, we'll just write out the same table again, to
// simulate writing more data to the same file, this
// time as one row group
writer->WriteTable(*arrow_table, sip_array->length());
writer->Close();
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 2000000
Number of RowGroups: 3 <<--------
...
--- Row Group 0 ---
--- Total Bytes 2627115 ---
Rows: 500000---
...
--- Row Group 1 ---
--- Total Bytes 2626873 ---
Rows: 500000---
...
--- Row Group 2 ---
--- Total Bytes 4176371 ---
Rows: 1000000---
- 防止主数据类型C++的隐式转换
- 用于访问容器<T>数据成员的正确 API
- 嵌套在类中时无法设置成员数据
- 使用流处理接收到的数据
- 静态数据成员的问题-修复链接错误会导致编译器错误
- 处理小于cpu数据总线的数据类型.(c++转换为机器代码)
- 在cuda线程之间共享大量常量数据
- C++将文本文件中的数据读取到结构数组中
- 如何在C++中序列化结构数据
- 在C++中打印指向不同基元数据类型的指针的内存地址
- 通过套接字[TCP]传输数据 如何在C / C ++中打包多个整数并使用send() recv()传输数据
- 在c代码之间共享数据的最佳方式
- 如何在没有缓冲的情况下使用 parquet-cpp 写入面向流/行的数据?
- 有没有办法知道deflate(或你的Z_STREAM)是否有数据被缓冲?zlib C++
- 逐行分析缓冲的数据
- 如何确保数据刷新到文件 IO (WriteFile()) 上的 HDD(未缓冲)
- 在网络应用程序中缓冲数据的OOP方法
- 将CUDA中的缓冲加倍,这样CPU就可以对持久化内核产生的数据进行操作
- 我如何缓冲模型矩阵数据到纹理使用OpenGL ES2, GLSL, c++
- opengl =中缓冲区数据的问题仅在我缓冲比需要更多的字节时绘制