使用类似关系数据库的boost多索引

Using boost multi index like relational DB

本文关键字:boost 索引 关系数据库      更新时间:2023-10-16

以下是我试图模拟的情况:

  COL1                 Col2     Col3
CBT.151.5.T.FEED       S1       t1
CBT.151.5.T.FEED       s2       t2
CBT.151.5.T.FEED       s3       t3
CBT.151.5.T.FEED       s4       t4
CBT.151.5.T.FEED       s5       t1
CBT.151.8.T.FEED       s7       t1
CBT.151.5.Q.FEED       s8       t3

COL1-是ID,对于给定的ID,可以有几个符号
COL2-符号,它们是唯一的
COL3-一个符号的更新时间,两个不同的符号可能同时更新,因此它们不是唯一的

我的目标是获得最活跃的股票代码,比如说在过去60秒内更新的符号。为此,我使用了boost多指数。

头文件:

#ifndef __TICKER_INFO_MANAGER_IMPL__
#define __TICKER_INFO_MANAGER_IMPL__
#include <boost/interprocess/containers/string.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <TickerInfoManagerConstants.h>
#include <TickerInfo.h>
namespace bmi = boost::multi_index;
namespace bip = boost::interprocess;
struct id_index{};
struct symbol_index{};
struct last_update_time_index{};
struct Less {
  template<class T, class U>
    bool operator()(T const& t, U const& u) const {
      return t < u;
    }
};

typedef bmi::multi_index_container<
tickerUpdateInfoT,
  bmi::indexed_by<
  bmi::ordered_unique
  <bmi::tag<id_index>,  BOOST_MULTI_INDEX_MEMBER( tickerUpdateInfo, shm_string, m_id), Less>,
  bmi::ordered_unique<
  bmi::tag<symbol_index>,BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, shm_string, m_symbol), Less>,
  bmi::ordered_non_unique
  <bmi::tag<last_update_time_index>, BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, int, m_last_update_time), Less> >,
  bip::managed_shared_memory::allocator<tickerUpdateInfo>::type
  > ticker_update_info_set;
  class tickerInfoMangerImplementation {
    public:
      tickerInfoMangerImplementation( const sharedMemoryNameT & name );
      bool put_records( const tickerUpdateInfoT & record );
      int get_active_ticker_count( const thresholdT seconds );
      void print_contents();
      bip::managed_shared_memory& get_managed_memory_segment() {
        return m_managed_memory_segment;
      }
    private:
      const sharedMemoryNameT    m_name;
      bip::managed_shared_memory m_managed_memory_segment;
      ticker_update_info_set     *p_ticker_info_set;
  };
#endif

cpp文件

#include <TickerInfoMangerImplementation.h>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <iostream>
#include "basic_time.h"
using namespace boost::interprocess;
tickerInfoMangerImplementation::tickerInfoMangerImplementation( const sharedMemoryNameT & name ): m_name(name),
  m_managed_memory_segment( open_or_create, "test", 65536 )
{
  p_ticker_info_set = m_managed_memory_segment.find_or_construct<ticker_update_info_set>
    ("SetOfTickerUpdateInformation")            //Container's name in shared memory
    ( ticker_update_info_set::ctor_args_list()
      , m_managed_memory_segment.get_allocator<tickerUpdateInfoT>());  //Ctor parameters
}
bool tickerInfoMangerImplementation::put_records( const tickerUpdateInfoT & record ) {
  std::pair<ticker_update_info_set::iterator, bool> result_pair = p_ticker_info_set->insert( record );
  if( result_pair.second ) {
    return result_pair.second;
  }
  typedef ticker_update_info_set::index<symbol_index>::type ticker_update_info_set_by_symbol;
  ticker_update_info_set_by_symbol & sym_index = (*p_ticker_info_set).get<symbol_index>();
  ticker_update_info_set_by_symbol::iterator it = sym_index.find( record.m_symbol );
  tickerUpdateInfoT ticker_info = *it;
  ticker_info.m_last_update_time = record.m_last_update_time;
  return sym_index.replace( it, ticker_info );
}
int tickerInfoMangerImplementation::calculate_historical_time_using_threshold( const thresholdT seconds ) {
  basic_time::Secs_t seconds( threshold );
  basic_time tick_time;
  tick_time -= seconds;
  return ( tick_time.fullTime() );
}
int tickerInfoMangerImplementation::get_active_ticker_count( const thresholdT seconds, std::string key ) {
  typedef ticker_update_info_set::index<id_index>::type ticker_update_info_set_by_id;
  ticker_update_info_set_by_id & id_index = (*p_ticker_info_set).get<id_index>();
  int tick_time = calculate_historical_time_using_threshold( seconds );
  //Here I would like to find the key
  //Based on that key I would like to fetch all the symbols which have updated after a certain time(using lower bound)
  std::copy( it, time_index.end(), std::ostream_iterator<tickerUpdateInfoT>(std::cout) );
}

void tickerInfoMangerImplementation::print_contents() {
  const ticker_update_info_set::nth_index<1>::type& name_index = (*p_ticker_info_set).get<1>();
  std::copy( name_index.begin(), name_index.end(), std::ostream_iterator<tickerUpdateInfoT>(std::cout) );
}
std::ostream& operator<<(std::ostream& os, const tickerUpdateInfoT & obj) {
  os << obj.m_id << " ";
  os << obj.m_symbol << " ";
  os << obj.m_last_update_time << " " << "n";
  return os;
};

