具有四个队列的多线程生产者/消费者

Multithreaded producer/consumer with four queues

本文关键字:多线程 生产者 消费者 队列 四个      更新时间:2023-10-16

我将使用者/生产者问题从应用程序中分离出来,以确保我的线程正常工作。

我有一个生产者线程和一个消费者线程池:在我的应用程序中,一个线程接受连接并将它们排队(在我的自定义结构中)在四个队列中的一个队列中,四个线程从队列中弹出并处理之前排队的连接;在这里,我的队列将包含1到4之间的随机int,没有自定义结构。

四个mutex确保每个队列的数据保护(当打印队列大小时,在终端上为一个像样的cout添加一个互斥);使用CCD_ 4来同步从四个队列中移除。生产者线程在右侧队列中推送一个新的int值,然后也在priority_queue中推送,因此当线程想要读取时,他首先需要从priority_queue中读取pop(),以便了解推送了哪个队列(由于它是排序的,在一些随机推送之后,我的priority_queue将看起来像1 1 1 2 3 3 3 3 4 4,所以消费者线程将pop(),看到值1,并理解它必须从队列1中删除)。

为什么要排四个队因为每个队列都有自己的优先级(1=最大,4=最小),所以在从队列2中删除元素之前,应该先删除队列1中的所有元素;所有其他队列的原因相同。由于这里我有一个从1到4的随机推送值,所以不应该有饥饿。

使用:g++ -std=c++11 -o producer-consumer-multiqueue producer-consumer-multiqueue.cpp -pthread在Ubuntu 14.04 x86_64上编译,gcc版本4.8.4。

问题:除了调度程序导致的奇怪输出之外,使用者线程的行为并不像我想要的那样,因为正如你在下面的输出中看到的,它没有被赋予从队列1中删除元素的优先级,但删除是不按优先级(队列1最大值,队列4最小值)执行的。我想在不使用外部库的情况下实现我的目标,没有boost和类似的

(0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4)
(1 0 1 0)
(1 1 1 0)
(0 0 0 0)
(0 0 0 0)
(0 0 0 0)
(1 0 0 0)
(2 0 0 0)
(2 1 0 0)
(1 1 0 1)
(1 0 0 1)
(1 0 0 0)
(1 0 0 0)
(0 0 0 0)
(1 0 0 0)
...CTRL+c

