LuaPlus:如何从多线程C++中调用Lua函数

LuaPlus: How to call Lua function from multithreaded C++?

本文关键字:调用 Lua 函数 C++ 多线程 LuaPlus      更新时间:2023-10-16

我的Lua脚本中有一种回调函数,我想从C++端的不同线程调用它(每秒0-100次)。到目前为止,它基本上可以工作,但一旦我在很短的时间内多次调用它,它就会崩溃程序,导致错误,如:
-As????ion failed: 0, file ...LuaFunction.h, line 146或这个(完全随机的)

我认为,当上一次调用尚未完成、C++端又再次调用它时,就会发生这种情况。对我来说最容易想到的办法(在调用Lua函数期间用互斥锁锁住所有线程)完全没有帮助。
如果我每2秒只调用一次Lua函数,那么我根本不会得到任何错误(不过,一旦程序运行到清理部分,它仍会崩溃,而且没有任何具体的错误信息)。

这是我的代码(我试图尽可能地裁剪和简化我的代码,并添加了很多注释):

#include "stdafx.hpp"
#include <pthread.h> //for multithreading
#include <windows.h>
#include <iostream>
#include <map>
using namespace std;
//Upper bound on the number of thread slots; AddThread() compares threads.size() against it.
unsigned int maxThreads = 100;
//pthread handle of every slot, keyed by slot id.
map<unsigned int, pthread_t> threads;
//Per-slot flag: AddThread() sets it to true when it starts a thread in the slot,
//ThreadFunction() sets it back to false when that thread is done.
map<unsigned int, bool> threadsState;
pthread_mutex_t mutex; //held by ThreadFunction() around its Lua call (keeps printing from overlapping etc); initialized and destroyed in main()
LuaPlus::LuaState* pState = LuaPlus::LuaState::Create( true ); //the one LuaPlus state of the program, created at file scope
LuaPlus::LuaObject globals = pState->GetGlobals(); //globals object of pState; main() registers AddThread on it
struct argumentStruct { //bundles several values into the single void* argument that a pthread start routine receives
    unsigned int threadId; //id of the slot the thread runs in (key into threads / threadsState / argumentMap)
    int a; //first operand handed to the worker
    int b; //second operand handed to the worker
};
map<unsigned int, struct argumentStruct> argumentMap; //arguments of each thread slot, keyed by slot id; AddThread() passes the address of an entry to pthread_create()
void *ThreadFunction(void *arguments) { //will be called for every pthread we're going to create
    struct argumentStruct*args = (struct argumentStruct*)arguments; //get the arrgument struct
    int threadId = args->threadId; //get variables for each struct field
    int a = args->a;
    int b = args->b;
    Sleep(3000); //since this is a very simplified version of my actual project
    int c = a+b;
    pthread_mutex_lock(&mutex); //lock pthreads for the next lines
      LuaPlus::LuaFunction<int> CPP_OnMyEvent = pState->GetGlobal("LUA_OnMyEvent"); //get the Lua callback function to call on the C++ side
      CPP_OnMyEvent(a,b,c); //call to our lua-callback function
    pthread_mutex_unlock(&mutex); //unlock pthreads
    threadsState[threadId] = false; //mark the thread as finished/ready to get overwritten by a new one
    return NULL;
}
bool AddThread(int a, int b) {
    for (;;) {
        if (threads.size() < maxThreads) { //if our array of threads isn't full yet, create a new thread
            int id = threads.size();
            argumentMap[id].threadId = threads.size();
            argumentMap[id].a = a;
            argumentMap[id].b = b;
            threadsState[id] = true; //mark the thread as existing/running
            pthread_create(&threads[id], NULL, &ThreadFunction, (void *)&argumentMap[id]);
            return true;
        } else {
            unsigned int id;
            for (auto thread=threads.begin(); thread!=threads.end(); ++thread) {
                id = thread->first;
                if(!threadsState[id]) { //if thread with id "id" has finished, create a new thread on it's pthread_t
                    argumentMap[id].threadId = id;
                    argumentMap[id].a = a;
                    argumentMap[id].b = b;
                    threadsState[id] = true; //mark the thread as existing/running
                    pthread_join(threads[id], NULL);
                    pthread_create(&threads[id], NULL, &ThreadFunction, (void *)&argumentMap[id]);
                    return true;
                }
            }
        }
    }
    return false;
}

