多线程事件系统

Multithreaded event system

本文关键字:事件系统 多线程      更新时间:2023-10-16

我正在尝试用C++设计一个多线程事件系统。在它中,对象可能位于不同的线程中,并且每个对象都应该能够对其他线程的事件进行排队。每个线程都有自己的事件队列和事件调度器,以及一个事件循环。应该可以更改对象的线程相关性。

假设我们有两个线程:A和B,以及一个属于B的对象myobj。显然,A需要一个指向myobj的指针才能向其发送事件。A没有指向B的指针,但它需要某种方法来获得对它的引用,才能锁定事件队列并将事件添加到其中。

我可以在myobj中存储指向B的指针,但显然我需要保护myobj。如果我在myobj中放置一个互斥对象,那么在互斥对象被锁定时,myobj可能会被破坏,从而导致分段错误。

我还可以使用一个全局表,将每个对象与其对应的线程关联起来。然而,这将消耗大量内存,并导致任何想要发送事件的线程阻塞,直到a完成为止ed.

实现这一点最有效的安全策略是什么?这可能有某种设计模式吗?

提前谢谢。

我实现了一个线程包装基类ThreadEventComponent,用于在其实例之间发送和处理事件。每个ThreadEventComponent都有自己的事件队列,无论何时使用,该队列都会在内部自动锁定。事件本身由map<EventKey, vector<ThreadEventComponent*>>类型的静态映射协商,该静态映射在使用时也会自动锁定。正如您所看到的,多个ThreadEventComponent派生实例可以订阅同一事件。使用SendEvent(Event*)发送的每个事件都按实例进行复制,以确保多个线程不会为事件中保存的相同数据而发生冲突。

诚然,与共享内存相比,这不是最有效的策略。对于addEvent(Event&)方法,需要进行一些优化。抛开缺点不谈,它确实可以很好地将线程配置为在主线程之外执行某些操作。

MainLoop()ProcessEvent(Event*)都是要由派生类实现的虚拟函数。只要队列中有可用的事件,就会调用ProcessEvent(Event*)。之后,无论事件队列状态如何,都会调用MainLoop()MainLoop()是您应该告诉线程睡眠的地方,也是任何其他操作(如文件读/写或网络读/写)应该进行的地方。

下面的代码是我一直在为自己的人编写的,用来让我的头脑沉浸在C++中的线程中。这个代码从未被审查过,所以我很乐意听到你的任何建议。我知道在这个代码示例中有两个元素不太理想。1) 我在运行时使用new,缺点是查找内存需要时间,但可以通过在ThreadEventComponent基类中创建一个内存缓冲区来构造新的事件来减轻这种情况。2) 如果在ProcessEvent中未正确实现,则Event转换为TEvent<T>可能会导致运行时错误。我不确定什么是最好的解决方案。

注意:我将EventKey实现为字符串,但您可以将其更改为任何类型,只要它具有默认值以及可用的相等运算符和赋值运算符即可。

事件.h

#include <string>
using namespace std;
typedef string EventKey;
class Event
{
public:
Event()
: mKey()
{
}
Event(EventKey key)
: mKey(key)
{
}
Event(const Event& e)
: mKey(e.mKey)
{
}
virtual ~Event()
{
}
EventKey GetKey()
{
return mKey;
}
protected:
EventKey mKey;
};
template<class T>
class TEvent : public Event
{
public:
TEvent()
: Event()
{
}
TEvent(EventKey type, T& object)
: Event(type), mObject(object)
{
}
TEvent(const TEvent<T>& e)
: Event(e.mKey), mObject(e.mObject)
{
}
virtual ~TEvent()
{
}
T& GetObject()
{
return mObject;
}
private:
T mObject;
};

ThreadEventComponent.h

