c++快速IPC - boost消息队列似乎很慢

c++ Fast IPC - boost message queue seems slow?

本文关键字:队列 消息 快速 IPC boost c++      更新时间:2023-10-16

我有一个问题,我似乎无法解决自己。我有Process1在while循环中计算数据。这个过程必须尽快执行。我需要在Process1中计算的数据以供以后分析,并且写入文件的速度很慢。

我从未使用过IPC,但认为这是一个好办法:把Process1中的数据存放在内存中,再由另一个对时间不敏感的Process2(独立的程序)读取,并由它把数据写入文件。

我已经创建了我的小测试程序(了解IPC),所以:

  1. 即使Process2不可用,Process1也会运行——此时它会跳过IPC,只执行计算。
  2. 当运行Process2时,它将等待Process1 -如果Process1启动,则获取数据,然后再写入磁盘。
  3. Process2每次只获取固定数量的数据(maxRunTime,下面的例子中为10个样本)。

我目前创建的程序非常慢,当通过IPC发送消息时,它慢了6倍。目前,我只在每个"TimeStep"传递三个浮点数,但这可以是100。运行时间可以是10000。

如果有人能给我指出正确的方向,我将不胜感激。下面的代码可以运行,但这可能只是运气,因为它写得并不漂亮。

我需要找到一个解决方案,是尽可能快,但不一定是实时的。因为我不是专业程序员,我也需要在复杂性上妥协,因为我需要理解我在做什么。

希望有人能帮忙。

代码:

  1. 使用Boost.1.59和MSVC 11.0_x86
  2. 两个独立的程序- ConsoleApps

Process1中:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>

#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 

bool InitCreateMsgQ()
{
    bool initOK = false;
    //Create a msgQ for parsing data
    try
    {
        message_queue::remove("msgQData");
        //Create a message_queue.
        message_queue mqData
        (open_or_create     //create q 
        ,"msgQData"         //name
        ,1000000                //max message number
        ,sizeof(float)      //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
//Create State
    try
    {
        message_queue::remove("msgState");
        //Create a message_queue.
        message_queue mqState
        (open_or_create     //create q 
        ,"msgState"     //name
        ,1                  //max message number
        ,sizeof(int)        //max message size
        );
        initOK = true;
    }
    catch(interprocess_exception &ex)
    {
        return false;
    }
    return initOK;
}
bool SetState(int state)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only oepn q
        ,"msgState"  //name
        );
        timeout = !mqState.timed_send(&state, sizeof(int), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(100));
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgState");
        timeout = true;
    }
    return timeout;
}
bool SetData(float data)
{
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only oepn q
        ,"msgQData"  //name
        );
        timeout = !mqData.timed_send(&data, sizeof(float), 0, 
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(1));
        //mqData.send(&data, sizeof(float), 0);
    }
    catch(interprocess_exception &ex)
    {
        message_queue::remove("msgQData");
        timeout = true;
    }
    return timeout;
}
/// Producer test driver: creates the queues, announces its state, then runs
/// 1000 iterations that each try to send three random samples while the
/// state is 1. Any failed send drops the state to 0, after which the
/// remaining iterations only print their counter.
int main ()
{
    const int kDummyState = 2;

    // State 1 = queues created, state 0 = IPC unavailable.
    int state = InitCreateMsgQ() ? 1 : 0;
    if (SetState(state)) { state = 0; }
    // Second send on the state queue: it fails (and forces state 0) unless the
    // first message has been taken out by the observer in the meantime.
    if (SetState(kDummyState)) { state = 0; }

    time_t start, end;
    time(&start);
    for (int runTime = 1; runTime <= 1000; ++runTime)
    {
        if (state == 1)
        {
            cout << "Try SEND DATA" << endl;
            for (int i = 0; i < 3; i++)
            {
                const float x = rand() % 100;
                if (SetData(x)) { state = 0; }
            }
        }
        // state 0 (or anything else): nothing to send this iteration.
        cout << "runTime: " << runTime << " state: " << state << endl;
    }

    message_queue::remove("msgQData");
    message_queue::remove("msgState");
    cout << "done - state: " << state << endl;

    time(&end);
    const double dif = difftime(end, start);
    printf("Elasped time is %.2lf seconds.", dif);
    getchar();
}

Process2:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
#include <iostream>
#include <vector>
#include <windows.h>
#include <string>
#include <ctime>
#include <iostream>
#include <fstream>
#include <map>
#include <stdio.h>
#include <conio.h>
#include <tchar.h>
#include <time.h>