/**
 * Program entry point: exposes AddThread() to Lua, runs main.lua from the
 * current directory, waits for all worker threads and tears everything down.
 *
 * @return always 0
 */
int main() {
    pthread_mutex_init(&mutex, NULL); //initialize the mutex
    //pState and globals are already created at file scope; LUA_OnMyEvent is defined in main.lua
    globals.RegisterDirect("AddThread", AddThread);
    char pPath[ MAX_PATH ];
    GetCurrentDirectory(MAX_PATH,pPath);
    //The backslash has to be escaped: "\m" is not a valid escape sequence, so the
    //path separator between the directory and the file name was lost.
    strcat_s(pPath,MAX_PATH,"\\main.lua");
    if( pState->DoFile(pPath) ) { //run our main.lua script; a non-zero result is treated as failure
        if( pState->GetTop() == 1 )
            std::cout << "An error occured: " << pState->CheckString(1) << std::endl;
    }
    //Wait for the threads to finish. Every entry of threads holds a thread that has not
    //been joined yet (AddThread() joins the old one when it reuses a slot), so join them
    //all: joining a finished thread returns immediately and releases its resources, and
    //no unsynchronized read of threadsState is needed.
    for (auto thread=threads.begin(); thread!=threads.end(); ++thread)
        pthread_join(thread->second, NULL);
    //clean up
    LuaPlus::LuaState::Destroy( pState );
    pState = nullptr;
    pthread_mutex_destroy(&mutex);
    getchar(); //keep console from closing
    return 0;
}

main.lua

-- Callback for the C++ side: prints the two operands and their sum as "a+b=c".
function LUA_OnMyEvent(a, b, c)
    local line = a .. "+" .. b .. "=" .. c
    print(line)
end

-- Request 999 worker threads, one per value of i, with the operands i and 2*i.
local i = 1
while i <= 999 do
    AddThread(i, i * 2)
    i = i + 1
end

我对Lua的了解不足以在Lua方面为您提供解决方案,但下面对这个问题的分析也许能帮您自己找到解决办法。

当你从Lua调用AddThread()时,会发生这样的事情:

1. LuaState allocations
2. AddThread() execution
3. LuaState unwinding

在ThreadFunction()上时。。。

A. Mutex lock
B. LuaState allocations
C. LUA_OnMyEvent() execution
D. LuaState unwinding 
E. Mutex Unlock

AddThread没有互斥控制,因此竞争条件可能发生在步骤1/3与步骤B/D之间。但是,在AddThread内部加上互斥锁并不能解决问题:这把锁只能保护步骤2,它本身仍然运行在步骤1和步骤3之间,而步骤1和步骤3(LuaState的分配与展开)发生在AddThread函数体之外,依然可能与B/D同时执行。

如果AddThread()只在程序初始化时调用,那么您可以阻塞所有线程,直到初始化完成。如果在程序执行期间频繁调用它,那么我会从一个单独的LuaState进行这些调用。

[EDIT]第二个想法:使用生产者/消费者的方法。那么C++线程就不需要运行Lua代码了。

C++建议:

//-- start Task.h --
/**
 * Base class of a unit of work that runs on its own pthread and whose result
 * is collected later by another thread.
 *
 * Life cycle: start() puts the task into runningTasks and spawns a thread that
 * calls run(); when run() returns, the thread moves the task to doneTasks.
 * Whoever drains doneTasks calls processResults() and deletes the task.
 * Both lists are protected by Task::mutex, which the program has to
 * initialize before the first start() call.
 */
