ZMQ: socket_send/recv blocking
ZMQ: socket_send/recv blocking
所以我试图在Python中的ZMQ和C/C++扩展中的ZMQ之间建立一些简单的通信。Python设置上下文,绑定inproc套接字,并将上下文和套接字名称传递给扩展。扩展设置自己的套接字,连接并侦听消息。然后,Python向扩展发送一个标头,然后发送字典的字符串表示。使用REQ/REP套接字的非常简单的东西。然而,由于某种原因,我似乎找不到,对socket.send的调用被阻塞,并且扩展从未通过对zmq_recv的调用。我有一个测试环境,在那里我有几乎完全相同的场景,但套接字不会阻塞,我已经对代码进行了三次检查,它应该以相同的方式工作。
PYTHON:
import zmq
import cppextension
# No lectures about using threading please. I'm restricted to, in essence
# using this function because of the code base I'm working with.
from thread import start_new_thread
socket = self.zmq_context.socket(zmq.REQ)
socket_name = "inproc://agl"
socket.bind(socket_name)
t = start_native_thread(cppextension.actor,
(self.zmq_context, socket_name))
test_send = {"foo": 1, "bar": 2}
# BLOCKS ON THIS LINE VVVVV
socket.send("TEST", flags=zmq.SNDMORE)
socket.send(str(test_send))
socket.recv()
socket.send("STOP")
C/C++:
// Originally these used std::basic_string<Py_UNICODE> but I reverted
// back to normal std::string so I can use a JSON parsing library.
typedef string pystring;
typedef char pystring_t;
extern "C" PyObject *
actor(PyObject *self, PyObject *args) {
PyObject *py_context, *py_connect_to;
PyThreadState *_save;
void *context;
char *connect_to;
void *socket;
int rc;
if(!PyArg_ParseTuple(args, "OO", &py_context, &py_connect_to)) {
PyErr_SetString(PyExc_TypeError, "Expected two arguments (ZMQ context, name of socket to connect to)");
return NULL;
}
py_context = PyObject_GetAttrString(py_context, "_handle");
if(py_context == NULL) {
PyErr_SetString(PyExc_TypeError, "Could not get '_handle' from context");
return NULL;
}
if(!PyInt_Check(py_context)) {
PyErr_SetString(PyExc_TypeError, "_handle was not an integer");
return NULL;
}
context = (void*)PyInt_AsLong(py_context);
connect_to = new char[PyString_Size(py_connect_to) + 1];
strcpy(connect_to, PyString_AsString(py_connect_to));
_save = PyEval_SaveThread();
//
// GIL-less operation BEGIN
// ** WARNING: Do NOT call any functions that begin with 'Py', or touch any
// data structures that begin with 'Py' while in this section. It *WILL*
// blow up the Python interpreter.
//
socket = zmq_socket(context, ZMQ_REP);
rc = zmq_connect(socket, connect_to);
pystring TEST("TEST");
pystring STOP("STOP");
pystring SUCCESS("SUCCESS");
pystring FAILURE("FAILURE");
if(rc == 0) {
int going = 1;
// Should be able to hold a full megabyte of text, which should be enough
// for any message being passed in.
// Is there a way to query size of the incoming message...?
char buffer[1000000];
while(going) {
// BLOCKS ON THIS LINE VVVVVV
int size = zmq_recv(socket, buffer, 1000000, 0);
if(size == -1) {
// ERROR
continue;
}
// Assume we don't get larger than 1MB of data. Should put a
// check around this at some point, but not right now.
buffer[size] = 0;
pystring fullmsg(buffer);
cout << "ZMQ RECIEVED: " << fullmsg << endl;
if(fullmsg == TEST) {
size = zmq_recv(socket, &buffer, 1000000, 0);
if(size != -1) {
buffer[size] = 0;
pystring json_fullmsg(buffer);
cout << "ZMQ JSON: " << json_fullmsg << endl;
contacts.add(json_fullmsg);
zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
}
else {
zmq_send(socket, FAILURE.c_str(), FAILURE.size() + 1, 0);
}
}
else if(fullmsg == STOP) {
going = 0;
zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
}
}
}
else {
// ERROR
int err = zmq_errno();
switch(err) {
case EINVAL:
cout << "ZMQ CONNECT ERR: " << "Endpoint supplied is invalid" << endl;
break;
default:
cout << "ZMQ CONNECT ERR: " << err << endl;
break;
}
}
zmq_close(socket);
//
// GIL-less operation END
//
PyEval_RestoreThread(_save);
Py_INCREF(Py_None);
return Py_None;
}
我们非常感谢您对我们所做的一切的帮助。
编辑:还要注意,此代码将在gevent已对标准库进行猴痘修补的环境中运行。这也是我使用thread.start_new_thread的部分原因,因为它是在monkeypatching发生之前保存的,我想要一个真正的线程,而不是绿色线程。
两件事,
因为您在修改后的版本中使用req/rep,所以"send,send,recv,send…"不起作用。发送/接收必须以"锁定步骤"方式工作(发送、接收、发送、修订)
ZMQ_NOBLOCK将引发EAGAIN的异常,这可能意味着"套接字连接尚未完成,请稍后再来。"尝试在绑定后放置计时器/睡眠发送/接收。这就是导致"资源暂时不可用"消息的原因。
希望这能帮助
mr。onoffon
不确定这会有帮助,但这里是:
- 你确定
send()
阻止了吗,这不仅仅是一个永远得不到回复的问题?您可以用ZMQ_NOBLOCK
调用send()
,并查看是否引发异常。如果是,那么实际上send()
未能将消息排入队列 - 您是否考虑过
PAIR
套接字而不是REQ/REP
?本指南建议线程之间的inproc
通信使用此方法
相关文章:
- 通过套接字[TCP]传输数据 如何在C / C ++中打包多个整数并使用send() recv()传输数据
- WinSock Non-Blocking I/O
- Linux 服务器中的 Recv 缓冲区大小
- 在 for 循环中处理复杂的发送 recv 消息
- C++ TCP 套接字通信 - 连接按预期工作,几秒钟后失败,没有收到新数据,read() 和 recv() 块
- 在 Linux 中关闭 UDP 套接字后,recv 不会返回
- 为什么套接字中的 recv() 函数不返回任何内容?
- C++网络,recv() 无缘无故失败 (?)
- 设置SO_RCVTIMEO时,为什么recv返回-1并且errno=EINTR
- TCP套接字(客户端-服务器)recv()返回-1值
- 在UNIX中通过recv/send交换数据时,如何正确使用缓冲区
- C++中TCP套接字的recv函数
- 仅通过建立一次TCP连接将Recv从客户端发送到服务器套接字
- 为什么 sys 套接字 recv 函数不填充数据但返回字节长度?
- C++ 即使数据可用,Windows recv() 也不会返回
- c++ hooking ws2_32.dll recv
- Epoll zero recv() and negative(EAGAIN) send()
- Linux 套接字 recv 返回 -1,errno 110 连接超时
- ZeroMq recv not blocking
- ZMQ: socket_send/recv blocking