#include "Event.h"
#include <thread>
#include <atomic>
#include <algorithm>
#include <vector>
#include <queue>
#include <map>
#include <mutex>
#include <assert.h>
class ThreadEventComponent
{
public:
ThreadEventComponent();
~ThreadEventComponent();
void Start(bool detached = false);
void Stop();
void ForceStop();
void WaitToFinish();
virtual void Init() = 0;
virtual void MainLoop() = 0;
virtual void ProcessEvent(Event* incoming) = 0;
template<class T>
void SendEvent(TEvent<T>& e)
{
sEventListLocker.lock();
EventKey key = e.GetKey();
for (unsigned int i = 0; i < sEventList[key].size(); i++)
{
assert(sEventList[key][i] != nullptr);
sEventList[key][i]->addEvent<T>(e);
}
sEventListLocker.unlock();
}
void SendEvent(Event& e);
void Subscribe(EventKey key);
void Unsubscribe(EventKey key);
protected:
template<class T>
void addEvent(TEvent<T>& e)
{
mQueueLocker.lock();
// The event gets copied per thread
mEventQueue.push(new TEvent<T>(e));
mQueueLocker.unlock();
}
void addEvent(Event& e);
thread mThread;
atomic<bool> mShouldExit;
private:
void threadLoop();
queue<Event*> mEventQueue;
mutex mQueueLocker;
typedef map<EventKey, vector<ThreadEventComponent*>> EventMap;
static EventMap sEventList;
static mutex sEventListLocker;
};

ThreadEventComponent.cpp

#include "ThreadEventComponent.h"
ThreadEventComponent::EventMap ThreadEventComponent::sEventList = ThreadEventComponent::EventMap();
std::mutex ThreadEventComponent::sEventListLocker;
ThreadEventComponent::ThreadEventComponent()
{
mShouldExit = false;
}
ThreadEventComponent::~ThreadEventComponent()
{
}
void ThreadEventComponent::Start(bool detached)
{
mShouldExit = false;
mThread = thread(&ThreadEventComponent::threadLoop, this);
if (detached)
mThread.detach();
}
void ThreadEventComponent::Stop()
{
mShouldExit = true;
}
void ThreadEventComponent::ForceStop()
{
mQueueLocker.lock();
while (!mEventQueue.empty())
{
delete mEventQueue.front();
mEventQueue.pop();
}
mQueueLocker.unlock();
mShouldExit = true;
}
void ThreadEventComponent::WaitToFinish()
{
if(mThread.joinable())
mThread.join();
}
void ThreadEventComponent::SendEvent(Event& e)
{
sEventListLocker.lock();
EventKey key = e.GetKey();
for (unsigned int i = 0; i < sEventList[key].size(); i++)
{
assert(sEventList[key][i] != nullptr);
sEventList[key][i]->addEvent(e);
}
sEventListLocker.unlock();
}
void ThreadEventComponent::Subscribe(EventKey key)
{
sEventListLocker.lock();
if (find(sEventList[key].begin(), sEventList[key].end(), this) == sEventList[key].end())
{
sEventList[key].push_back(this);
}
sEventListLocker.unlock();
}
void ThreadEventComponent::Unsubscribe(EventKey key)
{
sEventListLocker.lock();
// Finds event listener of correct type
EventMap::iterator mapIt = sEventList.find(key);
assert(mapIt != sEventList.end());
// Finds the pointer to itself
std::vector<ThreadEventComponent*>::iterator elIt =
std::find(mapIt->second.begin(), mapIt->second.end(), this);
assert(elIt != mapIt->second.end());
// Removes it from the event list
mapIt->second.erase(elIt);
sEventListLocker.unlock();
}
void ThreadEventComponent::addEvent(Event& e)
{
mQueueLocker.lock();
// The event gets copied per thread
mEventQueue.push(new Event(e));
mQueueLocker.unlock();
}
void ThreadEventComponent::threadLoop()
{
Init();
bool shouldExit = false;
while (!shouldExit)
{
if (mQueueLocker.try_lock())
{
if (mEventQueue.empty())
{
mQueueLocker.unlock();
if(mShouldExit)
shouldExit = true;
}
else
{
Event* e = mEventQueue.front();
mEventQueue.pop();
mQueueLocker.unlock();
ProcessEvent(e);
delete e;
}
}
MainLoop();
}
}

示例类别-A.h