我将插入到boost多索引中的记录的结构

#ifndef __TICKER_INFO__
#define __TICKER_INFO__
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/string.hpp>
typedef boost::interprocess::managed_shared_memory::allocator<char>::type               char_allocator;
typedef boost::interprocess::basic_string<char, std::char_traits<char>, char_allocator> shm_string;
//Data to insert in shared memory
typedef struct tickerUpdateInfo {
  shm_string  m_id;
  shm_string  m_symbol;
  int         m_last_update_time;
  tickerUpdateInfo( const char * id,
      const char *symbol,
      int last_update_time,
      const char_allocator &a)
    : m_id( id, a), m_symbol( symbol, a), m_last_update_time( last_update_time) {
    }
  tickerUpdateInfo& operator=(const tickerUpdateInfo& other) {
   if (this != &other) {
       m_last_update_time = other.m_last_update_time;
      }
    return *this;
  }
} tickerUpdateInfoT;
#endif

现在,在函数get_active_ticker_count()中,我想指定像CBT.151.5.T.FEED这样的键,它应该返回:

   S1       t1
   s2       t2
   s3       t3
   s4       t4
   s5       t1

让我们假设t1>t2>t3>t4,那么我想找出时间大于t3的这样的集合,并且也想找到这样的符号的计数。我如何进行同样的操作,我已经能够插入,但我被检索部分卡住了。请帮忙!

我已经将您的(极其复杂的)模型简化为:

enum TimePoints { // Lets assume t1 > t2 > t3 > t4
    t1 = 100,
    t2 = 80,
    t3 = 70,
    t4 = 20,
};
using IdType = std::string;
using Symbol = std::string;
using TimeT  = unsigned int;
struct tickerUpdateInfo {
    IdType m_id;
    Symbol m_symbol;
    TimeT  m_last_update_time;
    friend std::ostream& operator<<(std::ostream& os, tickerUpdateInfo const& tui) {
        return os << "T[" << tui.m_id << ",t" << tui.m_symbol << ",t" << tui.m_last_update_time << "]";
    }
} static const data[] = {
    { "CBT.151.5.T.FEED", "S1", t1 },
    { "CBT.151.5.T.FEED", "s2", t2 },
    { "CBT.151.5.T.FEED", "s3", t3 },
    { "CBT.151.5.T.FEED", "s4", t4 },
    { "CBT.151.5.T.FEED", "s5", t1 },
    { "CBT.151.8.T.FEED", "s7", t1 },
    { "CBT.151.5.Q.FEED", "s8", t3 },
};

那里。我们可以这样做。你想要一个主要基于时间的索引,但你可以稍后为符号/id进行细化:

typedef bmi::multi_index_container<tickerUpdateInfo,
    bmi::indexed_by<
        bmi::ordered_non_unique<bmi::tag<struct most_active_index>,
            bmi::composite_key<tickerUpdateInfo,
                BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, TimeT,  m_last_update_time),
                BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, Symbol, m_symbol),
                BOOST_MULTI_INDEX_MEMBER(tickerUpdateInfo, IdType, m_id)
        > > >
    > ticker_update_info_set;

对于我们的实现,我们甚至不需要使用次要的关键组件,我们只需要编写

std::map<Symbol, size_t> activity_histo(ticker_update_info_set const& tuis, TimeT since)
{
    std::map<Symbol, size_t> histo;
    auto const& index = tuis.get<most_active_index>();
    auto lb = index.upper_bound(since); // for greater-than-inclusive use lower_bound
    for (auto& rec : boost::make_iterator_range(lb, index.end()))
        histo[rec.m_symbol]++;
    return histo;
}

请参阅Coliru直播

现在,如果卷变大,您可能会被诱惑使用辅助索引组件进行一点优化:

std::map<Symbol, size_t> activity_histo_ex(ticker_update_info_set const& tuis, TimeT since)
{
    std::map<Symbol, size_t> histo;
    auto const& index = tuis.get<most_active_index>();
    for (auto lb = index.upper_bound(since), end = tuis.end(); lb != end;) // for greater-than-inclusive use lower_bound
    {
        auto ub = index.upper_bound(boost::make_tuple(lb->m_last_update_time, lb->m_symbol));
        histo[lb->m_symbol] += std::distance(lb, ub);
        lb = ub;
    }
    return histo;
}

我不确定这是否会成为更快的方法(你的探查器会知道)。也可以在Coliru上观看直播

重新思考设计

TBH这整个多索引的事情可能会减慢您的速度,因为在迭代记录时,插入时间不理想,并且缺乏引用的局部性。

我建议看

  • 按更新时间排序的单个flat_multimap
  • 或者甚至是(固定大小)线性环形缓冲区随时间的顺序。这将非常有意义,因为无论如何,你最有可能以递增的时间顺序接收事件,所以你可以在最后继续追加(并在历史窗口满时结束)。这一次消除了所有重新分配的需要(假设您为环形缓冲区选择了适当的最大容量),并为您提供了遍历统计数据列表的最佳缓存预取性能

一旦您使用BoostLockfree的spsc_queue产品实现了环形缓冲区,第二种方法应该会有一些优点。为什么因为您可以将其托管在共享内存中

共享内存IPC同步(无锁)


cco复杂性是有保证的iff您的代码将是自包含的。遗憾的是,事实并非如此。我不得不把它修剪一下,以便找到工作。很明显,这是在删除了所有行号之后:)