无法找到"断管"错误的原因,同时通过Beast websocket发送连续数据块

Unable to find the reason for "Broken Pipe" error while sending continuous data chunks through Beast websocket

本文关键字:quot websocket Beast 数据 连续 断管 错误      更新时间:2023-10-16

我正在使用IBM Watson语音到文本Web服务API进行流式音频识别。我在C++(std 11)中创建了一个带有boost(beast 1.68.0)库的Web-socket。

我已经成功连接到 IBM 服务器,并希望按以下方式向服务器发送231,296 字节的原始音频数据。

{
"action": "start",
"content-type": "audio/l16;rate=44100"
}
websocket.binary(true);
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 31,296 bytes>
websocket.binary(false);
{
"action": "stop"
}

来自 IBMServer 的预期结果是:

{"results": [
{"alternatives": [
{  "confidence": xxxx, 
"transcript": "call Rohan Chauhan "
}],"final": true
}], "result_index": 0
}

但我没有得到预期的结果:而是错误说"管道坏了">

DataSize is: 50000 | mIsLast is : 0
DataSize is: 50000 | mIsLast is : 0
what : Broken pipe
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 31296 | mIsLast is : 0
what : Operation canceled

这是我的代码,它是对 beast 库中给出的示例示例的改编。

福.hpp

class IbmWebsocketSession: public std::enable_shared_from_this<IbmWebsocketSession> {
protected:
char binarydata[50000];
std::string TextStart;
std::string TextStop;
public:
explicit IbmWebsocketSession(net::io_context& ioc, ssl::context& ctx, SttService* ibmWatsonobj) :
mResolver(ioc), mWebSocket(ioc, ctx) {
TextStart ="{"action":"start","content-type": "audio/l16;rate=44100"}";
TextStop = "{"action":"stop"}";

/**********************************************************************
* Desc  : Send start frame
**********************************************************************/
void send_start(beast::error_code ec);
/**********************************************************************
* Desc  : Send Binary data
**********************************************************************/
void send_binary(beast::error_code ec);
/**********************************************************************
* Desc  : Send Stop frame
**********************************************************************/
void send_stop(beast::error_code ec);
/**********************************************************************
* Desc  : Read the file for binary data to be sent
**********************************************************************/
void readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF);
}

福.cpp

void IbmWebsocketSession::on_ssl_handshake(beast::error_code ec) {
if(ec)
return fail(ec, "connect");
// Perform the websocket handshake
ws_.async_handshake_ex(host, "/speech-to-text/api/v1/recognize", [Token](request_type& reqHead) {reqHead.insert(http::field::authorization,Token);},bind(&IbmWebsocketSession::send_start, shared_from_this(),placeholders::_1));
}
void IbmWebsocketSession::send_start(beast::error_code ec){
if(ec)
return fail(ec, "ssl_handshake");
ws_.async_write(net::buffer(TextStart),
bind(&IbmWebsocketSession::send_binary, shared_from_this(),placeholders::_1));
}
void IbmWebsocketSession::send_binary(beast::error_code ec) {
if(ec)
return fail(ec, "send_start");
readFile(binarydata, &Datasize, &StartPos, &IsLast);
ws_.binary(true);
if (!IsLast) {
ws_.async_write(net::buffer(binarydata, Datasize),
bind(&IbmWebsocketSession::send_binary, shared_from_this(),
placeholders::_1));
} else {
IbmWebsocketSession::on_binarysent(ec);
}
}
void IbmWebsocketSession::on_binarysent(beast::error_code ec) {
if(ec)
return fail(ec, "send_binary");
ws_.binary(false);
ws_.async_write(net::buffer(TextStop),
bind(&IbmWebsocketSession::read_response, shared_from_this(), placeholders::_1));
}
void IbmWebsocketSession::readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF) {
unsigned int end = 0;
unsigned int start = 0;
unsigned int length = 0;
// Creation of ifstream class object to read the file
ifstream infile(filepath, ifstream::binary);
if (infile) {
// Get the size of the file
infile.seekg(0, ios::end);
end = infile.tellg();
infile.seekg(*start_pos, ios::beg);
start = infile.tellg();
length = end - start;
}
if ((size_t) length < 150) {
*Len = (size_t) length;
*ReachedEOF = true;
// cout << "Reached end of File (last 150 bytes)" << endl;
} else if ((size_t) length <= 50000) {  //Maximumbytes to send are 50000
*Len = (size_t) length;
*start_pos += (size_t) length;
*ReachedEOF = false;
infile.read(bdata, length);
} else {
*Len = 50000;
*start_pos += 50000;
*ReachedEOF = false;
infile.read(bdata, 50000);
}
infile.close();
}

这里有什么建议吗?

从boost的文档中,我们有以下关于websocket::async_write的摘录

此函数用于异步写入完整的消息。这 呼叫总是立即返回。异步操作将 继续,直到满足以下条件之一:

  1. 完整的消息已写入。

  2. 发生错误。

因此,当您创建要传递给它的缓冲区对象时net::buffer(TextStart)例如,传递给它的buffer的生存期仅在函数返回之前。可能是即使在函数返回您之后,根据文档,异步写入仍在缓冲区上运行,但由于buffer是局部变量,因此内容不再有效。

为了解决这个问题,你可以将你的TextStart设置为静态的,或者将其声明为类的成员,并将其复制到boost::asio::buffer有很多关于如何做到这一点的例子。注意,我只在IbmWebsocketSession::send_start函数中提到TextStart。在整个代码中,问题几乎相同。

根据 IBM Watson 的 API 定义,Launch a 连接需要某种格式,然后可以表示为字符串。您有字符串,但缺少正确的格式,因此对等方正在关闭连接,并且您正在写入一个关闭的套接字,因此管道损坏。

启动连接需要:

var message = {
action: 'start',
content-type: 'audio/l16;rate=22050'
};

根据您的要求,可以表示为string TextStart = "action: 'start',rncontent-type: 'audio/l16;rate=44100'"

根据聊天中的讨论,OP 通过添加代码解决了该问题:

if (!IsLast ) {
ws_.async_write(net::buffer(binarydata, Datasize),
bind(&IbmWebsocketSession::send_binary, shared_from_this(),
placeholders::_1));
} 
else {
if (mIbmWatsonobj->IsGstFileWriteDone()) { //checks for the file write completion
IbmWebsocketSession::on_binarysent(ec);
} else {
std::this_thread::sleep_for(std::chrono::seconds(1));
IbmWebsocketSession::send_binary(ec);
}
}

讨论源于这样一个事实,即在同一组字节上完成文件写入之前,向客户端发送了更多字节。OP 现在会在尝试发送更多字节之前验证这一点。