C++信号量混淆

C++ Semaphore Confusion?

本文关键字:信号量 C++      更新时间:2023-10-16

所以,我正在编写一种示波器式的程序,该程序读取计算机上的串行端口,并对这些数据执行fft以将其转换为频谱。不过,我的程序布局遇到了一个问题,它被分解为一个SerialHandler类(利用boost::Asio)、一个FFTHandler类和一个main函数。SerialHandler类使用 boost::Asio`` async_read_some 函数从端口读取并引发一个名为 HandleOnPortReceive 的事件,该事件随后读取数据本身。

问题是我找不到一种方法将数据从事件处理程序(由另一个线程上的 io_service 对象引发)传递到另一个线程上的 FFTHandler 类。我被建议使用信号量来解决我的问题,但我对 semaphore.h 的使用几乎一无所知,所以我的实现现在相当破碎,并且没有做太多它应该做的事情。

这里有一些代码,如果这让它更清楚一点:

using namespace Foo;
//main function      
int main(void){
     SerialHandler serialHandler;
     FFTHandler fftHandler;
     sem_t *qSem_ptr = &qSem;
     sem_init(qSem_ptr, 1, 0);
     //create separate threads for both the io_service and the AppendIn so that neither will block the user input statement following
     serialHandler.StartConnection(tempInt, tempString); //these args are defined, but for brevity's sake, I ommitted the declaration
     t2= new boost::thread(boost::bind(&FFTHandler::AppendIn, &fftHandler, q, qSem));
    //allow the user to stop the program and avoid the problem of an infinite loop blocking the program
    char inChar = getchar();
    if (inChar) {...some logic to stop reading}
}


namespace Foo{
    boost::thread *t1;
    boost::thread *t2;
    sem_t qSem;
    std::queue<double> q;
    boost::mutex mutex_;
    class SerialHandler{
    private:
        char *rawBuffer; //array to hold incoming data
        boost::asio::io_service ioService;
        boost::asio::serial_port_ptr serialPort;
    public:
            void SerialHandler::StartConnection(int _baudRate, string _comPort){
                //some functionality to open the port that is irrelevant to the question goes here
                AsyncReadSome();  //starts the read loop
                //create thread for io_service object and let function go out of scope
                t1 = new boost::thread(boost::bind(&boost::asio::io_service::run, &ioService)); 
            }
            void SerialHandler::AsyncReadSome(){
                //there's some other stuff here for error_catching, but this is the only important part
                serialPort->async_read_some (
                            boost::asio::buffer(rawBuffer, SERIAL_PORT_READ_BUF_SIZE),
                            boost::bind(
                                    &SerialHandler::HandlePortOnReceive,
                                    this, boost::asio::placeholders::error,
                                    boost::asio::placeholders::bytes_transferred, q));
           }
           void SerialHandler::HandlePortOnReceive(const boost::system::error_code& error, size_t bytes_transferred, std::queue<double>& q){
                boost::mutex::scoped_lock lock(mutex_);
                 //more error checking goes here, but I've made sure they aren't returning and are not the issue
                 for (unsigned int i =0; i<bytes_transferred; i++){
                    unsigned char c = rawBuffer[i];
                    double d = (double) c;  //loop through buffer and read
                    if (c==endOfLineChar){
                    } else  //if not delimiting char, push into queue and post semaphore
                    {
                            q.push(d);
                            //cout << d  << endl;
                            sem_post(&qSem);
                            cout << q.front() << endl;
                            cout << "size is: " << q.size() << endl;
                    }
                }
                //loop back on itself and start the next read
                AsyncReadSome();
            }
    }
    class FFTHandler{
    private:
        double *in; //array to hold inputs
        fftw_complex *out; //holds outputs
        int currentIndex;
        bool filled;
        const int N;
    public:

        void AppendIn(std::queue<double> &q, sem_t &qSem){
                while(1){  //this is supposed to stop thread from exiting and going out of scope...it doesn't do that at all effectively...
                    cout << "test" << endl;
                    sem_wait(&_qSem); //wait for data...this is blocking but I don't know why
                    double d = _q.front();
                    _q.pop();
                    in[currentIndex]=d; //read queue, pop, then append in array
                    currentIndex++;
                    if (currentIndex == N){ //run FFT if full and reset index
                            currentIndex = N-overlap-1;
                            filled = true;
                            RunFFT();
                    }
                }
         }
    }
}

FFTHandler::AppendIn(..)中的调试行确实在触发,因此正在创建线程,但它似乎立即超出了范围并破坏了线程,因为似乎我已经设置了 while 以错误地响应信号量。

TLDR:这是一个很长的解释,简单地说,"我不理解信号量,但需要以某种方式实现它们。我试过了,失败了,所以现在我来这里希望从比我知识渊博的人那里得到有关此代码的帮助。

更新:因此,在尝试了一些调试语句之后,似乎问题在于while(1){...}语句确实在触发,但是,sem_wait(&_qSem);导致它阻塞。无论出于何种原因,它都会无限期地等待,尽管正在发布信号量,但它会继续等待并且永远不会超出该行。

由于您已经在使用 boost::mutex 及其作用域锁类型,因此我建议您使用 boost::condition_variable 而不是 POSIX 信号量。否则,您将C++11 样式同步与 POSIX 同步混合在一起。

添加到队列时锁定互

斥锁,但我没有看到任何锁定互斥锁以从队列中读取的内容。看起来您正在循环回以调用AsyncReadSome,而互斥锁仍处于锁定状态。

选择一种同步形式,然后正确使用它。

信号量的初始值为 0,对于这种情况有效。因此,它需要一个sem_post才能解除FFTHandler::AppendIn()的阻止。但是我没有看到第一次调用SerialHandler::AsyncReadSome()的代码来读取串行端口并将推送到队列中。如果你修复了这部分代码,我认为sem_post会发生,FFTHandler 线程会运行。作为第一步,您可以在sem_wait之后调试打印一个,然后在 AsyncReadSome() 函数中打印一个,我的猜测是两者都不会被执行。

因此,从本质上讲,您希望确保"读取"被启动并保持活动状态,作为主线程或不同线程的一部分。