使用boost::thread来启动/停止记录数据(第二次更新)

Using boost::thread to start/stop logging data (2nd update)

本文关键字:数据 记录 更新 第二次 thread boost 启动 使用      更新时间:2023-10-16

我目前正试图通过使用boost::thread和复选框记录实时数据。当我选中这个复选框时,日志线程就启动了。当我取消选中时,日志线程停止。当我非常快速地重复检查/取消检查时(程序崩溃,一些文件没有记录,等等),问题就出现了。我怎样才能编写一个可靠的线程安全程序,在重复和快速检查/取消检查时不会发生这些问题?我也不想使用join(),因为这会暂时停止来自主线程的数据输入。在次要线程中,我打开一个日志文件,从套接字读取到缓冲区,将其复制到另一个缓冲区,然后将该缓冲区写入日志文件。我在想,也许我应该使用互斥锁读/写。如果是的话,我应该使用哪些特定的锁?下面是代码片段:

//Main thread
 if(m_loggingCheckBox->isChecked()) {
...
if(m_ThreadLogData.InitializeReadThread(socketInfo))//opens the socket. 
//If socket is opened and can be read, start thread.
 m_ThreadLogData.StartReadThread();
 else
 std::cout << "Did not initialize threadn";
 }
 else if(!m_loggingCheckBox->isChecked())
 {
m_ThreadLogData.StopReadThread();
}
void ThreadLogData::StartReadThread()
 {
 //std::cout << "Thread started." << std::endl;
 m_stopLogThread = false;
 m_threadSendData = boost::thread(&ThreadLogData::LogData,this);
 }
void ThreadLogData::StopReadThread()
 {
 m_stopLogThread = true;
 m_ReadDataSocket.close_socket(); // close the socket
if(ofstreamLogFile.is_open())
 {
 ofstreamLogFile.flush(); //flush the log file before closing it.
 ofstreamLogFile.close(); // close the log file
 }
 m_threadSendData.interrupt(); // interrupt the thread
 //m_threadSendData.join(); // join the thread. Commented out since this
 temporarily stops data input.
}
//secondary thread
 bool ThreadLogData::LogData()
 {
unsigned short int buffer[1024];
 bool bufferflag;
 unsigned int iSizeOfBuffer = 1024;
 int iSizeOfBufferRead = 0;
 int lTimeout = 5;
if(!ofstreamLogFile.is_open())
 {
 ofstreamLogFile.open(directory_string().c_str(), ios::out);
if(!ofstreamLogFile.is_open())
 {
 return 0;
 }
 }
while(!m_stopLogThread)
 {
 try {
 int ret = m_ReadDataSocket.read_sock(&m_msgBuffer.m_buffer
 [0],iSizeOfBuffer,lTimeout,&iSizeOfBufferRead);
memcpy(&buffer[0],m_msgBuffer.m_buffer,iSizeOfBufferRead);
 bufferflag = m_Buffer.setBuffer(buffer);
 if(!bufferflag) return false;
 object = &m_Buffer;
unsigned int data = object->getData();
ofstreamLogFile << data << std::endl;
boost::this_thread::interruption_point();
} catch (boost::thread_interrupted& interruption) {
 std::cout << "ThreadLogData::LogData(): Caught Interruption thread." << std::endl;
 StopReadThread();
 } catch (...) {
 std::cout << "ThreadLogData::LogData(): Caught Something." << std::endl;
 StopReadThread();
 }
} // end while()

}

我喜欢使用Boost Asio异步的东西

#include <iostream>
#include <fstream>
#include <boost/asio.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <thread>
using boost::asio::ip::tcp;
namespace asio = boost::asio;
struct program
{
    asio::io_service       _ioservice;
    asio::deadline_timer   _timer;
    asio::signal_set       _signals;
    std::array<char, 1024> _buffer;
    tcp::socket            _client;
    tcp::resolver          _resolver;
    std::ofstream          _logfile;
    std::thread            _thread;
    program() 
        : _timer(_ioservice),
        _signals(_ioservice),
        _client(_ioservice),
        _resolver(_ioservice)
    {
        do_connect(_resolver.resolve({ "localhost", "6767" }));
        do_toggle_logging_cycle();
        _signals.add(SIGINT);
        _signals.async_wait([this](boost::system::error_code ec, int) { if (!ec) close(); });
        _thread = std::thread(boost::bind(&asio::io_service::run, boost::ref(_ioservice)));
    }
    ~program()
    {
        if (_thread.joinable())
            _thread.join();
    }
    void close() {
        _ioservice.post([this]() { 
            _signals.cancel();
            _timer.cancel();
            _client.close(); 
        });
    }
private:
  void do_toggle_logging_cycle(boost::system::error_code ec = {})
  {
      if (ec != boost::asio::error::operation_aborted)
      {
          if (_logfile.is_open())
          {
              _logfile.close();
              _logfile.clear();
          } else
          {
              _logfile.open("/tmp/output.log");
          }
          _timer.expires_from_now(boost::posix_time::seconds(2));
          _timer.async_wait(boost::bind(&program::do_toggle_logging_cycle, this, boost::asio::placeholders::error()));
      } else 
      {
          std::cerr << "nDone, goobyen";
      }
  }
  void do_connect(tcp::resolver::iterator endpoint_iterator) {
      boost::asio::async_connect(
          _client, endpoint_iterator,
          [this](boost::system::error_code ec, tcp::resolver::iterator) {
                if (!ec) do_read();
                else     close();
          });
  }
  void do_read() {
    boost::asio::async_read(
        _client, asio::buffer(_buffer.data(), _buffer.size()),
        [this](boost::system::error_code ec, std::size_t length) {
            if (!ec) {
              if (_logfile.is_open())
              {
                    _logfile.write(_buffer.data(), length);
              }
              do_read();
            } else {
                close();
            }
    });
  }
};
int main()
{
    {
        program p; // does socket reading and (optional) logging on a separate thread
        std::cout << "nMain thread going to sleep for 15 seconds...n";
        std::this_thread::sleep_for(std::chrono::seconds(15));
        p.close(); // if the user doesn't press ^C, let's take the initiative
        std::cout << "nDestruction of program...n";
    }
    std::cout << "nMain thread endsn";
};

程序连接到本地主机的6767端口,并从该端口异步读取数据。

如果日志记录是活动的(_logfile.is_open()),所有接收到的数据都被写入/tmp/output.log

现在

  • 读/写是在一个单独的线程上,但是所有的操作都是用_ioservice序列化的(参见close()中的post)
  • 用户可以使用Ctrl+C终止套接字读取循环
  • 每2秒,日志记录将被(取消)激活(见do_toggle_logging_cycle)

主线程在取消程序之前只休眠15秒(类似于用户按Ctrl-C)。