在 Node.js C++ 插件中发出事件

Emitting an Event in Node.js C++ Addon

本文关键字:发出事件 插件 C++ Node.js      更新时间:2023-10-16

我有一个从工业控制器读取过程数据数组的应用程序。我想在数据更改时将该数据推送到网页。为此,我用 c++ 编写了一个 node.js 插件,它扫描流程数据并尝试在数据值更改时触发事件。插件一切正常,直到它尝试触发事件,此时 node.js 终止并显示错误:

undefined:0

TypeError: undefined is not a function

下面是 C++ 源码、JavaScript shim 以及用于测试的 JavaScript 脚本。任何见解都非常感谢。

提前谢谢。

node_corelink.cpp

// Bookkeeping for one monitored value: where the current variant can be
// read, plus the snapshot that was most recently broadcast.
struct CoreLinkValue
{
    // Pointer to a CTS variant value.
    CtsVariant* value;

    // Copy of the last value that was broadcast.
    CtsVariant lastValue;
};

//
// Native event emitter used to push events to node.js.
// It depends on the JavaScript shim in node_corelink.js, which copies the
// EventEmitter prototype methods onto this class.
//
struct Emitter : public ObjectWrap
{
    // Constructor callback invoked from JavaScript.
    static Handle<Value> New(const Arguments& args);

    // Raises a "DataChange" event for the given topic and variant value.
    static Handle<Value> DataChange(const char* topic, CtsVariant* value);
};
//
// Convert a CTS variant into the V8 value used as a message payload.
// Bit, byte, word and dword variants become Integers; 64-bit word and
// real variants become Numbers; every other type yields Undefined.
//
static Handle<Value>
createVariantHandle(CtsVariant* value)
{
    switch (value->type)
    {
    case CTSTYPE_BIT:
    case CTSTYPE_BYTE:
        return Integer::New(value->value.byte[0]);

    case CTSTYPE_WORD:
        return Integer::New(value->value.word[0]);

    case CTSTYPE_DWORD:
        // NOTE(review): if dword is an unsigned 32-bit field, values above
        // INT32_MAX presumably come out negative here - confirm against
        // the CtsVariant definition.
        return Integer::New(value->value.dword[0]);

    case CTSTYPE_WORD64:
        // NOTE(review): a JS Number is a double, so very large 64-bit
        // values presumably lose precision - confirm this is acceptable.
        return Number::New(value->value.word64);

    case CTSTYPE_REAL64:
        return Number::New(value->value.real64);

    default:
        // Unknown variant type: nothing meaningful to hand to JavaScript.
        return Undefined();
    }
}

// Constructor callback for the JavaScript Emitter class. Allocates the
// native Emitter, wraps it around the JS object being constructed and
// returns that object.
Handle<Value> Emitter::New(const Arguments& args) 
{
  HandleScope handles;

  // Only valid when invoked through `new Emitter()`.
  assert(args.IsConstructCall());

  // Wrap() associates the native instance with the JS object.
  Emitter* const wrapped = new Emitter();
  wrapped->Wrap(args.This());

  return handles.Close(args.This());
}

// Emits a "DataChange" event by calling emit(eventName, topic, value) on
// context_obj_.
//
//   topic - name of the data point that changed
//   value - variant holding the new value; converted with createVariantHandle
//
// Always returns True().
//
// NOTE(review): context_obj_ is defined outside this block. It has to be a
// JS object that really has an emit() method (presumably the Emitter instance
// created from JavaScript); if it is anything else, that is the likely source
// of "TypeError: undefined is not a function" - confirm where it is assigned.
Handle<Value> Emitter::DataChange( const char* topic, CtsVariant* value ) 
{
  HandleScope scope;
  Handle<Value> argv[] = {
    String::New("DataChange"),      // event name
    String::New(topic),             // topic argument
    createVariantHandle(value)      // value argument
  };
  // Derive the count from the array so every argument is forwarded; the
  // previous hard-coded 2 silently dropped the value argument.
  const int argc = static_cast<int>(sizeof(argv) / sizeof(argv[0]));

  printf ("C++ Emitting event!\n" );
  MakeCallback(context_obj_, "emit", argc, argv);
  return True();
}

