如何在没有缓冲的情况下使用 parquet-cpp 写入面向流/行的数据?

How can I write streaming/row-oriented data using parquet-cpp without buffering?

本文关键字:数据 缓冲 情况下 parquet-cpp      更新时间:2023-10-16

我基本上有面向行/流的数据(Netflow(进入我的C++应用程序,我想将数据写入Parquet-gzip文件。

查看 parquet-cpp 项目中的示例 reader-writer.cc 程序,似乎我只能以列式方式将数据馈送到 parquet-cpp:

constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
...
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
// Write the Bool column
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
...
// Write the ... column

这似乎意味着我需要自己缓冲NUM_ROWS_PER_ROW_GROUP行,然后循环访问它们,以便将它们一次一列地传输到 parquet-cpp 中。 我希望有更好的方法,因为这似乎效率低下,因为数据需要复制两次:一次复制到我的缓冲区中,然后在将数据一次一列地输入 parquet-cpp 时再次复制。

有没有办法将每一行的数据放入 parquet-cpp 中,而不必先缓冲一堆行? Apache Arrow 项目(parquet-cpp 使用(有一个教程,展示了如何将逐行数据转换为箭头表。 对于每行输入数据,代码将附加到每个列生成器:

for (const data_row& row : rows) {
ARROW_RETURN_NOT_OK(id_builder.Append(row.id));
ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost));

我想用镶木地板做类似的事情。 这可能吗?

您将永远无法完全没有缓冲,因为我们需要从逐行转换为列式表示。在撰写本文时,最好的路径是构建 Apache 箭头表,然后将其输入parquet-cpp.

parquet-cpp提供了特殊的 Arrow API,然后可以直接对这些表进行操作,大多数情况下无需任何其他数据副本。您可以在parquet/arrow/reader.hparquet/arrow/writer.h中找到 API。

最佳但尚未实施的解决方案可以通过执行以下操作来节省一些字节:

  • 在新的镶木地板 API 中逐行摄取
  • 使用指定的编码和压缩设置直接对每列的这些值进行编码
  • 仅在内存中缓冲此内容
  • 在行组的末尾,写出一列又一列

虽然这个最佳解决方案可能会为您节省一些内存,但仍有一些步骤需要由某人实现(随意贡献它们或寻求实现这些步骤的帮助(,您可能擅长使用基于 Apache Arrow 的 API。

我按照@xhochy的建议,在数据到达时使用箭头API填充箭头表,然后使用parquet-cppWriteTable()方法写出表。 我将 GZIP 设置为默认压缩,但为第二个字段指定了 SNAPPY。

#include <iostream>
#include "arrow/builder.h"
#include "arrow/table.h"
#include "arrow/io/file.h"
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>
main() {
arrow::Int32Builder sip_builder(arrow::default_memory_pool());
arrow::Int32Builder dip_builder(arrow::default_memory_pool());
for(size_t i=0; i < 1000; i++) {  // simulate row-oriented incoming data
sip_builder.Append(i*100);
dip_builder.Append(i*10 + i);
}
std::shared_ptr<arrow::Array> sip_array;
sip_builder.Finish(&sip_array);
std::shared_ptr<arrow::Array> dip_array;
dip_builder.Finish(&dip_array);
std::vector<std::shared_ptr<arrow::Field>> schema_definition = {
arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */),
arrow::field("dip", arrow::int32(), false)
};
auto schema = std::make_shared<arrow::Schema>(schema_definition);
std::shared_ptr<arrow::Table> arrow_table;
MakeTable(schema, {sip_array, dip_array}, &arrow_table);
std::shared_ptr<arrow::io::FileOutputStream> file_output_stream;
arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream);
parquet::WriterProperties::Builder props_builder;
props_builder.compression(parquet::Compression::GZIP);
props_builder.compression("dip", parquet::Compression::SNAPPY);
auto props = props_builder.build();
parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
file_output_stream, sip_array->length(), props);
std::cout << "done" << std::endl;
}
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out
done
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 1000
Number of RowGroups: 1            <<----------
Number of Real Columns: 2
Number of Columns: 2
Number of Selected Columns: 2
Column 0: sip (INT32)
Column 1: dip (INT32)
--- Row Group 0 ---
--- Total Bytes 8425 ---
Rows: 1000---
Column 0
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 99900, Min: 0
Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 3109
Column 1
, Values: 1000, Null Values: 0, Distinct Values: 0
Max: 10989, Min: 0
Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE
Uncompressed Size: 5306, Compressed Size: 5316

上面的代码为整个表/文件写出一个行组。根据您拥有的数据行数,这可能并不理想,因为太多行会导致"回退到纯编码"(请参阅 Ryan Blue 演示文稿,幻灯片 31-34(。 要为每个表/文件写入多个行组,请将chunk_size参数设置得更小(下面我除以 2 以获得每个表/文件的两个行组(:

parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
fileOutputStream, sip_array->length()/2, props);

这仍然不理想。 在调用parquet::arrow::WriteTable()之前,文件的所有数据都必须缓冲/存储在 Arrow 表中,因为该函数会打开和关闭文件。 我想为每个文件写入多个行组,但我只想在内存中一次缓冲/存储一个或两个行组的数据。 以下代码完成此操作。 它基于 parquet/arrow/writer.cc 中的代码:

#include <parquet/util/memory.h>
...
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream);
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(), 
arrow_output_stream, props, parquet::arrow::default_arrow_writer_properties(), 
&writer);
// write two row groups for the first table
writer->WriteTable(*arrow_table, sip_array->length()/2);
// ... code here would generate a new table ...
// for now, we'll just write out the same table again, to
// simulate writing more data to the same file, this
// time as one row group
writer->WriteTable(*arrow_table, sip_array->length());
writer->Close();
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet                        File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 2000000
Number of RowGroups: 3   <<--------
...
--- Row Group 0 ---
--- Total Bytes 2627115 ---
Rows: 500000---
...
--- Row Group 1 ---
--- Total Bytes 2626873 ---
Rows: 500000---
...
--- Row Group 2 ---
--- Total Bytes 4176371 ---
Rows: 1000000---