代码:这是我的完整测试文件,可编译且可执行,如下所示:

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <random>
using namespace std;
// modify this to modify the number of consumer threads
#define WORKERS_THREADS     4
// max size of each of four queues
#define MAX_QUEUE_SIZE      100
// debug
#define DEFAULTCOLOR        "33[0m"
#define RED                 "33[22;31m"
#define YELLOW              "33[1;33m"
#define GREEN               "33[0;0;32m"
class MultiQueue {
    public:
        void initThreadPool(void);
        void insert(int num);
        void remove(void);
        void insertPriorityQueue(int num);
        int removePriorityQueue(void);
        void printQueues(string what);
        int getQueue1Size(void);
        int getQueue2Size(void);
        int getQueue3Size(void);
        int getQueue4Size(void);
        int getPrioQueueSize(void);
    private:
        vector<thread> workers;
        queue<int>q1;
        queue<int>q2;
        queue<int>q3;
        queue<int>q4;
        priority_queue<int, vector<int>, greater<int>> prioq;
        // mutex for push/pop in priority queue
        mutex priority_queue_mutex;
        // 4 mutexes for each queue
        mutex m1, m2, m3, m4;
        // mutex for printing 4 queues size
        mutex print;
        // mutex for push/pop to priority_queue
        condition_variable prioq_cond;
        // 4 conds for consumer threads
        condition_variable w1, w2, w3, w4;
};
int MultiQueue::getQueue1Size() { return q1.size(); }
int MultiQueue::getQueue2Size() { return q2.size(); }
int MultiQueue::getQueue3Size() { return q3.size(); }
int MultiQueue::getQueue4Size() { return q4.size(); }
int MultiQueue::getPrioQueueSize() { return prioq.size(); }
void MultiQueue::initThreadPool(void) {
    for (int i=0; i<WORKERS_THREADS; i++) {
        workers.push_back(thread(&MultiQueue::remove, this));
        workers[i].detach();
    }
}
void MultiQueue::printQueues(string what) {
    lock_guard<mutex> l(print);
    if (what == "insert")
        cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << 'n' << flush;
    else
        cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << 'n' << flush;
}
// called from producer thread to tell consumer threads 
// what queues to pop() from
void MultiQueue::insertPriorityQueue(int num) {
    lock_guard<mutex> prio(priority_queue_mutex);
    prioq.push(num);
    prioq_cond.notify_one();
}
// called from consumer threads to see what queues 
// have elements to pop() from
int MultiQueue::removePriorityQueue(void) {
    int ret = 0;
    unique_lock<mutex> prio(priority_queue_mutex);
    prioq_cond.wait(prio, [this] () { return getPrioQueueSize() > 0; });
    ret = prioq.top();
    prioq.pop();
    return ret;
}
// producer thread 
void MultiQueue::insert(int num) {
    switch(num) {
        case 1: {
            unique_lock<mutex> locker(m1);
            w1.wait(locker, [this] () { return getQueue1Size() < MAX_QUEUE_SIZE; });
            q1.push(num);
            break;
        }
        case 2: {
            unique_lock<mutex> locker(m2);
            w2.wait(locker, [this] () { return getQueue2Size() < MAX_QUEUE_SIZE; });
            q2.push(num);
            break;
        }
        case 3: {
            unique_lock<mutex> locker(m3);
            w3.wait(locker, [this] () { return getQueue3Size() < MAX_QUEUE_SIZE; });
            q3.push(num);
            break;      
        }
        case 4: {
            unique_lock<mutex> locker(m4);
            w4.wait(locker, [this] () { return getQueue4Size() < MAX_QUEUE_SIZE; });
            q4.push(num);
            break;
        }
        default: {
            cout << "number not 1, 2, 3 nor 4: " << num << 'n' << flush;
            break;
        }
    }
    printQueues("insert");
    insertPriorityQueue(num);
}
void MultiQueue::remove(void) {
    int which_queue = 0;
    while (true) {
        which_queue = removePriorityQueue();
        switch (which_queue) {
            case 1: {
                lock_guard<mutex> lock(m1);
                int ret = q1.front();
                q1.pop();
                printQueues("remove");
                break;
            }
            case 2: {
                lock_guard<mutex> lock(m2);
                int ret = q2.front();
                q2.pop();
                printQueues("remove");
                break;
            }
            case 3: {
                lock_guard<mutex> lock(m3);
                int ret = q3.front();
                q3.pop();
                printQueues("remove");
                break;
            }
            case 4: {
                lock_guard<mutex> lock(m4);
                int ret = q4.front();
                q4.pop();
                printQueues("remove");
                break;
            }
            default: {
                break;
            }
        }
    }
}
int main(void) {
    int random_num = 0;
    MultiQueue mq;
    mq.initThreadPool();
    default_random_engine eng((random_device())());
    uniform_int_distribution<int> idis(1, 4);
    while (true) {
        random_num = idis(eng);
        mq.insert(random_num);
    }
    return 0;
}

我在您的代码中看到以下问题:

  1. 打印不一定反映元素的弹出顺序。一个线程从队列中提取元素,然后可以长时间等待print锁,之后得到元素的另一个线程可以是第一个得到print锁的线程
  2. 优先级队列也存在类似问题。可能会出现这样的情况:第一个线程从优先级队列中获得元素,并知道它应该弹出queue1,然后第一个线程被调度器关闭,第二个线程开始工作。它还弹出优先级队列,然后继续弹出queue2(当第一个线程关闭时)

我会遵循评论中的建议,使用单个priority_queue<std::pair<int,int>>,其中std::pair<int,int>的第一个元素是优先级,第二个元素是有效载荷。这将帮助你处理问题2。对于问题1,您应该在与pop相同的锁下打印内容。