struct Task{
  static std::list<Task*> runningTasks;  //tasks whose thread has not finished yet
  static std::list<Task*> doneTasks;     //finished tasks waiting for processResults()
  static pthread_mutex_t mutex;          //guards both lists
  static const unsigned int maxRunningTasks = 100; //limit of concurrently running tasks
  std::list<Task*>::iterator iterator;   //position of this task inside runningTasks
  virtual ~Task(){}
  /**
   * Tries to run this task on a new thread.
   * @return false when maxRunningTasks tasks are already running or the
   *         thread could not be created; the task is then not registered
   *         anywhere and still belongs to the caller.
   */
  bool start(){
    pthread_mutex_lock(&mutex);
    bool started = runningTasks.size() < maxRunningTasks;
    if(started){
      runningTasks.push_front(this);
      iterator = runningTasks.begin();
      pthread_t worker;
      if(pthread_create(&worker, NULL, Task::threadBody, this) == 0){
        //Nobody ever joins the worker, so detach it; otherwise the resources
        //of every finished thread would be leaked.
        pthread_detach(worker);
      } else {
        //No thread is running: without this the task would stay in
        //runningTasks forever while the caller is told it was started.
        runningTasks.erase(iterator);
        started = false;
      }
    }
    pthread_mutex_unlock(&mutex);
    return started;
  }
  //The actual work; executed on the worker thread.
  virtual void run() = 0;
  //Consumes the result of run(); called by whoever drains doneTasks.
  virtual void processResults() = 0;
protected:
  //Moves this task from runningTasks to doneTasks.
  void finish(){
    pthread_mutex_lock(&mutex);
    runningTasks.erase(iterator);
    doneTasks.push_front(this);
    pthread_mutex_unlock(&mutex);
  }
  //pthread start routine: runs the task passed as instance, then hands it over as done.
  static void* threadBody(void* instance){
    Task* task = static_cast<Task*>(instance);
    task->run();
    task->finish();
    return NULL;
  }
};
//-- end Task.h --
//-- start Task.cpp --
//Instantiate Task's static attributes
pthread_mutex_t Task::mutex;
std::list<Task*> Task::runningTasks;
std::list<Task*> Task::doneTasks;
//-- end Task.cpp --
struct SumTask: public Task{
  int a, b, c;
  void run(){
    Sleep(3000);
    c = a+b;
  }
  void processResults(){
    LuaPlus::LuaFunction<int> CPP_OnMyEvent = pState->GetGlobal("LUA_OnMyEvent");
    CPP_OnMyEvent(a,b,c);
  }
}
//functions called by Lua
bool runSumTask(int a, int b){
  SumTask task* = new SumTask();
  task->a = a; task->b = b;
  bool ok = task->start();
  if(!ok)
    delete task;
  return ok;
}
int gatherResults(){
  pthread_mutex_lock(&Task::mutex);
  int totalResults = Task::doneTasks.size();
  while(Task::doneTasks.size() > 0){
    Task* t = Task::doneTasks.front();
    Task::doneTasks.pop_front();
    t->processResults();
    delete t;
  }
  pthread_mutex_unlock(&Task::mutex);
  return totalResults;
}
int main() {
    //Task::mutex has to be initialized before the first Task is started
    //and destroyed again at the end of the program
    pthread_mutex_init(&Task::mutex, NULL);
    //... (the rest of the program is left out of this sketch)
    pthread_mutex_destroy(&Task::mutex);
}

Lua代码:

-- Callback for the C++ side: prints the two operands and their sum as "a+b=c".
function LUA_OnMyEvent(a,b,c)
  print(a.."+"..b.."="..c)
end

-- Number of tasks that were started but whose results were not gathered yet.
local totalRunning = 0
for i=1, 999, 1 do
  -- Lua needs "then ... end" around the body of an if statement.
  -- A task that runSumTask rejects is simply skipped here.
  if runSumTask(i,i*2) then
    totalRunning = totalRunning + 1
  end
  -- Lua has no "-=" operator, so the subtraction is spelled out.
  totalRunning = totalRunning - gatherResults()
end
-- Keep collecting until every started task has reported its result.
while totalRunning > 0 do
  totalRunning = totalRunning - gatherResults()
  mySleepFunction(...) -- placeholder: wait a little before polling again
end