"live"流的简单移动平均线 - 快速实施

simple moving average of "live" stream - fast implementation

本文关键字:live 简单 移动      更新时间:2023-10-16

在我的交易应用程序中,我有股票价格的"实时报价"。我需要维持SMA。假设我想要20支蜡烛的SMA,每支蜡烛的持续时间是10秒。这意味着

每10秒我就有一个"检查点",其中:

  1. 我"关闭"当前蜡烛并存储最后10秒的平均价格。平均值为(最大-最小)/2
  2. 我"开始"新蜡烛并存储最后价格
  3. 我清理"过时"的蜡烛

每个刻度:

  1. 我更新了当前"成型"蜡烛的"最后"价格,并重新计算SMA

所以在任何时候我都需要"重新计算"SMA。在大多数情况下,只更改最后一支蜡烛的价格(因为我们使用最后的价格)。每10秒我就需要多一点额外的工作——我需要"忘记"过时蜡烛的平均值,并"存储"刚刚创建"的蜡烛的平均数。

你能建议如何以最低的延迟实现这一点吗?低延迟是主要要求。

我不确定这是否是您想要的方法,但这里是非常快速SMA的伪代码。

简单移动平均线:

我假设您的数据以某种流的形式出现,并存储在连续的内存位置(至少具有连续可映射的地址)中

x[1] to x[2000] contain the first 2000 data points
// they don't have to be a real array, could just be a function which manages
// a 'circular' array and maps the 'locations' to real locations in memory.
// Either case, it's small enough to be fully in the cache hopefully
//
// Subsequent prices will be accessible in 'locations x[2001], etc.
// Here we are looking to calculate the MA over the latest 2000 ticks
MA2000(1,2000) = (x[1] + x[2] + ... + x[2000]) / 2000 // Usual average
                                                      // Done only once
MA2000(2,2001) = MA2000(1,2000) * 2000 + x[2001] - x[1]
MA2000(2,2001) /= 2000

这样,通过两次加法和一次乘法(1/2000),您可以为新的刻度生成后续的移动平均值。

指数移动平均线:如上所述,这是一个不错的选择:

// For an N-day EMA
Alpha = 2 / (N+1)      // one time calculation
EMA(new) = Alpha * NewTickPrice + (1-Alpha) * EMA(old)

这并不是一个真正的N日移动平均线。这只是一个加权移动平均线,与最后N天的权重约为87%,所以几乎N天更像它。

编译器优化注意事项:

请注意,如果可用,打开SSE或AVX选项将使这些算法大幅加速,因为在一个CPU周期内可以大量进行多个计算。

因此,您需要一个几乎固定大小的队列,在那里您可以有效地添加新项目并删除最旧的项目(将其从运行总数中删除)。为什么不std::queue

这可以放在各种容器的顶部,但如果您真的只有20个元素,我怀疑vector会表现得很好。(删除一个项目需要将所有其他项目下移一个,但移动连续的内存块很快。)不过,您可能需要将性能与deque或list进行比较。

(答案可能取决于你为每个"蜡烛"存储的内容——只是一个浮点/双精度/int,还是一个更复杂的结构?)

我的实现。.h:

#pragma once
#include <deque>
class MovingAverage
{
public:
    MovingAverage(int length);
    ~MovingAverage(void);
    void Add(double val);
    void Modify(double value);
    double Value;
    std::deque<double> _candlesExceptNewest;
private:
    MovingAverage(MovingAverage& rhs):
        _length(rhs._length)
    {
        printf("MovingAverage copy-constructor mustn't be executed, exiting.");
        exit(0);
    }
    const int _length;
    int addCounter;
    static const int RECALCULATE_VALUE_MASK;
    double _sumExceptNewest;
    double NewestCandleMedian() {
        return (_newestCandleMin + _newestCandleMax) / 2;
    }
    void RecalculateValue();
    double _newestCandleMin;
    double _newestCandleMax;
};

.cpp:

#include "MovingAverage.h"
#include "CommonsNative.h"
const int MovingAverage::RECALCULATE_VALUE_MASK = 1024 - 1;
MovingAverage::MovingAverage(int length):
    Value(quiet_NaN),
    _length(length),
    addCounter(0)
{
    if (length < 20) {
        std::cout << "Warning, MA is very short, less than 20! length = " 
            << length << std::endl;
    }
}
MovingAverage::~MovingAverage(void)
{
}
void MovingAverage::Add(double val)
{
    ++addCounter;
    if (addCounter & RECALCULATE_VALUE_MASK) {
        _sumExceptNewest = 0;
        for (double val : _candlesExceptNewest)
        {
            _sumExceptNewest += val;
        }
    }
    auto curQueueSize = _candlesExceptNewest.size();
    if (curQueueSize == 0) {
        _newestCandleMax = _newestCandleMin = val;
    }
    _sumExceptNewest += NewestCandleMedian();
    _candlesExceptNewest.push_back(NewestCandleMedian());
    if (curQueueSize == _length) {
        _sumExceptNewest -= _candlesExceptNewest.front();
        _candlesExceptNewest.pop_front();
    }
    _newestCandleMax = _newestCandleMin = val;
    RecalculateValue();
}
void MovingAverage::RecalculateValue()
{
    Value = (_sumExceptNewest + NewestCandleMedian())/(_candlesExceptNewest.size() + 1);
}
void MovingAverage::Modify(double val)
{
    if (_candlesExceptNewest.size() == 0) {
        Add(val);
    } else {
        if (val > _newestCandleMax) {
            _newestCandleMax = val;
        } 
        if (val < _newestCandleMin) {
            _newestCandleMin = val;
        }
    }
}