//
// Triggered by the event loop on a regular interval.
// Scans the registered data and broadcasts every entry whose current
// variant differs from the last broadcast snapshot, or every entry when a
// full refresh was requested. The refresh request is cleared afterwards.
//
// timer, status - supplied by the timer callback signature; unused here.
//
void
scan_task( uv_timer_t* timer, int status )
{
    std::map<std::string, CoreLinkValue>::iterator it;
    for ( it = pdos_.begin(); it != pdos_.end(); ++it )
    {
        CoreLinkValue& entry = it->second;

        // Broadcast when:
        //  - an update of all values was requested, or
        //  - the variant type changed (the value was obviously updated), or
        //  - word64, which contains all bits of the value, changed.
        const bool doUpdate =
               forceRefreshPdos_ == true
            || entry.value->type != entry.lastValue.type
            || entry.value->value.word64 != entry.lastValue.value.word64;

        if (doUpdate)
        {
            // Remember the type as well as the value. Without the type the
            // type comparison above stays true after a type change and the
            // entry would be re-broadcast on every scan.
            entry.lastValue.type = entry.value->type;
            entry.lastValue.value = entry.value->value;
            Emitter::DataChange( it->first.c_str(), entry.value );
        }
    }

    if (forceRefreshPdos_)
    {
        forceRefreshPdos_ = false;
        printf("Completed refresh all.\n");
    }
}

//
// Start the execution of the scan loop.
// Initializes scanTimer_ on the default loop and starts it with no startup
// delay and a 100 ms repeat interval, using scan_task as the callback.
//
// Returns 1 when the timer was started, 0 when libuv reported an error.
//
int
startScanLoop( void )
{
    if ( uv_timer_init( uv_default_loop(), &scanTimer_ ) != 0 )
    {
        return 0;
    }

    const int startResult = uv_timer_start(
        &scanTimer_,    // timer instance
        &scan_task,     // callback function
        0,              // startup delay    (ms)
        100 );          // repeat interval (ms)

    // libuv signals success with 0
    return ( startResult == 0 ) ? 1 : 0;
}

//
// Halt the scan loop by stopping the scan timer.
//
void stopScanLoop()
{
    uv_timer_stop(&scanTimer_);
}

//
// Connects to the kernel IPC and starts the scan loop.
// The IPC setup itself is elided ("...") in this listing.
// Returns True() to the JavaScript caller.
//
Handle<Value> 
connect(const Arguments& args) 
{
    HandleScope scope;

    ...
    // Begin the periodic scan; the return value of startScanLoop() is not
    // checked here.
    startScanLoop();
    return scope.Close( True() );
}

//
// Shuts down the kernel IPC.
// Stops the scan loop first; the IPC teardown itself is elided ("...") in
// this listing. Returns True() to the JavaScript caller.
//
Handle<Value> 
close(const Arguments& args) 
{
    HandleScope scope;
    // Stop the scan timer before the (elided) teardown code runs
    stopScanLoop();
    ...
    return scope.Close( True() );
}
//
// Module entry point, called by node.js to initialize the library.
// Publishes connect(), close() and the Emitter constructor on `target`.
//
void 
init(Handle<Object> target) 
{
    // Plain functions
    Local<Function> connectFn = FunctionTemplate::New(connect)->GetFunction();
    target->Set(String::NewSymbol("connect"), connectFn);

    Local<Function> closeFn = FunctionTemplate::New(close)->GetFunction();
    target->Set(String::NewSymbol("close"), closeFn);

    //
    // Events interface: instances reserve one internal field.
    //
    Local<FunctionTemplate> emitterTemplate = FunctionTemplate::New(Emitter::New);
    emitterTemplate->InstanceTemplate()->SetInternalFieldCount(1);
    emitterTemplate->SetClassName(String::New("Emitter"));
    target->Set(String::NewSymbol("Emitter"), emitterTemplate->GetFunction());
}
NODE_MODULE(node_corelink, init)

