rxcpp - 为什么当可观察量发出值时,不调用所有观察者的on_next函数

rxcpp - why don't all observers' on_next function get called when an observable emits a value

本文关键字:调用 观察者 函数 next on 为什么 观察 rxcpp      更新时间:2023-10-16

我试图理解如何使用rxcpp,我的印象是,当一个可观察对象发出一个值时,所有订阅的观察者将通过调用他们的on_next()方法得到通知,传递他们发出的值。

下面的例子不是这样的:

auto eventloop = rxcpp::observe_on_event_loop();
printf("Start taskn");
auto values = rxcpp::observable<>::interval(std::chrono::seconds(2)).map(
        [](int i){
            printf("Observable sending: %dn", i);
            return i;
        }
);
values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#1 onNext: %dn", v);},
        [](){printf("#1 onCompletedn");});
values.
    subscribe_on(eventloop).
    take(2).
    as_blocking().
    subscribe(
        [](int v){printf("#2 onNext: %dn", v);},
        [](){printf("#2 onCompletedn");});
printf("Finish taskn");

我期望输出是这样的:

Start task
Observable sending: 1
#1 onNext: 1
#2 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
#2 onNext: 2
#2 onCompleted
Finish task

。当新值到达时,在所有订阅的观察者上调用On_next。

相反,输出实际上是:
Start task
Observable sending: 1
#1 onNext: 1
Observable sending: 2
#1 onNext: 2
#1 onCompleted
Observable sending: 1
#2 onNext: 1
Observable sending: 2
#2 onNext: 2
#2 onCompleted
Finish task

这是经典的热vs冷行为。

一个热的可观察对象将做你所期望的。Interval是一个冷可观察对象,因此每个订阅都会产生一组独立的值。

publish操作符将接受单个冷可观察对象,并将其作为热可观察对象共享。

在本例中是。

auto sharedvalues = values.publish().ref_count();

则在订阅表达式中使用sharedvalues而不是values

对热观测和冷观测的搜索将发现这个主题的广泛讨论。