#pragma comment(lib, "user32.lib")
using namespace std;
using namespace boost::interprocess;
using namespace boost::posix_time;
using boost::posix_time::microsec_clock; 
// Debug output stream; main() opens it on "IPCWriteTest.txt", first to truncate
// the file and later to append the received samples.
ofstream debugOut;
int getState()
{
    int state = 0;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqState
        (open_only       //only oepn q
        ,"msgState"  //name
        );
        unsigned int priority;
        message_queue::size_type recvd_size;
        timeout = !mqState.try_receive(&state, sizeof(state), recvd_size, priority);    
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }
    if(timeout){state = 0;}
    return state;
}
float getData()
{
    float Data = -123456;
    bool timeout = true;
    try
    {
        //Open a message queue.
        message_queue mqData
        (open_only       //only oepn q
        ,"msgQData"  //name
        );
        unsigned int priority;
        message_queue::size_type recvd_size;
        //Receive the data
        //mqData.try_receive(&Data, sizeof(Data), recvd_size, priority);
        timeout = !mqData.timed_receive(&Data, sizeof(Data), recvd_size, priority,
                                        ptime(boost::posix_time::microsec_clock::universal_time()) + milliseconds(10));
    }
    catch(interprocess_exception &ex)
    {
        timeout = true;
    }
    if(timeout){Data = -123456;}
    return Data;
}
/// Observer test driver.
///
/// State 0: poll getState() until it reports 1.
/// State 1: fetch maxRunTime time steps of three samples each via getData(),
/// echo them to stdout, append them to "IPCWriteTest.txt", then go back to
/// state 0.
///
/// Fix over the previous version: every sample gets its own slot
/// (runTime * valuesPerStep + i). Before, all three samples of a step were
/// written to DataArray[runTime], so two of them were lost and the file
/// showed the last one three times.
int main ()
{
    const int valuesPerStep = 3;
    int state = 0;
    int maxRunTime = 10;
    float Data;
    // Sized for exactly what one capture holds instead of a large stack array.
    std::vector<float> DataArray(maxRunTime * valuesPerStep);
    // Start every program run with an empty log file.
    debugOut.open("IPCWriteTest.txt", std::ios::trunc);
    debugOut.close();
    while(true)
    {
        switch (state) 
        {
            case 0: 
                // Data not ready: wait for the producer to announce state 1.
                if(getState() == 1)
                {
                    state = 1;
                    cout << "State: 1" <<endl;
                }
                else{state = 0;}
                break;
            case 1:
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    cout << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < valuesPerStep; i++)
                    {
                        Data = getData();
                        cout << Data << "   ";
                        DataArray[runTime * valuesPerStep + i] = Data;
                    }   
                    cout << endl;
                }
                debugOut.open("IPCWriteTest.txt", std::ios::app);
                for (int runTime = 0; runTime < maxRunTime; runTime++)
                {
                    debugOut << "runTime: " << runTime << " Data: ";
                    for (int i = 0; i < valuesPerStep; i++)
                    {
                        debugOut << DataArray[runTime * valuesPerStep + i] << " ";
                    }   
                    debugOut << endl;
                }
                debugOut.close();
                state = 0;
                break;
            default:
                break;
        }
    }
    // Not reached: the loop above has no exit.
    std::cout << "done" << endl;
    getchar();
}

您正在为每个操作打开队列。

您应该尝试打开一次并将引用传递给所有相关代码(通常您将其作为类中的成员存储)。

同样,拥有单独的队列也会导致速度变慢。在我看来,你"滥用"mqState作为interprocess::condition_variable或信号量:

  • http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html interprocess.synchronization_mechanisms.conditions
  • http://www.boost.org/doc/libs/1_59_0/doc/html/interprocess/synchronization_mechanisms.html interprocess.synchronization_mechanisms.semaphores

无论如何,将异常转换为冗长的错误代码的效率并不高。你正在手动地做异常处理应该做的事情。

此外,跟踪调试消息到标准输出将大大降低程序的速度,特别是在Windows上

观察者须知

这里也存在同样的问题;此外,debugOut文件也不应该反复地重新打开。

在三元组中"硬循环"是很奇怪的。如果它是一个队列,每次只弹出一个消息。如果消息"逻辑上"由三个浮点数组成,则发送包含三个浮点数的消息。现在我甚至认为这是一个bug:

            for (int i = 0; i < 3; i++) {
                data = getData();
                std::cout << data << "   ";
                DataArray[runTime] = data;
            }

给同一个索引(runTime)赋三个不同的值…

简化代码

下面是我"审查"(清理)之后的生产者代码:

