使用boost::thread来启动/停止记录数据(第二次更新)
Using boost::thread to start/stop logging data (2nd update)
我目前正试图通过使用boost::thread和复选框记录实时数据。当我选中这个复选框时,日志线程就启动了。当我取消选中时,日志线程停止。当我非常快速地重复检查/取消检查时(程序崩溃,一些文件没有记录,等等),问题就出现了。我怎样才能编写一个可靠的线程安全程序,在重复和快速检查/取消检查时不会发生这些问题?我也不想使用join(),因为这会暂时停止来自主线程的数据输入。在次要线程中,我打开一个日志文件,从套接字读取到缓冲区,将其复制到另一个缓冲区,然后将该缓冲区写入日志文件。我在想,也许我应该使用互斥锁读/写。如果是的话,我应该使用哪些特定的锁?下面是代码片段:
//Main thread
if(m_loggingCheckBox->isChecked()) {
...
if(m_ThreadLogData.InitializeReadThread(socketInfo))//opens the socket.
//If socket is opened and can be read, start thread.
m_ThreadLogData.StartReadThread();
else
std::cout << "Did not initialize threadn";
}
else if(!m_loggingCheckBox->isChecked())
{
m_ThreadLogData.StopReadThread();
}
void ThreadLogData::StartReadThread()
{
//std::cout << "Thread started." << std::endl;
m_stopLogThread = false;
m_threadSendData = boost::thread(&ThreadLogData::LogData,this);
}
void ThreadLogData::StopReadThread()
{
m_stopLogThread = true;
m_ReadDataSocket.close_socket(); // close the socket
if(ofstreamLogFile.is_open())
{
ofstreamLogFile.flush(); //flush the log file before closing it.
ofstreamLogFile.close(); // close the log file
}
m_threadSendData.interrupt(); // interrupt the thread
//m_threadSendData.join(); // join the thread. Commented out since this
temporarily stops data input.
}
//secondary thread
bool ThreadLogData::LogData()
{
unsigned short int buffer[1024];
bool bufferflag;
unsigned int iSizeOfBuffer = 1024;
int iSizeOfBufferRead = 0;
int lTimeout = 5;
if(!ofstreamLogFile.is_open())
{
ofstreamLogFile.open(directory_string().c_str(), ios::out);
if(!ofstreamLogFile.is_open())
{
return 0;
}
}
while(!m_stopLogThread)
{
try {
int ret = m_ReadDataSocket.read_sock(&m_msgBuffer.m_buffer
[0],iSizeOfBuffer,lTimeout,&iSizeOfBufferRead);
memcpy(&buffer[0],m_msgBuffer.m_buffer,iSizeOfBufferRead);
bufferflag = m_Buffer.setBuffer(buffer);
if(!bufferflag) return false;
object = &m_Buffer;
unsigned int data = object->getData();
ofstreamLogFile << data << std::endl;
boost::this_thread::interruption_point();
} catch (boost::thread_interrupted& interruption) {
std::cout << "ThreadLogData::LogData(): Caught Interruption thread." << std::endl;
StopReadThread();
} catch (...) {
std::cout << "ThreadLogData::LogData(): Caught Something." << std::endl;
StopReadThread();
}
} // end while()
}
我喜欢使用Boost Asio异步的东西
#include <iostream>
#include <fstream>
#include <boost/asio.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <thread>
using boost::asio::ip::tcp;
namespace asio = boost::asio;
struct program
{
asio::io_service _ioservice;
asio::deadline_timer _timer;
asio::signal_set _signals;
std::array<char, 1024> _buffer;
tcp::socket _client;
tcp::resolver _resolver;
std::ofstream _logfile;
std::thread _thread;
program()
: _timer(_ioservice),
_signals(_ioservice),
_client(_ioservice),
_resolver(_ioservice)
{
do_connect(_resolver.resolve({ "localhost", "6767" }));
do_toggle_logging_cycle();
_signals.add(SIGINT);
_signals.async_wait([this](boost::system::error_code ec, int) { if (!ec) close(); });
_thread = std::thread(boost::bind(&asio::io_service::run, boost::ref(_ioservice)));
}
~program()
{
if (_thread.joinable())
_thread.join();
}
void close() {
_ioservice.post([this]() {
_signals.cancel();
_timer.cancel();
_client.close();
});
}
private:
void do_toggle_logging_cycle(boost::system::error_code ec = {})
{
if (ec != boost::asio::error::operation_aborted)
{
if (_logfile.is_open())
{
_logfile.close();
_logfile.clear();
} else
{
_logfile.open("/tmp/output.log");
}
_timer.expires_from_now(boost::posix_time::seconds(2));
_timer.async_wait(boost::bind(&program::do_toggle_logging_cycle, this, boost::asio::placeholders::error()));
} else
{
std::cerr << "nDone, goobyen";
}
}
void do_connect(tcp::resolver::iterator endpoint_iterator) {
boost::asio::async_connect(
_client, endpoint_iterator,
[this](boost::system::error_code ec, tcp::resolver::iterator) {
if (!ec) do_read();
else close();
});
}
void do_read() {
boost::asio::async_read(
_client, asio::buffer(_buffer.data(), _buffer.size()),
[this](boost::system::error_code ec, std::size_t length) {
if (!ec) {
if (_logfile.is_open())
{
_logfile.write(_buffer.data(), length);
}
do_read();
} else {
close();
}
});
}
};
int main()
{
{
program p; // does socket reading and (optional) logging on a separate thread
std::cout << "nMain thread going to sleep for 15 seconds...n";
std::this_thread::sleep_for(std::chrono::seconds(15));
p.close(); // if the user doesn't press ^C, let's take the initiative
std::cout << "nDestruction of program...n";
}
std::cout << "nMain thread endsn";
};
程序连接到本地主机的6767端口,并从该端口异步读取数据。
如果日志记录是活动的(_logfile.is_open()
),所有接收到的数据都被写入/tmp/output.log
。
- 读/写是在一个单独的线程上,但是所有的操作都是用
_ioservice
序列化的(参见close()
中的post
) - 用户可以使用Ctrl+C终止套接字读取循环
- 每2秒,日志记录将被(取消)激活(见
do_toggle_logging_cycle
)
主线程在取消程序之前只休眠15秒(类似于用户按Ctrl-C)。
相关文章:
- 优化使用 C++ 查询 SQLite DB 中超过 5000 万条数据记录的方式
- C++ 仅读取输入文件中的部分(不是全部)数据以添加到记录中
- 我想使用 C++ 从 excel 获取记录,并希望使用特定的列数据(例如书籍 ID)控制输出
- Qt:如何使用sqlite数据库修改2个表中的数据记录
- 我可以在视频游戏进入 GPU 之前记录图形数据吗?
- spdlog 记录单个wchar_t字符串,其中包含包含 {..} 的数据
- 尝试将数据从记录读取到程序(C++)时难以捉摸的错误
- C++类似cout的函数,将数据写入文件(日志记录)
- 将记录推送到数据结构中的堆栈
- 运行此程序时的垃圾数据(学生记录程序)
- 试图找到一种在OpenCV / BOOST中记录图形数据的方法
- C++为数据记录定义宏
- 一个简单的PC应用程序制造商的数据记录应用程序
- 如何有效地运行后台处理和记录数据(Debian,Beaglebone Black)
- 为什么我的 read 语句不记录二进制数据
- 记录极少量数据的最有效方法是什么
- TCP 数据记录超时对我不起作用
- 加速数据记录代码
- Valgrind是否有像Purify/Quantify这样的API,可以让您禁用数据记录
- 数据记录和提取软件,c++