从C 中的多个线程调用Python函数

Calling a Python function from multiple threads in C++

本文关键字:线程 调用 Python 函数      更新时间:2023-10-16

找到了几个类似的线程,但没有任何帮助。

基本上,我有一个C 应用程序,它想从Python脚本调用函数。这一切都很好。

但是,因为我需要实时工作,而python功能需要一些时间,所以我想添加多线程。

基本上,这两种情况之一:

  1. c 中的线程池,每个线程都会调用python功能。

  1. python中的螺纹池,C 应用程序正在将任务添加到Python-Quele中。

我有点喜欢第一个选项,所以让我们继续前进。基本上,我有一个带有功能的Python脚本(实际上,有一个TensorFlow预测):

import time
def pfoo(msg):
  print("Python >> Function called)
  time.sleep(2)
  print("Python << Function finished)

和c threadpool(主要是从这里取):

ThreadPool pool(4);
// initialize the Python
Py_Initialize();
// initialize thread support
PyEval_InitThreads();
// ...
PyObject* m_PyModule = PyImport_ImportModule( "test" );
PyObject* m_PyDict = PyModule_GetDict( m_PyModule );
PyObject* m_PyFoo = PyDict_GetItemString( m_PyDict, "pfoo" );
for ( int i = 0; i < 10; i++ ) { 
    pool.enqueue( [&] { 
      PyEval_CallObject( m_PyFoo, Py_BuildValue( "(s)", arg ) );
    } );
}

您可以想象,什么也没发生,因为您不能在仍在运行时调用相同的功能。我尝试了Py_BEGIN_ALLOW_THREADS宏,我尝试了PyGILState_Ensure()的事情,并且此更为复杂。我没有想法。

a尝试了第二种情况,其中我在Python中有一个无限的循环线程,该线程从queue.Queue()读取任务并将其放入ThreadPoolExecutor中,并且C 应用程序调用一个函数以将任务添加到上述列表中。也不适合我(如果我只是在Python中运行它,但是如果嵌入到C 中,则无法使用)。

我想我可能找到了一个可通过的解决方案。使用合成示例,但需要通过TensorFlow预测进行实时测试:

python脚本仍然使用螺纹池,但没有无限的线索:

from concurrent.futures import ThreadPoolExecutor
import threading
import time
executor = ThreadPoolExecutor(max_workers=4)
lock = threading.Lock()
counter = 0
def worker(msg, n):
  global counter
  with lock:
    counter += 1
  print("Python >> Function called with (%s, %d)" % (msg, n))
  for i in range(n): # do some work...
    print("tPython :: %s: %d" % (msg, i))
    time.sleep(1)
  print("Python << Finished function for (%s, %d)" % (msg, n))
  with lock:
    counter -= 1
    if counter < 0:
      counter = 0

def add(msg, n):
  global executor, counter
  if counter < 4:
    print("++%d threads are free, adding task: %s" % (4 - counter, msg))
    executor.submit(worker, msg, n)

和C 应用只需在必要时调用"添加"函数:

std::map<std::string, PyObject*> m_PyFunctions;
m_PyFunctions["add"] = PyDict_GetItemString( m_PyDict, "add" );
for ( int i = 0; i < 100; i++ ) 
{
    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
    PyEval_CallObject( m_PyFunctions["add"], 
                       Py_BuildValue( "(s, l)", "task_#" + std::to_string( i ), std::rand() % 5 ) );
}

编辑:是的,用我的主要应用程序对此进行了测试,它可以正常工作。一个粗糙的解决方案,但仍然可以通过。