如何使用 std::atomic 实现无锁计数器
How to implement lock-free counter with std::atomic?
在我的程序中,多个线程(检查器)请求网页,如果这些页面包含一些数据,另一个线程(消费者)处理数据。我只需要预定义的使用者计数即可开始处理(不是全部)。我尝试使用 std::atomic 计数器和fetch_add来限制工作消费者数量。但是,尽管计数器保持在边界内,但使用者获得相同的计数器值,并且实际处理消费者计数超过限制。行为取决于处理持续时间。简化代码包含sleep_for而不是获取页面和处理页面函数。
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
class cConsumer
{
public:
cConsumer::cConsumer(
const size_t aNumber,
std::atomic<bool> &aFire,
std::atomic<size_t> &aCounter) :
mNumber(aNumber),
mFire(aFire),
mCounter(aCounter){}
void cConsumer::operator ()()
{
while (true)
{
while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);
size_t vCounter = mCounter.fetch_add(1);
if (vCounter < 5)
{
std::cout << " FIRE! consumer " << mNumber << ", counter " << vCounter << "n";
std::this_thread::sleep_for(mWorkDuration);
}
if (vCounter == 5)
{
mFire.store(false);
mCounter.store(0);
}
}
}
private:
static const std::chrono::milliseconds
mMillisecond,
mWorkDuration;
const size_t mNumber;
std::atomic<bool> &mFire;
std::atomic<size_t> &mCounter;
};
const std::chrono::milliseconds
cConsumer::mMillisecond(1),
cConsumer::mWorkDuration(1300);
class cChecker
{
public:
cChecker(
const size_t aNumber,
std::atomic<bool> &aFire) :
mNumber(aNumber),
mFire(aFire),
mStep(1){ }
void cChecker::operator ()()
{
while (true)
{
while (mFire.load()) std::this_thread::sleep_for(mMillisecond);
std::cout << "checker " << mNumber << " step " << mStep << "n";
std::this_thread::sleep_for(mCheckDuration);
if (mStep % 20 == 1) mFire.store(true);
mStep++;
}
}
private:
static const std::chrono::milliseconds
mMillisecond,
mCheckDuration;
const size_t mNumber;
size_t mStep;
std::atomic<bool> &mFire;
};
const std::chrono::milliseconds
cChecker::mMillisecond(1),
cChecker::mCheckDuration(500);
void main()
{
std::atomic<bool> vFire(false);
std::atomic<size_t> vCounter(0);
std::thread vConsumerThreads[16];
for (size_t i = 0; i < 16; i++)
{
std::thread vConsumerThread((cConsumer(i, vFire, vCounter)));
vConsumerThreads[i] = std::move(vConsumerThread);
}
std::chrono::milliseconds vNextCheckerDelay(239);
std::thread vCheckerThreads[3];
for (size_t i = 0; i < 3; i++)
{
std::thread vCheckerThread((cChecker(i, vFire)));
vCheckerThreads[i] = std::move(vCheckerThread);
std::this_thread::sleep_for(vNextCheckerDelay);
}
for (size_t i = 0; i < 16; i++) vConsumerThreads[i].join();
for (size_t i = 0; i < 3; i++) vCheckerThreads[i].join();
}
输出示例(部分)
...
checker 1 step 19
checker 0 step 20
checker 2 step 19
checker 1 step 20
checker 0 step 21
checker 2 step 20
checker 1 step 21
FIRE! consumer 10, counter 0
FIRE! consumer 13, counter 4
FIRE! consumer 6, counter 1
FIRE! consumer 0, counter 2
FIRE! consumer 2, counter 3
checker 0 step 22
checker 2 step 21
FIRE! consumer 5, counter 3
FIRE! consumer 7, counter 4
FIRE! consumer 4, counter 1
FIRE! consumer 15, counter 2
FIRE! consumer 8, counter 0
checker 1 step 22
FIRE! consumer 9, counter 0
FIRE! consumer 11, counter 1
FIRE! consumer 3, counter 2
FIRE! consumer 14, counter 3
FIRE! consumer 1, counter 4
checker 0 step 23
checker 2 step 22
checker 1 step 23
checker 2 step 23
checker 0 step 24
checker 1 step 24
我找到了一个有效但不优雅的解决方案:等待所有消费者尝试工作并了解火已经关闭。
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
class cConsumer
{
public:
cConsumer::cConsumer(
const size_t aNumber,
const size_t aConsumerCount,
std::atomic<bool> &aFire,
std::atomic<size_t> &aCounter) :
mNumber(aNumber),
mConsumerCount(aConsumerCount),
mFire(aFire),
mCounter(aCounter){}
void cConsumer::operator ()()
{
while (true)
{
while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);
const size_t vCounter = mCounter.fetch_add(1);
if (vCounter < 5)
{
std::cout << " FIRE! consumer " << mNumber << ", counter " << vCounter << "n";
std::this_thread::sleep_for(mWorkDuration); //stub for process function
}
if (vCounter >= 5)
{
std::this_thread::sleep_for(mWorkDuration); //wait for other threads to increase counter
std::this_thread::sleep_for(mWorkDuration); //double wait for long processing
mFire.store(false);
}
if (vCounter == mConsumerCount)
{
mCounter.store(0);
}
}
}
private:
static const std::chrono::milliseconds
mMillisecond,
mWorkDuration;
const size_t
mNumber,
mConsumerCount;
std::atomic<bool> &mFire;
std::atomic<size_t> &mCounter;
};
const std::chrono::milliseconds
cConsumer::mMillisecond(1),
cConsumer::mWorkDuration(1300);
class cChecker
{
public:
cChecker(
const size_t aNumber,
std::atomic<bool> &aFire) :
mNumber(aNumber),
mFire(aFire),
mStep(1){ }
void cChecker::operator ()()
{
while (true)
{
while (mFire.load()) std::this_thread::sleep_for(mMillisecond);
std::cout << "checker " << mNumber << " step " << mStep << "n";
std::this_thread::sleep_for(mCheckDuration);
if (mStep % 20 == 1) mFire.store(true);
mStep++;
}
}
private:
static const std::chrono::milliseconds
mMillisecond,
mCheckDuration;
const size_t mNumber;
size_t mStep;
std::atomic<bool> &mFire;
};
const std::chrono::milliseconds
cChecker::mMillisecond(1),
cChecker::mCheckDuration(500);
void main()
{
std::atomic<bool> vFire(false);
std::atomic<size_t> vCouter(0);
std::thread vConsumerThreads[16];
for (size_t i = 0; i < 16; i++)
{
vConsumerThreads[i] = std::move(std::thread(cConsumer(i, 16, vFire, vCouter)));
}
std::chrono::milliseconds vNextCheckerDelay(239);
std::thread vCheckerThreads[3];
for (size_t i = 0; i < 3; i++)
{
vCheckerThreads[i] = std::move(std::thread(cChecker(i, vFire)));
std::this_thread::sleep_for(vNextCheckerDelay);
}
for (size_t i = 0; i < 16; i++) vConsumerThreads[i].join();
for (size_t i = 0; i < 3; i++) vCheckerThreads[i].join();
我认为存在更好的解决方案。
这里会发生什么?
运气好的话,一旦你放火,通过这条线的工人可能比 5 多得多:
while(!mFire.load()) std::this_thread::sleep_for(mMillisecond);
假设有 10 个工人醒着,该计数器为 0。然后,每 10 个工作人员将执行以下操作:
size_t vCounter = mCouter.fetch_add(1);
现在,这 10 名工人中的每一个都有一个介于 1 和 11 之间的不同计数器。 5 首先将执行 if 子句:
if(vCounter < 5)
任何具有更高计数器的线程将继续。 其中第6个线程,将重置火灾并重置计数器:
if(vCounter == 5)
{
mFire.store(false);
mCouter.store(0);
cout << "RESET!!!!!! by consume "<<mNumber << endl; // useful to understand
}
然后,所有这些空闲线程将继续循环,等待下一次触发。
但是现在坏事可能会发生,因为你有一些工人还在工作,你有一堆跳棋等待再次放火:
while(mFire.load()) std::this_thread::sleep_for(mMillisecond);
... // now that fire is reset, they will go on
有些可以达到以下行:
if(mStep % 20 == 1) {
mFire.store(true);
cout << "SET FIRE" << endl; // to make the problem visual
}
由于原子计数器为 0,因此除了仍在运行的工作线程外,您还将立即拥有 5 个新工作线程,这些工作线程将开始新工作。
你能做些什么呢?
我并不完全清楚你打算做什么:
- 您想让 5 名工人为每场新火灾活动吗? 在这种情况下,就像你所做的那样没关系。然后,工人总数可能超过5人。
您想在任何时刻让最多 5 个工作人员处于活动状态吗? 在这种情况下,您永远不应该像以前那样将工作线程数重置为 0,但您应该递减所有递增它的线程的计数器。 因此,conter 将包含当前在火灾处理部分中的线程数:
while(true) { while(!mFire.load()) std::this_thread::sleep_for(mMillisecond); size_t vCounter = mCouter.fetch_add(1); // FIRE PROCESSING: INCREMENT COUNTER if(vCounter < 5) { std::cout << " FIRE! consumer " << mNumber << ", counter " << vCounter << "n"; std::this_thread::sleep_for(mWorkDuration); std::cout << " finished consumer "<< mNumber<<endl; } if(vCounter == 5) { mFire.store(false); //mCouter.store(0); cout << "RESET!!!!!! by consumer "<<mNumber << endl; } mCouter.fetch_sub(1); // END OF PROCESSING: DECREMENT COUNTER
可能的解决方案是为消费者完成标志使用辅助数组。当使用者完成处理时,它会存储 true 到其完成的数组单元。一个额外的控制线程扫描完成数组中所有单元格是否为真,并重置程序状态。
#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
class cConsumer
{
public:
cConsumer::cConsumer(
const size_t aNumber,
const size_t aFiresLimit,
std::atomic<bool> &aFire,
std::atomic<bool> &aDone,
std::atomic<size_t> &aCounter) :
mNumber(aNumber),
mFiresLimit(aFiresLimit),
mFire(aFire),
mDone(aDone),
mCounter(aCounter){}
void cConsumer::operator ()()
{
while (true)
{
while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);
const size_t vCounter = mCounter.fetch_add(1);
if (vCounter < mFiresLimit)
{
std::cout << " FIRE! consumer " << mNumber << ", counter " << vCounter << "n";
std::this_thread::sleep_for(mWorkDuration); // instead real processing
}
mDone.store(true);
while (mDone.load()) std::this_thread::sleep_for(mMillisecond);
}
}
private:
static const std::chrono::milliseconds
mMillisecond,
mWorkDuration;
const size_t
mNumber,
mFiresLimit;
std::atomic<bool>
&mFire,
&mDone;
std::atomic<size_t> &mCounter;
};
const std::chrono::milliseconds
cConsumer::mMillisecond(1),
cConsumer::mWorkDuration(1300);
class cChecker
{
public:
cChecker(
const size_t aNumber,
std::atomic<bool> &aFire) :
mNumber(aNumber),
mFire(aFire),
mStep(1){ }
void cChecker::operator ()()
{
while (true)
{
while (mFire.load()) std::this_thread::sleep_for(mMillisecond);
std::cout << "checker " << mNumber << " step " << mStep << "n";
std::this_thread::sleep_for(mCheckDuration);
if (mStep % 20 == 1) // dummy condition instead real checker function
{
mFire.store(true);
}
mStep++;
}
}
private:
static const std::chrono::milliseconds
mMillisecond,
mCheckDuration;
const size_t mNumber;
size_t mStep;
std::atomic<bool> &mFire;
};
const std::chrono::milliseconds
cChecker::mMillisecond(1),
cChecker::mCheckDuration(500);
class cController
{
public:
cController(
const size_t aConsumerCount,
std::atomic<bool> &aFire,
std::atomic<bool> * const aConsumersDone,
std::atomic<size_t> &aCounter) :
mConsumerCount(aConsumerCount),
mFire(aFire),
mConsumersDone(aConsumersDone),
mCounter(aCounter){}
void cController::operator ()()
{
while (true)
{
while(!mFire.load()) std::this_thread::sleep_for(mMillisecond);
bool vAllConsumersDone = false;
while (!vAllConsumersDone)
{
size_t i = 0;
while ((i < mConsumerCount) && (mConsumersDone[i].load())) i++;
vAllConsumersDone = (i == mConsumerCount);
std::this_thread::sleep_for(mMillisecond);
}
mFire.store(false);
for (size_t i = 0; i < mConsumerCount; i++) mConsumersDone[i].store(false);
mCounter.store(0);
}
}
private:
const size_t mConsumerCount;
static const std::chrono::milliseconds mMillisecond;
std::atomic<bool>
&mFire,
* const mConsumersDone;
std::atomic<size_t> &mCounter;
};
const std::chrono::milliseconds cController::mMillisecond(1);
void main()
{
static const size_t
vCheckerCount = 3,
vConsumersCount = 16,
vFiresLimit = 5;
std::atomic<bool> vFire(false);
std::atomic<bool> vConsumersDone[vConsumersCount];
for (size_t i = 0; i < vConsumersCount; i++) vConsumersDone[i].store(false);
std::atomic<size_t> vCounter(0);
std::thread vControllerThread(cController(vConsumersCount, vFire, vConsumersDone, vCounter));
std::thread vConsumerThreads[vConsumersCount];
for (size_t i = 0; i < vConsumersCount; i++)
{
vConsumerThreads[i] = std::move(std::thread(cConsumer(i, vFiresLimit, vFire, vConsumersDone[i], vCounter)));
}
std::chrono::milliseconds vNextCheckerDelay(239);
std::thread vCheckerThreads[vCheckerCount];
for (size_t i = 0; i < vCheckerCount; i++)
{
vCheckerThreads[i] = std::move(std::thread(cChecker(i, vFire)));
std::this_thread::sleep_for(vNextCheckerDelay);
}
for (size_t i = 0; i < vConsumersCount; i++) vConsumerThreads[i].join();
for (size_t i = 0; i < vCheckerCount; i++) vCheckerThreads[i].join();
vControllerThread.join();
}
输出(部分)示例:
...
checker 2 step 19
checker 1 step 19
checker 0 step 19
checker 2 step 20
checker 0 step 20
checker 1 step 20
checker 2 step 21
checker 0 step 21
checker 1 step 21
FIRE! consumer 11, counter 0
FIRE! consumer 3, counter 2
FIRE! consumer 4, counter 3
FIRE! consumer 10, counter 4
FIRE! consumer 14, counter 1
checker 0 step 22
checker 2 step 22
checker 1 step 22
checker 2 step 23
checker 0 step 23
checker 1 step 23
checker 2 step 24
checker 0 step 24
- 如果没有malloc,链表实现将失败
- 如何在c++中实现处理器调度模拟器
- 如何在c++中使用引用实现类似python的行为
- 实现无开销push_back的最佳方法是什么
- 使用简单类型列表实现的指数编译时间.为什么
- 循环在计数器中不起作用
- 如何在BST的这个简单递归实现中消除警告
- 实现一个在集合上迭代的模板函数
- 我应该实现右值推送功能吗?我应该使用std::move吗
- 映射构造函数,同时在C++中实现计数器
- 用32位原子实现64位原子计数器
- 实现限制为 3 个指针的引用计数器
- C++实现了带有引用计数器的sharedpointer函数
- 如何实现不会溢出的原子参考计数器
- 快速实现大整数计数器(在C/C 中)
- 在C++11中实现共享整数计数器而不使用互斥的最简单方法:
- 引用计数器实现,= 运算符重载错误
- 在堆栈程序中实现一个计数器,用于计算对象移动的次数
- 使用std::bitset实现一个二进制计数器
- 如何使用 std::atomic 实现无锁计数器