伯克利数据库,并发队列

Berkeley DB, concurrent queues

本文关键字:队列 并发 数据库 伯克利      更新时间:2023-10-16

我正在尝试使用伯克利数据库实现并发持久队列。作为初学者,我试图使两个进程都附加到DB:

#include <unistd.h>
#include <sstream>
#include <db_cxx.h>
class Queue : public DbEnv
{
    public:
    Queue ( ) :
        DbEnv(0),
        db(0)
    {
        set_flags(DB_CDB_ALLDB, 1);
        open("/tmp/db", DB_INIT_LOCK  |
                DB_INIT_LOG   |
                DB_INIT_TXN   |
                DB_INIT_MPOOL |
                DB_RECOVER    |
                DB_CREATE     |
                DB_THREAD,
                0);
        db = new Db(this, 0); 
        db->set_flags(DB_RENUMBER);
        db->open(NULL, "db", NULL, DB_RECNO, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0);
    }
    virtual ~Queue ()
    {
        db->close(0);
        delete db;
        close(0);
    }
    protected:
    Db * db;
};
class Enqueue : public Queue
{
    public:
    Enqueue ( ) : Queue() { }
    virtual ~Enqueue () { }
    bool push(const std::string& s)
    {
        int res;
        DbTxn * txn;
        try {
            txn_begin(NULL, &txn, DB_TXN_SYNC | DB_TXN_WAIT );
            db_recno_t k0[4]; // not sure how mutch data is needs???
            k0[0] = 0;
            Dbt val((void*)s.c_str(), s.length());
            Dbt key((void*)&k0, sizeof(k0[0]));
            key.set_ulen(sizeof(k0));
            key.set_flags(DB_DBT_USERMEM);
            res = db->put(txn, &key, &val, DB_APPEND);
            if( res == 0 ) {
                txn->commit(0);
                return true;
            } else {
                std::cerr << "push failed: " << res << std::endl;
                txn->abort();
                return false;
            }
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            txn->abort();
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            txn->abort();
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            txn->abort();
            return false;
        }
    }
};
using namespace std;
int main(int argc, const char *argv[])
{
    fork();
    Enqueue e;
    stringstream ss;
    for(int i = 0; i < 10; i++){
        ss.str("");
        ss << "asdf" << i;
        cout << ss.str() << endl;
        if( ! e.push(ss.str()) )
            break;
    }
    return 0;
}
编译:

g++ test.cxx -I/usr/include/db4.8 -ldb_cxx-4.8
创建db-dir
mkdir /tmp/db

当我运行它的时候,我得到了各种各样的错误(分割错误,分配错误,有时它实际上工作)

我确信我错过了一些锁定,但我只是不知道如何做。所以,任何提示和/或建议来解决这个问题是非常受欢迎的。

只是为了记录,这是我在谷歌上搜索和试错之后确定的解决方案。

应用程序是一个call home进程,其中生产者正在添加数据,而消费者试图将其发送回家。如果消费者未能寄回家,它必须再试一次。当消费者试图接收数据时,数据库不能阻塞生产者。

代码有一个文件锁,并且只允许一个消费进程。

