使用Rfc4180CsvParser将CSV导入Vertica并排除标题行

Import CSV into Vertica using Rfc4180CsvParser and exclude header row

本文关键字:排除 标题 Vertica 导入 Rfc4180CsvParser CSV 使用      更新时间:2023-10-16

通过Rfc4180CsvParser导入数据时,是否有方法排除标题行?COPY命令有一个SKIP选项,但当使用Vertica SDK中提供的CSV解析器时,该选项似乎不起作用。

背景

作为背景,COPY命令本身不会读取CSV文件。对于简单的CSV文件,可以说COPY schema.table FROM '/data/myfile.csv' DELIMITER ',' ENCLOSED BY '"';,但对于具有嵌入引号的字符串值的数据文件,这将失败。

添加ESCAPE AS '"'将生成错误ERROR 3169: ENCLOSED BY and ESCAPE AS can not be the same value。这是一个问题,因为CSV值由"封装和转义。

Vertica SDK CsvParser救援扩展

Vertica在/opt/vertica/sdk/examples下提供了一个SDK,其中包含可以编译为扩展的C++程序。其中之一是/opt/vertica/sdk/examples/ParserFunctions/Rfc4180CsvParser.cpp

这样做效果很好,如下所示:

 cd /opt/vertica/sdk/examples
 make clean
 vsql 
 ==> CREATE LIBRARY Rfc4180CsvParserLib AS '/opt/vertica/sdk/examples/build/Rfc4180CsvParser.so';
 ==> COPY myschema.mytable FROM '/data/myfile.csv' WITH PARSER Rfc4180CsvParser();

问题

除了将数据文件的第一行作为一行导入之外,上面的操作非常有效。COPY命令有一个SKIP 1选项,但这不适用于解析器。

问题

是否可以编辑Rfc4180CsvParser.cpp来跳过第一行,或者更好的是,使用一些参数来指定要跳过的行数?

这个程序只有135行,但我不知道在哪里/如何做这个切口。提示?

复制下面的整个程序,因为我没有看到公共回购链接到…

Rfc4180CsvParser.cpp

/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
#include "Vertica.h"
#include "StringParsers.h"
#include "csv.h"
using namespace Vertica;
// Note, the class template is mostly for demonstration purposes,
// so that the same class can use each of two string-parsers.
// Custom parsers can also just pick a string-parser to use.
/**
 * A parser that parses something approximating the "official" CSV format
 * as defined in IETF RFC-4180:  <http://tools.ietf.org/html/rfc4180>
 * Oddly enough, many "CSV" files don't actually conform to this standard
 * for one reason or another.  But for sources that do, this parser should
 * be able to handle the data.
 * Note that the CSV format does not specify how to handle different
 * data types; it is entirely a string-based format.
 * So we just use standard parsers based on the corresponding column type.
 */
template <class StringParsersImpl>
class LibCSVParser : public UDParser {
public:
    LibCSVParser() : colNum(0) {}
    // Keep a copy of the information about each column.
    // Note that Vertica doesn't let us safely keep a reference to
    // the internal copy of this data structure that it shows us.
    // But keeping a copy is fine.
    SizedColumnTypes colInfo;
    // An instance of the class containing the methods that we're
    // using to parse strings to the various relevant data types
    StringParsersImpl sp;
    /// Current column index
    size_t colNum;
    /// Parsing state for libcsv
    struct csv_parser parser;
    // Format strings
    std::vector<std::string> formatStrings;
    /**
     * Given a field in string form (a pointer to the first character and
     * a length), submit that field to Vertica.
     * `colNum` is the column number from the input file; how many fields
     * it is into the current record.
     */
    bool handleField(size_t colNum, char* start, size_t len) {
        if (colNum >= colInfo.getColumnCount()) {
            // Ignore column overflow
            return false;
        }
        // Empty colums are null.
        if (len==0) {
            writer->setNull(colNum);
            return true;
        } else {
            return parseStringToType(start, len, colNum, colInfo.getColumnType(c
olNum), writer, sp);
        }
    }
    static void handle_record(void *data, size_t len, void *p) {
        static_cast<LibCSVParser*>(p)->handleField(static_cast<LibCSVParser*>(p)
->colNum++, (char*)data, len);
    }
    static void handle_end_of_row(int c, void *p) {
        // Ignore 'c' (the terminating character); trust that it's correct
        static_cast<LibCSVParser*>(p)->colNum = 0;
        static_cast<LibCSVParser*>(p)->writer->next();
    }
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input
, InputState input_state) {
        size_t processed;
        while ((processed = csv_parse(&parser, input.buf + input.offset, input.s
ize - input.offset,
                handle_record, handle_end_of_row, this)) > 0) {
            input.offset += processed;
        }
        if (input_state == END_OF_FILE && input.size == input.offset) {
            csv_fini(&parser, handle_record, handle_end_of_row, this);
            return DONE;
        }
        return INPUT_NEEDED;
    }
    virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnTy
pe);
    virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &return
Type) {
        csv_free(&parser);
    }
};
template <class StringParsersImpl>
void LibCSVParser<StringParsersImpl>::setup(ServerInterface &srvInterface, Sized
ColumnTypes &returnType) {
    csv_init(&parser, CSV_APPEND_NULL);
    colInfo = returnType;
}
template <>
void LibCSVParser<FormattedStringParsers>::setup(ServerInterface &srvInterface, 
SizedColumnTypes &returnType) {
    csv_init(&parser, CSV_APPEND_NULL);
    colInfo = returnType;
    if (formatStrings.size() != returnType.getColumnCount()) {
        formatStrings.resize(returnType.getColumnCount(), "");
    }
    sp.setFormats(formatStrings);
}
template <class StringParsersImpl>
class LibCSVParserFactoryTmpl : public ParserFactory {
public:
    virtual void plan(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt) {}
    virtual UDParser* prepare(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &returnType)
    {
        return vt_createFuncObj(srvInterface.allocator,
                LibCSVParser<StringParsersImpl>);
    }
};
typedef LibCSVParserFactoryTmpl<StringParsers> LibCSVParserFactory;
RegisterFactory(LibCSVParserFactory);
typedef LibCSVParserFactoryTmpl<FormattedStringParsers> FormattedLibCSVParserFac
tory;
RegisterFactory(FormattedLibCSVParserFactory);

快速而肮脏的方法是对其进行硬编码。它使用对handle_end_of_row的回调。跟踪行号,只是不处理第一行。类似于:

static void handle_end_of_row(int c, void *ptr) {
    // Ignore 'c' (the terminating character); trust that it's correct
    LibCSVParser *p = static_cast<LibCSVParser*>(ptr);
    p->colNum = 0;
    if (rowcnt <= 0) {
        p->bad_field = "";
        rowcnt++;
    } else if (p->bad_field.empty()) {
        p->writer->next();
    } else {
        // libcsv doesn't give us the whole row to reject.
        // So just write to the log.
        // TODO: Come up with something more clever.
        if (p->currSrvInterface) {
            p->currSrvInterface->log("Invalid CSV field value: '%s'  Row skipped.",
                                    p->bad_field.c_str());
        }
        p->bad_field = "";
    }
}

此外,最好在process中初始化rownum = 0,因为我认为它会为COPY语句中的每个文件调用这个。也许还有更聪明的方法可以做到这一点。基本上,这只是处理记录,然后丢弃它

至于一般支持SKIP。。。关于如何处理参数传递,请参阅TraditionalCSVParser。您必须将其添加到解析器因子prepare中,并将该值发送到LibCSVParser类并覆盖getParameterType。然后在LibCSVParser中,您需要接受构造函数中的参数,并修改process以跳过第一个skip行。然后使用该值,而不是上面的硬编码0