#include "ThreadEventComponent.h"
class A : public ThreadEventComponent
{
public:
A() : ThreadEventComponent()
{
}
void Init()
{
Subscribe("a stop");
Subscribe("a");
}
void MainLoop()
{
this_thread::sleep_for(50ms);
}
void ProcessEvent(Event* incoming)
{
if (incoming->GetKey() == "a")
{
auto e = static_cast<TEvent<vector<int>>*>(incoming);
mData = e->GetObject();
for (unsigned int i = 0; i < mData.size(); i++)
{
mData[i] = sqrt(mData[i]);
}
SendEvent(TEvent<vector<int>>("a done", mData));
}
else if(incoming->GetKey() == "a stop")
{
StopWhenDone();
}
}
private:
vector<int> mData;
};

示例类-B.h

#include "ThreadEventComponent.h"
int compare(const void * a, const void * b)
{
return (*(int*)a - *(int*)b);
}
class B : public ThreadEventComponent
{
public:
B() : ThreadEventComponent()
{
}
void Init()
{
Subscribe("b stop");
Subscribe("b");
}
void MainLoop()
{
this_thread::sleep_for(50ms);
}
void ProcessEvent(Event* incoming)
{
if (incoming->GetKey() == "b")
{
auto e = static_cast<TEvent<vector<int>>*>(incoming);
mData = e->GetObject();
qsort(&mData[0], mData.size(), sizeof(int), compare);
SendEvent(TEvent<vector<int>>("b done", mData));
}
else if (incoming->GetKey() == "b stop")
{
StopWhenDone();
}
}
private:
vector<int> mData;
};

测试示例-main.cpp

#include <iostream>
#include <random>
#include "A.h"
#include "B.h"
class Master : public ThreadEventComponent
{
public:
Master() : ThreadEventComponent()
{
}
void Init()
{
Subscribe("a done");
Subscribe("b done");
}
void MainLoop()
{
this_thread::sleep_for(50ms);
}
void ProcessEvent(Event* incoming)
{
if (incoming->GetKey() == "a done")
{
TEvent<vector<int>>* e = static_cast<TEvent<vector<int>>*>(incoming);
cout << "A finished" << endl;
mDataSetA = e->GetObject();
for (unsigned int i = 0; i < mDataSetA.size(); i++)
{
cout << mDataSetA[i] << " ";
}
cout << endl << endl;
}
else if (incoming->GetKey() == "b done")
{
TEvent<vector<int>>* e = static_cast<TEvent<vector<int>>*>(incoming);
cout << "B finished" << endl;
mDataSetB = e->GetObject();
for (unsigned int i = 0; i < mDataSetB.size(); i++)
{
cout << mDataSetB[i] << " ";
}
cout << endl << endl;
}
}
private:
vector<int> mDataSetA;
vector<int> mDataSetB;
};
int main()
{
srand(time(0));
A a;
B b;
a.Start();
b.Start();
vector<int> data;
for (int i = 0; i < 100; i++)
{
data.push_back(rand() % 100);
}
Master master;
master.Start();
master.SendEvent(TEvent<vector<int>>("a", data));
master.SendEvent(TEvent<vector<int>>("b", data));
master.SendEvent(TEvent<vector<int>>("a", data));
master.SendEvent(TEvent<vector<int>>("b", data));
master.SendEvent(Event("a stop"));
master.SendEvent(Event("b stop"));
a.WaitToFinish();
b.WaitToFinish();
// cin.get();
master.StopWhenDone();
master.WaitToFinish();
return EXIT_SUCCESS;
}

我自己没有用过,但Boost.Signals2声称是线程安全的。

Boost.Signals2的主要动机是提供一个可以在多线程环境中安全使用的原始Boost.Ssignals库版本。

当然,使用它会使您的项目依赖于boost,这可能不符合您的兴趣。

[edit]插槽似乎是在发出线程中执行的(没有队列),所以这可能不是你想要的。

我会考虑将线程作为类的一部分来封装它们。这样,您就可以轻松地围绕线程循环(作为这些类的成员函数提供)设计接口,并定义入口点来向线程循环发送数据(例如,使用受互斥体保护的std::队列)。