<strike>Live</strike><sup>1</sup> On Coliru

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;
struct QueueLogic {
    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;
/// Producer benchmark: pre-computes 1,000,000 random samples, pushes them
/// through QueueLogic::SetData and prints the elapsed time in milliseconds.
///
/// On an exception the message is printed, "msgQData" is removed and the
/// exception is rethrown.
int main() {
    const std::size_t sample_count = 10000*100;
    std::vector<float> pre_calculated;
    pre_calculated.reserve(sample_count); // one allocation instead of repeated growth
    std::generate_n(back_inserter(pre_calculated), sample_count, [] { return rand()%100; });
    auto start = Clock::now();
    try {
        QueueLogic instance;
        for (auto v : pre_calculated)
            instance.SetData(v);
    } catch(std::exception const& e) {
        // "\n" restored: the literal had lost its backslash and printed an 'n'.
        std::cout << "Exception thrown: " << e.what() << "\n";
        bip::message_queue::remove("msgQData");
        throw;
    }
    auto end = Clock::now();
    std::cout << boost::chrono::duration_cast<boost::chrono::milliseconds>(end-start) << "\n";
}

消费者代码:

<strike>Live</strike><sup>1</sup> On Coliru

#include <iostream>
#include <fstream>
#include <vector>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
using namespace std;
namespace bip = boost::interprocess;
namespace pt  = boost::posix_time;
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
using Clock = boost::chrono::high_resolution_clock;
struct ObserverLogic {
    bip::message_queue mqData{bip::open_only, "msgQData"};
    float getData() {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
                                  pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(10))) 
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }
};
/// Consumer test driver: receives up to 100000 samples from "msgQData" and
/// prints them, stopping early when a receive times out.
///
/// Fixes over the previous version:
///  - the loop stops at exactly the reserved 100000 samples (it used `<=`,
///    which fetched one more and forced a reallocation);
///  - the queue is opened inside the try block, so a missing queue is
///    reported like any other error instead of terminating the program;
///  - "\n" restored in the output strings (the backslashes had been lost).
int main() {
    const std::size_t kMaxSamples = 100000;
    std::vector<float> DataArray;
    DataArray.reserve(kMaxSamples);
    try {
        ObserverLogic instance;
        while (DataArray.size() < kMaxSamples) {
            DataArray.push_back(instance.getData());
        }
    } catch (std::exception const &e) {
        std::cout << "Exception caught: " << e.what() << "\n";
    }
    std::cout << "Received " << DataArray.size() << " messages\n";
    std::copy(DataArray.begin(), DataArray.end(), std::ostream_iterator<float>(std::cout, "; "));
    std::cout << "\n\ndone" << std::endl;
}

注:

<sup>1</sup> Live —— Coliru上不允许使用共享内存

请在下面找到我更新的代码:使用MSVC14编译。

我现在只剩一个问题:如果在生产者运行期间关闭消费者,生产者就会停止。我不知道这是为什么。

