Java是否有类似于c ++的组合功能?

Does Java have something similar to c++'s combinable?

本文关键字:组合 功能 是否 类似于 Java      更新时间:2023-10-16

我使用.parallelStream().forEach()做一堆写到DB,但也想保持我写的行计数。

现在,我有一个java.util.concurrent.atomic.AtomicInteger来跟踪计数,但我想使用类似于c++的combinable<>的东西。

在本文中,有一个使用combinable<>的示例:
#include <iostream>
#include <cstdint>
#include <ppl.h>
using namespace Concurrency;
const int max_sum_item = 1000000000;
int main()
{
    combinable<uint64_t> part_sums([] { return 0; });
    parallel_for(0, max_sum_item,
        [&part_sums] (int i)
        {
            part_sums.local() += i;
        }
    );
    uint64_t result = part_sums.combine(std::plus<uint64_t>());
    if (result != uint64_t(499999999500000000))
        throw;
}

是否有一个等效的combinable<>类在Java中?

Java代码片段:
AtomicInteger totalRows = new AtomicInteger(0);
...
myList.parallelStream().forEach(
    ... // write to db
    totalRows.addAndGet(rowsWritten);
    ...
);
print(totalRows.get());

查找如下内容:

Combinable<int> totalRows = new Combinable<>(0);
...
myList.parallelStream().forEach(
    ... // write to db
    totalRows = rowsWritten;
    ...
);
print(totalRows.combine());

编辑:根据@zero323, Spark中正确的工具将是Accumulator。我对多线程的情况更感兴趣,但手头没有一个非spark的例子。

EDIT2:更新示例(并删除Spark引用)

Spark中合适的工具是Accumulator:

Accumulator<Integer> accum = sc.accumulator(0);
myRDD.parallelStream().forEach(
    accum.add(1);
);
accum.value();
从工作者的角度来看,

累加器是只写的,并且只能由驱动程序读取。默认情况下,它只支持Long, DoubleFloat,但您可以实现自定义AccumulatorParam来支持其他类型。