我不知道这是否是一个指定的、众所周知的设计模式,但这就是我在工作中整天使用的高效代码,我(和我的同事)对它的感觉和体验都很好。

我试着给你一点:

class A {
public:
A() {}
bool start();
bool stop();
bool terminate() const;
void terminate(bool value);
int data() const;
void data(int value);
private:
std::thread thread_;
void threadLoop();
bool terminate_;
mutable std::mutex internalDataGuard_;
int data_;
};

bool A::start() {
thread_ = std::thread(std::bind(this,threadLoop));
return true;
}
bool A::stop() {
terminate(true);
thread_.join();
return true;
}
bool A::terminate() const {
std::lock_guard<std::mutex> lock(internalDataGuard_);
return terminate_;
}
void A::terminate(bool value) {
std::lock_guard<std::mutex> lock(internalDataGuard_);
terminate_ = value;
}
int A::data() const {
std::lock_guard<std::mutex> lock(internalDataGuard_);
return data_;
}
void A::data(int value) {
std::lock_guard<std::mutex> lock(internalDataGuard_);
data_ = value;
// Notify thread loop about data changes
}

void A::threadLoop() {
while(!terminate())
{
// Wait (blocking) for data changes
}
}

要设置数据更改的信号,有几个选择和(操作系统)限制:

可以用来唤醒线程循环以处理更改的/新的数据的最简单的东西是信号量。在c++11中,信号量最接近的近似值是一个条件变量。pthreadsAPI的高级版本也提供条件变量支持。无论如何,由于只有一个线程应该在那里等待,并且不需要任何类型的事件外壳,因此使用简单的锁定机制应该很容易实现。

如果您可以选择使用高级操作系统,您可能更喜欢使用类似于poll()的s.th.来实现事件信令,它在用户空间提供无锁实现。

一些框架,如boost、Qt、Platinum C++和其他框架,也支持通过信号/插槽抽象进行事件处理,您可以查看它们的文档和实现,以掌握必要的/最新的技术。

显然,A需要一个指向myobj的指针才能发送

我质疑上面的假设——对我来说,允许线程A有一个指向由线程B控制/拥有/访问的对象的指针有点麻烦。。。特别是,线程A中运行的一些代码稍后可能会使用该指针直接调用myobj上的方法,从而导致竞争条件和不和谐;或者B可以删除点A持有悬空指针从而处于不稳定状态的myobj

如果我在设计这个系统,我会尝试以这样一种方式来完成跨线程消息传递,而不需要指向其他线程中对象的指针,因为你提到的原因——它们是不安全的,特别是这样的指针随时可能变成悬空指针。

那么问题来了,如果我没有指向另一个线程中对象的指针,我该如何向该对象发送消息?

一种方法是给每个对象一个唯一的ID,通过该ID可以指定它。这个ID可以是一个整数(可以是硬编码的,也可以是使用原子计数器或类似方法动态分配的),如果你想让它更容易被人类读取,也可以为一个短字符串。

然后,线程A中的代码不直接向myobj发送消息,而是向线程B发送消息,并且该消息将包括一个字段,该字段指示要接收该消息的对象的ID。

当线程B的事件循环接收到消息时,它将使用包含的ID值来查找适当的对象(使用有效的键值查找机制,如std::unordered_map),并对该对象调用适当的方法。如果对象已经被破坏,那么键值查找将失败(因为您有一种机制来确保对象作为其析构函数的一部分从其线程的对象映射中删除自己),因此尝试向被破坏的对象发送消息将完全失败(而不是调用未定义的行为)。

请注意,这种方法确实意味着线程A的代码必须知道myobj由哪个线程所有,才能知道要向哪个线程发送消息。通常,线程A无论如何都需要知道这一点,但如果你的设计甚至抽象掉了给定对象在哪个线程中运行的知识,你可以将所有者线程ID作为对象ID的一部分,以便您的postMessage()方法可以检查目标对象ID,以确定将消息发送到哪个线程。