#include <boost/date_time.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <fstream>
#include <algorithm>
#include <iterator>
#include <iostream>
#include <map>
#include <string>
#include <vector>
#include <time.h>
#include <windows.h>
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct QueueLogic 
{
    //DataConfig Setup
    bool forced_removeDataConfig = bip::message_queue::remove("msgDataConfig");
    bip::message_queue mqDataConfig{ bip::open_or_create, "msgDataConfig", 2, sizeof(float) };
    bool SetDataConfig(float data) {
        return !mqDataConfig.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
    //Data Setup
    bool forced_remove = bip::message_queue::remove("msgQData");
    bip::message_queue mqData{ bip::open_or_create, "msgQData", 1000000, sizeof(float) };
    bool SetData(float data) {
        return !mqData.timed_send(&data, sizeof(float), 0, pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(1));
    }
};

/// Producer test driver.
///
/// Sends two config values (number of monitored variables, minimum time
/// step) on the config queue, then 1000 times sends the full set of
/// pre-calculated samples on the data queue, sleeping 20 ms between rounds.
/// On an exception both queues are removed and the exception is rethrown.
///
/// Fixes over the previous version: "\n" restored in the error messages (the
/// backslash had been lost, so a literal 'n' was printed), the sample count
/// passed to generate_n is an integer rather than a float, and the comment
/// on minTimeStep no longer repeats the one for noVarsToMonitor.
int main() 
{
    time_t start, end;
    time(&start);
    float noVarsToMonitor = 10.f;
    float minTimeStep = 1.f;// 0.001f;
    std::vector<float> pre_calculated;
    std::vector<float> data_config;
    // Config layout expected by the consumer: [0] = number of variables,
    // [1] = minimum time step.
    data_config.push_back(noVarsToMonitor);
    data_config.push_back(minTimeStep);
    // One random sample per monitored variable.
    std::generate_n(back_inserter(pre_calculated), static_cast<std::size_t>(noVarsToMonitor),
                    [] { return rand() % 100; });
    // Creates both queues.
    QueueLogic instance;
    // Publish the config values.
    try
    {       
        for (auto v : data_config)
        {
            instance.SetDataConfig(v);
        }
    }
    catch (std::exception const& e)
    {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
    }
    // Publish the samples; simulates code that is called 1000 times after the
    // data has been recalculated.
    for (size_t i = 0; i < 1000; i++)
    {
        try
        {
            for (auto v : pre_calculated)
            {
                instance.SetData(v);
            }
            std::cout << "case: " << i << std::endl;
            Sleep(20); //sleep to test code including consumer
        }
        catch (std::exception const& e)
        {
            std::cout << "Exception thrown: " << e.what() << "\n";
            bip::message_queue::remove("msgQData");
            bip::message_queue::remove("msgDataConfig");
            throw;
        }
    }
    time(&end);
    double dif = difftime(end, start);
    printf("Elasped time is %.2lf seconds.", dif);
    getchar();
}
消费者:

#include <iostream>
#include <fstream>
#include <vector>
#include <windows.h>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/date_time.hpp>
using namespace std;
namespace bip = boost::interprocess;
namespace pt = boost::posix_time;
struct ObserverLogic 
{
    //Get Config Data
    bip::message_queue mqDataConfig{ bip::open_only, "msgDataConfig" };
    float getDataConfig()
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqDataConfig.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }
    //Get Var Data
    bip::message_queue mqData{ bip::open_only, "msgQData" };
    float getData() 
    {
        float data;
        bip::message_queue::size_type recvd_size;
        unsigned int priority;
        if (!mqData.timed_receive(&data, sizeof(data), recvd_size, priority,
            pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(250)))
        {
            throw std::runtime_error("timeout in timed_receive");
        }
        return data;
    }
};
/// Consumer test driver. Runs forever; every round it
///  1. opens the queues (ObserverLogic) and reads the two config values
///     (number of variables, then minimum time step),
///  2. truncates "IPQ_Debug.log" and receives samples until the computed
///     sample count is reached, writing one line per time step,
///  3. sleeps 5 seconds before the next round.
/// Any exception (queue missing, receive timeout) is printed and ends the
/// current round only.
///
/// Fixes over the previous version:
///  - DataArray and the column counter are reset at the start of each
///    capture; before, a completed capture left DataArray full, so every
///    later round truncated the log and then received nothing;
///  - a non-positive minTimeStep is rejected instead of producing an
///    unbounded sample target through 1 / minTimeStep;
///  - "\n" restored in the output strings (the backslashes had been lost).
int main() {
    std::vector<float> DataArray;
    int count = 0; 
    float maxMonitorTime = 10.f;
    DataArray.reserve(100000);
    // Values fetched from the producer each round.
    float noVarsToMonitor = 0.f; 
    float minTimeStep = 0.f;
    float maxSimSamples = 0.f;
    while (true)
    {
        try
        {
            ObserverLogic instance;
            // First config message: number of variables; second: min time step.
            noVarsToMonitor = instance.getDataConfig();
            minTimeStep = instance.getDataConfig();
            if (!(minTimeStep > 0.f))
            {
                throw std::runtime_error("invalid minTimeStep received");
            }
            // Index of the last sample to collect (the loop below uses <=).
            maxSimSamples = (noVarsToMonitor*(maxMonitorTime * floor((1 / minTimeStep) + 0.5)))-1;
            std::cout << "noVarsToMonitor: " << noVarsToMonitor << std::endl;
            std::cout << "minTimeStep: " << minTimeStep << std::endl;
            std::cout << "maxSimSamples: " << maxSimSamples << std::endl;
            std::ofstream ofs("IPQ_Debug.log", std::ios::trunc); //Only clear when data is ready from Producer
            // Fresh capture: forget the samples and column position of the previous round.
            DataArray.clear();
            count = 0;
            try
            {
                while (DataArray.size() <= maxSimSamples)
                {
                    float value = instance.getData();
                    DataArray.push_back(value);
                    ofs << value << "; ";
                    count++;
                    if (count>noVarsToMonitor - 1) // line break after each time step's variables
                    {
                        ofs << std::endl;
                        count = 0;
                    }
                }
                std::cout << "Received " << DataArray.size() << " messages\n";
                std::cout << "\n\ndone" << std::endl;
                std::cout << std::endl;
            }
            catch (std::exception const &e)
            {
                std::cout << "Exception caught: " << e.what() << "\n";
            }
        }
        catch (std::exception const &e)
        {
            std::cout << "Exception caught: " << e.what() << "\n";
        }
        std::cout << "Wait 5 seconds to try fetch again" << "\n";
        Sleep(5000); // wait before looking for the queues again
    }
    // Not reached: the loop above has no exit.
    getchar();
}

输出到txt:

41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 
41; 67; 34; 0; 69; 24; 78; 58; 62; 64; 

然后可以根据"模拟时间"绘制输出,保持数据在正确的列和行中。

它可能仍然不是漂亮的代码,但我仍在学习,我感谢在我的第一篇文章中得到的支持。欢迎留言