C++会话管理方法

C++ Session management methods

本文关键字:方法 管理方 管理 会话 C++      更新时间:2023-10-16

我想创建一个会话管理方案,简而言之,它必须是这样的:

  • 跟踪当前活动会话的std::map(已填充带有id字符串和关联的消息队列)
  • 每个消息队列上的一组线程

有两种方法可以创建此方案:

  1. 将所有组件保留在主程序中,并在需要时使用新的会话id和关联的消息队列填充映射,并使用对将作为参数轮询的队列的引用启动新的分离线程。

    //main program code
    _session_map[id] = queue;
    thread = new thread(&queue);
    thread.detach();
    thread.start();
    //main program code
    
  2. 创建一个session_manager类来隐藏所有这些机制到主程序。

    class session_manager
    {
    //session_manager code
    std::string new_session(std::string id)
    {
    _session_map[id] = queue;
    thread = new thread(&queue);
    thread.detach();
    thread.start();
    }
    private:
    std::map<std::string,message_queue> _session_map;
    };
    

创建此方案的更好方法是什么?我不确定第二种方案是否能正确工作,因为我不太擅长使用线程。此外,我也不知道如何跟踪闭门会议,有人有什么建议吗?

几年前,我会给出一个与现在完全不同的答案。

我也不知道你说的"会话管理"是什么意思,也不知道队列与工人之间的关系。因此,我将从一点假设开始:

  • 您希望N个线程并行工作,在队列中找到的作业上相互竞争
  • 您的会话是一种工作会话,它将持续到作业序列完成为止

在C++线程编程的早期,我实际上已经概述了如何实现这样一个方案的体系结构。

但我怀疑,你只需要完成一项工作,而不是一些"理论课"。

因此,我选择展示一种(也是特定于操作系统的)更抽象的方法来完成任务,而不是摆弄低级别线程(特定于操作操作系统)。

优点:

  • 并行编程,无需处理锁、互斥、调用pthis->Execute()的静态线程函数等
  • 概念上易于理解。消息块、源、目标、消息、Worker对象(参与者/代理)。没有承诺未来的C++-linq反应式函数编程替换尝试(幽默的尝试!)
  • 看起来很接近你的想法

缺点:

  • 隐藏了我们这些老年人引以为豪的所有低级东西,并与之一起度过了多年纯粹的快乐和绝望
  • 不幸的是,仅在Windows平台(AFAIK)上运行

这里的所有代码都使用Windows并发运行时。

#include "stdafx.h"
#include <thread>
#include <concrt.h>
#include <agents.h>
#include <iostream>
template<class _Job>
class Worker 
: public concurrency::agent 
{
concurrency::ISource<_Job> *m_source;
volatile bool m_running;
uint32_t m_counter;
public:
Worker(concurrency::ISource<_Job> *source)
: m_source(source)
, m_running(true)
, m_counter(0UL)
{}
~Worker()
{}
uint32_t Counter() const
{
return m_counter;
}
void Stop()
{
m_running = false;
}
virtual void run()
{
while (m_running)
{
try
{
_Job job = concurrency::receive(m_source, 1000);
m_counter++;
}
catch (concurrency::operation_timed_out& /*timeout*/)
{
std::cout << "Timeout." << std::endl;
}
}
_Job job;
while (concurrency::try_receive(m_source, job))
{
m_counter++;
}
done();
}
};
typedef uint64_t Job_t;
int _tmain(int argc, _TCHAR* argv[])
{
const size_t NUM_WORKERS = 4;
concurrency::unbounded_buffer<Job_t> buffer;
Worker<Job_t> workers[NUM_WORKERS] = 
{ Worker<Job_t>(&buffer)
, Worker<Job_t>(&buffer)
, Worker<Job_t>(&buffer)
, Worker<Job_t>(&buffer)
};
std::vector<concurrency::agent*> agents;
for (auto& worker : workers)
{
agents.push_back(&worker);
worker.start();
}
for (uint64_t jobid = 0ULL; jobid < 1000000ULL; jobid++)
{ 
concurrency::asend(buffer, jobid);
}
for (auto& worker : workers)
{
worker.Stop();
}
concurrency::agent::wait_for_all(NUM_WORKERS,&agents[0]);
for (auto& worker : workers)
{
std::cout << "counter: " << worker.Counter() << std::endl;
}
return 0;
}