下面是代码:
#include <db_cxx.h>
#include <sstream>
#include <fstream>
#include <vector>
#include <boost/interprocess/sync/file_lock.hpp>
class Queue : public DbEnv
{
public:
    Queue ( bool sync ) :
        DbEnv(0),
        db(0)
    {
        set_flags(DB_CDB_ALLDB, 1);
        if( sync )
            set_flags(DB_TXN_NOSYNC, 0);
        else
            set_flags(DB_TXN_NOSYNC, 1);
        open("/tmp/db", DB_INIT_LOCK |
             DB_INIT_LOG | DB_INIT_TXN | DB_INIT_MPOOL |
             DB_REGISTER | DB_RECOVER | DB_CREATE | DB_THREAD,
             0);
        db = new Db(this, 0);
        db->set_flags(DB_RENUMBER);
        db->open(NULL, "db", NULL, DB_RECNO, DB_CREATE | DB_AUTO_COMMIT | DB_THREAD, 0);
    }
    virtual ~Queue ()
    {
        db->close(0);
        delete db;
        close(0);
    }
protected:
    Db * db;
};
struct Transaction
{
    Transaction() : t(0) { }
    bool init(DbEnv * dbenv ){
        try {
            dbenv->txn_begin(NULL, &t, 0);
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            return false;
        }
        return true;
    }
    ~Transaction(){ if( t!=0) t->abort(); }
    void abort() { t->abort(); t = 0; }
    void commit() { t->commit(0); t = 0; }
    DbTxn * t;
};
struct Cursor
{
    Cursor() : c(0) { }
    bool init( Db * db,  DbTxn * t) {
        try {
            db->cursor(t, &c, 0);
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            return false;
        }
        return true;
    }
    ~Cursor(){ if( c!=0) c->close(); }
    void close(){ c->close(); c = 0; }
    Dbc * c;
};
class Enqueue : public Queue
{
public:
    Enqueue ( bool sync ) : Queue(sync) { }
    virtual ~Enqueue () { }
    bool push(const std::string& s)
    {
        int res;
        Transaction transaction;
        if( ! transaction.init(this) )
            return false;
        try {
            db_recno_t k0[4]; // not sure how mutch data is needs???
            k0[0] = 0;
            Dbt val((void*)s.c_str(), s.length());
            Dbt key((void*)&k0, sizeof(k0[0]));
            key.set_ulen(sizeof(k0));
            key.set_flags(DB_DBT_USERMEM);
            res = db->put(transaction.t, &key, &val, DB_APPEND);
            if( res == 0 ) {
                transaction.commit();
                return true;
            } else {
                std::cerr << "push failed: " << res << std::endl;
                return false;
            }
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "What()" << e.what() << std::endl;
            return false;
        } catch(...) {
            std::cerr << "Unknown error" << std::endl;
            return false;
        }
    }
};
const char * create_file(const char * f ){
    std::ofstream _f;
    _f.open(f, std::ios::out);
    _f.close();
    return f;
}
class Dequeue : public Queue
{
public:
    Dequeue ( bool sync ) :
        Queue(sync),
        lock(create_file("/tmp/db-test-pop.lock")),
        number_of_records_(0)
    {
        std::cout << "Trying to get exclusize access to database" << std::endl;
        lock.lock();
    }
    virtual ~Dequeue ()
    {
    }
    bool pop(size_t number_of_records, std::vector<std::string>& records)
    {
        if( number_of_records_ != 0 ) // TODO, warning
            abort();
        Cursor cursor;
        records.clear();
        if( number_of_records_ != 0 )
            abort(); // TODO, warning
        // Get a cursor
        try {
            db->cursor(0, &cursor.c, 0);
        } catch( DbException e) {
            std::cerr << "DB What()" << e.what() << std::endl;
            abort();
            return false;
        }
        // Read and delete
        try {
            Dbt val;
            db_recno_t k0 = 0;
            Dbt key((void*)&k0, sizeof(k0));
            for( size_t i = 0; i < number_of_records; i ++ ) {
                int get_res = cursor.c->get(&key, &val, DB_NEXT);
                if( get_res == 0 )
                    records.push_back(std::string((char *)val.get_data(), val.get_size()));
                else
                    break;
            }
            number_of_records_ = records.size();
            if( number_of_records_ == 0 ) {
                abort();
                return false;
            } else {
                return true;
            }
        } catch( DbException e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            abort();
            return false;
        } catch( std::exception e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            abort();
            return false;
        }
    }
    bool commit()
    {
        if( number_of_records_ == 0 )
            return true;
        Transaction transaction;
        Cursor      cursor;
        if( ! transaction.init(this) )
            return false;
        if( ! cursor.init(db, transaction.t) )
            return false;
        // Read and delete
        try {
            Dbt val;
            db_recno_t k0 = 0;
            Dbt key((void*)&k0, sizeof(k0));
            for( size_t i = 0; i < number_of_records_; i ++ ) {
                int get_res = cursor.c->get(&key, &val, DB_NEXT);
                if( get_res == 0 )
                    cursor.c->del(0);
                else
                    break; // this is bad!
            }
            number_of_records_ = 0;
            cursor.close();
            transaction.commit();
            return true;
        } catch( DbException e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            return false;
        } catch( std::exception e) {
            std::cerr << "DB read/delete What() " << e.what() << std::endl;
            return false;
        }
    }
    void abort()
    {
        number_of_records_ = 0;
    }
private:
    boost::interprocess::file_lock lock;
    size_t  number_of_records_;
    sigset_t orig_mask;
};

如果你看到任何错误,请告诉我,或者如果你知道一个更简单的方法来做到这一点。