node_corelink.js

// Shim around the compiled addon: loads the native binding once, copies the
// EventEmitter prototype methods onto the native Emitter class, and exports
// the binding (connect, close, Emitter).
var events = require('events');
var binding = require(__dirname + '/build/Release/node_corelink.node');

// Copies every enumerable prototype property of `source` onto `target`.
function inherits(target, source) {
  for (var k in source.prototype)
    target.prototype[k] = source.prototype[k];
}

// Give native Emitter instances on()/emit()/... from EventEmitter.
inherits(binding.Emitter, events.EventEmitter);

// Export through module.exports only: once module.exports is reassigned, a
// bare `exports.Emitter = ...` lands on the discarded exports object.
module.exports = binding;

test.js

// Test driver: keeps the process alive, listens for DataChange events from
// the addon's Emitter and closes the connection on exit.

// Called when the process exits: close the connection and say goodbye.
function handleExit() {
  corelink.close();
  console.log('Goodbye!');
}

// Called on Ctrl+C: report it and exit, which fires the exit handler.
function handleSigint() {
  console.log('Got SIGINT.');
  process.exit();
}

// Called for every DataChange event.
function handleDataChange(s) {
  console.log('DataChange');
}

// Keep stdin open so the program will not close instantly.
process.stdin.resume();
process.on('exit', handleExit);
process.on('SIGINT', handleSigint);

var corelink = require('./node_corelink');
var Emitter = corelink.Emitter;

var emitter = new Emitter();
emitter.on('DataChange', handleDataChange);

corelink.connect();

我能够以不太优雅的方法触发回调。

node_corelink.js

// Re-export the compiled addon unchanged.
var nativeBinding = require(__dirname + '/build/Release/node_corelink.node');
module.exports = nativeBinding;

test.js

// Test driver for the callback-based variant: passes two plain functions to
// connect() instead of using an event emitter.
var corelink = require('./node_corelink');

// Logs "<topic> ::: <value>" for each data change; undefined values are skipped.
function onDataChange(topic, value) {
    if (value === undefined) {
        return;
    }
    console.log(topic + " ::: " + value.toString());
}

// Logs a message received from the kernel.
function onMessage(msg) {
    var text = msg.toString();
    console.log("Message from kernel: " + text);
}

corelink.connect(onDataChange, onMessage);

node_corelink.cpp

static void 
dataChange( const char* topic, CtsVariant* value ) 
{
    HandleScope scope;
    Handle<Value> argv[2] = 
    {
        String::New(topic),         // topic argument
        createVariantHandle(value)      // value argument
    };
    MakeCallback(Context::GetCurrent()->Global(), pfOnDataChange_, 2, argv);
}

static void 
onMessage( const char* message ) 
{
    HandleScope scope;
    Handle<Value> argv[1] = 
    {
        String::New(message)            // message argument
    };
    MakeCallback(Context::GetCurrent()->Global(), pfOnMessage_, 1, argv);
}

//
// Connects to the kernel IPC.
// Expects two JavaScript functions: args[0] is stored in pfOnDataChange_
// and args[1] in pfOnMessage_. Returns False() when fewer than two
// arguments are given or either one is not a function. The IPC setup is
// elided ("...") in this listing; the visible code ends by returning True().
//
Handle<Value> 
connect(const Arguments& args) 
{
    HandleScope scope;
    // Reject the call unless both callbacks were supplied
    if ( args.Length() < 2 
        || !args[0]->IsFunction()
        || !args[1]->IsFunction() )
    {
        return scope.Close( False() );
    }
    // Store the callbacks in persistent handles so they can be used after
    // this call returns.
    // NOTE(review): handles stored by an earlier connect() call do not appear
    // to be disposed before being overwritten - confirm whether connect() can
    // be called more than once.
    pfOnDataChange_ =  Persistent<Function>::New(args[0].As<Function>());
    pfOnMessage_ = Persistent<Function>::New(args[1].As<Function>());
    ...

    return scope.Close( True() );
}