R并行写入SEXP结构

R parallel write SEXP structure

本文关键字:SEXP 结构 并行      更新时间:2023-10-16

我正在使用C/C++代码开发R中的数据处理模块,主要是出于速度原因。这是我问题的事实清单。

  • 最终结果数据是字符串向量的列表,占用20MB到200MB的内存
  • 数据处理可以适用于单生产者/多消费者模型
  • 对于我的数据,wrapvector<vector<string> >转换为List需要相当长的时间

因此,我打算直接在SEXP结构中工作,这样我就可以节省最终转换的时间。我的主要功能是这样的。

boost::atomic<bool> done(false);
SEXP myfun(...) {
    ...
    SEXP sdataStr;
    PROTECT(sdataStr=allocVector(VECSXP, nElem));
    vector<SEXP> dataStr(nElem);
    for (int i=0; i<nElem; ++i) {
         dataStr[i]=SET_VECTOR_ELT(sdataStr, i, allocVector(STRSXP, n));
    }
    Producer producer(&queue);
    Consumer consumer1(dataStr, nElem, &queue);
    Consumer consumer2(dataStr, nElem, &queue);
    boost::thread produce(producer);
    boost::thread consume1(consumer1);
    boost::thread consume2(consumer2);
    produce.join();
    done=true;
    consume1.join();
    consume2.join();
    UNPROTECT(1);
    return sdataStr;
}

我的消费者类看起来像这个

class Consumer {
    vector<SEXP>& m_dataStr;
    boost::lockfree::queue<buffer>* m_queue;
    buffer m_buffer;
    public:
    Consumer(vector<SEXP>& dataStr, boost::lockfree::queue<buffer>* queue) : m_dataStr(dataStr), m_queue(queue) {}
    void operator()() {
        while (!done) {
            while (m_queue->pop(m_buffer)) {
                process_item();
            }
        }
        while (m_queue->pop(m_buffer)) {
            process_item();
        }
    }
    private:
    process_item() {
        ...
        // for some 0<=idx<nElem, 0<=i<n, some char* f and integer len
        SET_STRING_ELT(m_dataStr[idx], i, mkCharLen(f,len));
        ...
    }
}

这些是我唯一使用Rinternals的地方。程序的逻辑确保不同线程对同一位置的写入永远不会发生,即Consumer类中的idxi组合最多只能发生一次。我遇到了各种奇怪的问题,比如"堆栈不平衡",或者"进入错误的一代"等等。有什么我遗漏的吗?或者不建议在多个线程中调用SET_STRING_ELT?非常感谢!

C/R API函数不应在线程中调用,除非您知道自己在做什么,例如mkCharLen可能会修改用于所有R字符串的内部哈希表,因此无法在线程中进行调用。SET_STRING_ELT可能在线程中也不可用,尤其是在写屏障打开的情况下。