前言

这是一份面向 .NET 工程师的 Rx.NET 学习与实践指南:从 IObservable/IObserver 基础,到 Subject 类型、冷/热源、多订阅控制,再到调度器与线程模型、时间类操作符、错误处理与测试,最后给出落地的最佳实践清单。适合希望系统性掌握响应式编程并在实际项目中安全使用 Rx 的读者。

你可以在Rxxxxxxx中找到一些代码示例

对于调度器一章节,为了更好理解,可以查看这个仓库:VaniRx

Part1 - Why Rx

.NET事件的问题

讽刺的是,如果没有event关键字,c#的对事件的处理会更好

.NET event关键字基本问题是: 它们在 .NET 类型系统中得到了特殊处理, 使得没法像对象一样操作事件,比如存储在字段中,作为参数传递,不能使用LINQ,不能拓展等等.

event唯一的优势在于: += & -=

IObservable

IEnumerable<T> vs. IObservable<T>

  • IEnumerable<T> 让代码(IEnumerator)可以主动获取值(通常通过 foreach 循环),
  • IObservable<T> 则在值可用时主动推送给代码(IObserver)。

这种区别通常被称为 拉取(pull)与推送(push)

  • 我们通过执行 foreach 循环从 IEnumerable<T> 中“拉取”值,
  • IObservable<T> 会把值“推送”(通过Subscribe)到我们的代码中。

接口定义

public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}

这个接口唯一的方法很清楚地表明:

  • 如果我们想要接收事件,就必须 订阅 它。
  • 我们也可以 取消订阅Subscribe 方法返回一个 IDisposable,调用它的 Dispose 方法就能取消订阅。

IObserver

接口定义

public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}

Subject

同时实现IObserver<T>IObservable<T>

Rx与外界的桥梁

大多数情况下,并不推荐使用 Subjects

Subject<T>

立即将对其 IObserver<T> 方法的调用转发给当前所有订阅它的观察者

有新的订阅者加入,它们只能看到订阅之后发生的事件

ReplaySubject

有新的订阅者加入,它们会收到迄今为止的所有历史事件

可以限制内存消耗,防止事件太多干爆内存

BehaviorSubject<T>

只记住一个值

AsyncSubject<T>

把它接收到的最后一个值提供给所有观察者

在调用 OnCompletedOnError 之前,根本不会给任何观察者发出通知

如果调用了 OnError,它只会把错误转发给所有当前和未来的订阅者

如果在调用 OnCompleted 之前从未调用过 OnNext,那么它没有最终值,只会完成所有观察者,而不会提供值

Hot & Cold Source

  • hot source 只提供订阅后的信息,比如鼠标移动

  • cold source 不管何时订阅,提供相同信息,比如IEnumerable<T>.ToObservable()

  • source 不是非冷即热,而是有温度

特殊情况: Cold-then-Hot

Windows 消息队列:如果你点击或输入时程序没反应,消息会先被排队,稍后再处理

可以看作是 冷-然后-热

  • 像冷源一样,你不会因为订阅得晚而错过过去的事件,
  • 但一旦开始消费数据,就无法回到最开始。

多订阅问题与 IConnectableObservable<T>

这种 冷-然后-热 源在多订阅场景下会出问题:

  • 第一个订阅者可能会独占所有缓冲区里的旧事件,
  • 后续订阅者则会错过。

解决办法:

  • 在发送事件流前挂好所有订阅者
public interface IConnectableObservable<out T> : IObservable<T>
{
IDisposable Connect();//调用Connect 后, 事件才会开始发送
}

Rx 序列的基本规则

1. OnNext的内建背压机制

信息源必须等待观察者OnNext()方法结束后才能再次调用OnNext()方法

2. 订阅的生命周期

一般情况下, .NET API 如果返回了一个实现了 IDisposable 的对象, 你却不调用 Dispose(), 那通常是一个错误.
Rx 的订阅是个例外: 只有当你希望提前停止时, 才需要主动调用 Dispose()

Rx退订时的规则:

一旦 Dispose() 调用返回,信息源将不再对其观察者调用任何方法

不过,在 Dispose() 尚未返回的那段时间内,信息源可能还会继续发事件.比如多线程情况下.

取消订阅可能很慢甚至没用

Dispose 不会等待关闭完成,它只是发出取消请求就立即返回。

比如某源创建了线程, 这个线程需要时间来关闭或者不会关闭.

不管是源主动结束, 还是手动退订, 整个链路(包括中间的 operator)都会被关闭.

创建Rx序列

Observable.Create

  • Rx的包装方法,用于让用户不需要关心并发模型,订阅释放等复杂问题

    使用Observable.CreateSubscribe方法返回时, Create内可能还没有运行.
    这时订阅者取消, Create会保证不调用OnNext等规则

  • Create方法是延迟执行的. 只有真正订阅时, 委托才会被调用.

  • 多次订阅会多次执行委托

Observable.Defer

​#TODO#: 暂时不明白:
Create本身也有延迟调用机制,为什么要使用Defer?
这两个之间的区别是什么?

Observable.Generate

为什么使用

可以更方便处理订阅取消

// 不是最佳写法!
IObservable<int> Range(int start, int count) =>
Observable.Create<int>(observer =>
{
for (int i = 0; i < count; ++i)
{
observer.OnNext(start + i);
}

return Disposable.Empty;
});

使用Create创建一个无线序列. 这里没有处理订阅取消,虽然由于Create方法不会出现错误.但是其会在后台一直生成新的数字浪费CPU时间片.

Part2 - 处理数据流

这一章节和Linq基本重合

过滤器

IgnoreElements()

总返回false的Where

OfType()

由于Where()的返回类型与输入类型相同.

当需要过滤类型时,就需要使用OfType()

元素位置过滤

FirstAsync()

返回源序列第一个值,如果没有值就complete了就返回error

如果想要返回一个默认值使用FirstOrDefaultAsync(但是可能不知道返回的null是第一个值还是结束了返回的默认值)

Take() TakeLast()

LastAsync() LastOrDefaultAsync()

Skip() SkipLast()

由于在OnComplete()前不知道是不是最后一个,导致从接收到最后一个元素到将其转发给订阅者之间可能会有显著延迟.

SingleAsync() SingleOrDefaultAsync()

要求源有且只有一个元素,否则OnError()

时间过滤

SkipWhile() TakeWhile()

SkipUntil() TakeUntil()

Distinct() DistinctUntilChanged()

Distinct 是另一个标准的 LINQ 运算符。它可以从序列中移除重复项。

这个用来检测状态什么的,牛逼的.

uint exampleMmsi = 235009890;
IObservable<IAisMessageType1to3> statusChanges = receiverHost.Messages
.Where(v => v.Mmsi == exampleMmsi)
.OfType<IAisMessageType1to3>()
.DistinctUntilChanged(m => m.NavigationStatus)
.Skip(1);

TIPS:

  • Where() 总是会把源的终止通知(OnComplete()OnError())原样传递下去
  • 大多数 Rx 运算符本身既不是热的,也不是冷的,它们依赖于其源

序列处理

方法 输入序列 输出序列 特点 典型用途
Select IEnumerable IEnumerable 一对一映射,投影每个元素 类型转换、计算新值
SelectMany IEnumerable IEnumerable 一对多映射,扁平化集合 展平嵌套集合
Cast IEnumerable IEnumerable 类型转换 将非泛型集合转换为泛型序列
var data = new List<string[]> { new[] {"a","b"}, new[] {"c"} };

// Select
var select = data.Select(x => x);
// select = { {"a","b"}, {"c"} } // 二维

// SelectMany
var selectMany = data.SelectMany(x => x);
// selectMany = { "a", "b", "c" } // 一维

// Cast
ArrayList list = new ArrayList() { 1, 2, 3 };
var cast = list.Cast<int>();
// cast = { 1, 2, 3 } 转为 IEnumerable<int>

聚合

方法 输出 何时发射结果 典型用途
Aggregate 单个最终值 序列完成后 汇总、最终统计
Scan 累积结果序列 每接收一个元素就发射 实时累积、动态指标

分区

操作 功能描述 输出类型 典型用途
GroupBy 根据指定键将元素分组,每个组生成独立子流 IObservable<IGroupedObservable<TKey, TElement>> 分类数据、分组处理(如按用户、传感器等)
Buffer 将元素按固定大小或时间间隔收集成列表 IObservable<IList<T>> 批量处理数据、时间窗口统计
var dataStream = new[]
{
("A", 1), ("B", 2), ("A", 3), ("B", 4), ("A", 5), ("B", 6)
}.ToObservable();

Console.WriteLine("=== GroupBy 示例 ===");
// 按类别分组
var grouped = dataStream.GroupBy(d => d.Item1);
grouped.Subscribe(group =>
{
Console.WriteLine($"Group {group.Key}:");
group.Subscribe(d => Console.WriteLine($" Value: {d.Item2}"));
});

Console.WriteLine("\n=== Buffer 示例 ===");
// 每 2 个元素收集成一个批次
var buffered = dataStream.Buffer(2);
buffered.Subscribe(batch =>
{
Console.WriteLine("Batch: " + string.Join(", ", batch.Select(b => b.Item2)));
});

=== GroupBy 示例 ===
Group A:
Value: 1
Value: 3
Value: 5
Group B:
Value: 2
Value: 4
Value: 6

=== Buffer 示例 ===
Batch: 1, 2
Batch: 3, 4
Batch: 5, 6

合并

操作 功能描述 简单示例代码
Concat 先输出第一个序列,再输出第二个序列 Observable.Range(1,2).Concat(Observable.Range(3,2)).Subscribe(Console.WriteLine);
Merge 将多个序列合并输出,元素交替发射 Observable.Range(1,2).Merge(Observable.Range(3,2)).Subscribe(Console.WriteLine);
Zip 按顺序配对多个序列的元素,生成元组或投影结果 Observable.Range(1,3).Zip(Observable.Range(10,3), (a,b) => a+b).Subscribe(Console.WriteLine);
CombineLatest 任意序列发射新元素时,组合所有序列的最新元素 Observable.Interval(TimeSpan.FromSeconds(1)).CombineLatest(Observable.Interval(TimeSpan.FromSeconds(2)), (x,y)=>x+y).Subscribe(Console.WriteLine);

Part3 - 开始务实

调度与线程

因为规定:如果源调用了 OnNext,它必须等待该调用返回后,才能再次调用 OnNextOnError/OnCompleted.所以即使每次调用可能来自不同的线程,这些调用在单个订阅上仍然是严格顺序

人话:
就算每次被调用的上下文不一样.
也可以保证调用是一个一个来的而不是同时来n个

大部分Rx 操作符没有固定线程,它们会在调用到来的线程上执行任务

source
.Where(x => x.MessageType == 3)
.Buffer(10)
.Take(20)
.Subscribe(x => Console.WriteLine(x));

调用栈(在同一线程):
source 调用:
Where 观察者调用:
Buffer 观察者调用:
Take 观察者调用:
Subscribe 观察者调用 lambda

但是使用了调度器的操作符如 Delay()不会在调用到来的线程上执行

调度器

调度器主要有三个职责:

  1. 决定执行工作的上下文(例如,在哪个线程上执行)
  2. 决定何时执行工作(例如立即执行,或延迟执行)
  3. 跟踪时间

接口定义

public interface IScheduler
{
DateTimeOffset Now { get; } // 当前调度器的“当前时间”

IDisposable Schedule<TState>( // 立即执行一个任务
TState state,
Func<IScheduler, TState, IDisposable> action);

IDisposable Schedule<TState>( // 延迟dueTime后执行任务
TState state,
TimeSpan dueTime,
Func<IScheduler, TState, IDisposable> action);

IDisposable Schedule<TState>( // 在绝对时刻执行任务
TState state,
DateTimeOffset dueTime,
Func<IScheduler, TState, IDisposable> action);
}

ImmediateScheduler

  • 直接处理调度,没有其他机制
    所以当调用接受 TimeSpanSchedule 重载方法时,ImmediateScheduler 会直接Sleep()
  • 使用调用线程进行工作

CurrentThreadScheduler

  • 升级之处在于: 如何处理新的调度请求
    调度时产生一个新的task,当前线程有空再处理
  • 使用调用线程进行工作
Observable
.Range(1, 5)
.SelectMany(i => Observable.Range(i * 10, 5))
.Subscribe(
m => Console.WriteLine($"Received {m} on thread: {Environment.CurrentManagedThreadId}"));

// 这里输出顺序不严格

EventLoopScheduler

  • CurrentThreadScheduler不同之处在于: 在自己的独立线程上进行工作

DefaultScheduler

  • 所有基于时间的操作符使用的调度器
  • CLR的线程池中进行工作

NewThreadScheduler

  • 每次调度都会创建一个新线程

  • 希望执行一些长时间运行的工作时比较有效
    因为CLR的线程池是短执行时间优化的

  • 每次订阅都会新建一个线程(比如 Thread 8)。

  • 同一个订阅中的所有元素都在那个线程上执行

    Observable
    .Range(1, 5, NewThreadScheduler.Default)
    .Subscribe(static x =>
    Console.WriteLine($"OnNext {x} (Thread {Environment.CurrentManagedThreadId})")
    );

    Observable
    .Range(1, 5, NewThreadScheduler.Default)
    .Subscribe(static x =>
    Console.WriteLine($"OnNext {x} (Thread {Environment.CurrentManagedThreadId})")
    );

    Observable
    .Range(1, 5, NewThreadScheduler.Default)
    .Subscribe(static x =>
    Console.WriteLine($"OnNext {x} (Thread {Environment.CurrentManagedThreadId})")
    );
    //输出:
    OnNext 1 (Thread 11)
    OnNext 2 (Thread 11)
    OnNext 3 (Thread 11)
    OnNext 4 (Thread 11)
    OnNext 5 (Thread 11)
    OnNext 1 (Thread 12)
    OnNext 2 (Thread 12)
    OnNext 3 (Thread 12)
    OnNext 4 (Thread 12)
    OnNext 5 (Thread 12)
    OnNext 1 (Thread 13)
    OnNext 2 (Thread 13)
    OnNext 3 (Thread 13)
    OnNext 4 (Thread 13)
    OnNext 5 (Thread 13)

TaskPoolScheduler

  • 通过Task 线程池来执行工作
  • CLR线程池是为了旧代码兼容,尝试用 TaskPoolScheduler 替代,特别是在有大量线程池工作时,可能会带来性能收益。

TODO: CLR线程池和TPL线程池


Scheduler 线程来源 / 执行上下文 特点 / 使用场景 备注
ImmediateScheduler 调用线程 - 调度时直接执行,没有额外机制- Schedule(TimeSpan) 会导致线程 Sleep() 非常原始,几乎不用
CurrentThreadScheduler 调用线程(但任务排队,延后执行) - 调度请求会排队,等当前线程空闲再处理- 防止递归调度导致栈溢出 输出顺序可能不严格
EventLoopScheduler 独立专用线程 - 所有任务都在它的私有线程上串行执行- 与 CurrentThread 的区别是线程固定独立 适合希望隔离工作、不影响调用线程的场景
DefaultScheduler CLR 线程池 - 所有基于时间的操作符默认使用- 在线程池中并行执行 线程池针对短任务优化
NewThreadScheduler 每次订阅新建一个线程 - 每次订阅都会新开线程(线程 8、11、12…)- 同一订阅中的元素都在同一个线程上 适合 ​长时间运行任务,避免占用线程池
TaskPoolScheduler TPL Task 线程池(基于 Task 的实现) - 使用 Task 并行库的线程池- 对大量线程池任务性能可能更好 推荐用它替代 DefaultScheduler(旧 CLR 线程池)

SubscribeOn vs. ObserveOn

建议直接看这个项目

Console.WriteLine($"[T:{Environment.CurrentManagedThreadId}] Main thread");

Observable
.Interval(TimeSpan.FromSeconds(1))
.SubscribeOn(new EventLoopScheduler((start) =>
{
Thread t = new(start) { IsBackground = false };
Console.WriteLine($"[T:{t.ManagedThreadId}] Created thread for EventLoopScheduler");
return t;
}))
.Subscribe(tick =>
Console.WriteLine(
$"[T:{Environment.CurrentManagedThreadId}] {DateTime.Now}: Tick {tick}"));

Console.WriteLine($"[T:{Environment.CurrentManagedThreadId}] {DateTime.Now}: Main thread exiting");
输出示例:
[T:1] Main thread
[T:12] Created thread for EventLoopScheduler
[T:1] 21/07/2023 14:57:21: Main thread exiting
[T:6] 21/07/2023 14:57:22: Tick 0
[T:6] 21/07/2023 14:57:23: Tick 1
[T:6] 21/07/2023 14:57:24: Tick 2
···

原因:

  • 订阅事件使用EventLoopScheduler,在其创建的线程上执行订阅.
  • EventLoopScheduler调度Interval调度器.
  • Interval使用默认调度器,在线程池中执行调用,所以Tick 在6
  • 所以如果指定Interval使用ImmediateScheduler的话,Interval就会在EventLoopScheduler的线程上执行.

在发射元素时,Rx 提供的大多数数据源可以分为三类:

  1. 响应上游数据源输入的操作符(如 WhereSelectGroupBy),通常在自身 OnNext 中调用观察者方法。它们调用观察者的上下文与数据源调用 OnNext上下文相同
  2. 迭代或基于时间生成元素的操作符,会使用调度器(显式提供的或默认调度器)。
  3. 任意上下文生成元素的源,例如异步方法中使用 await 且指定 ConfigureAwait(false),在 await 完成后可能在任意线程上调用 OnNext

避免失去对OnNext执行上下文的掌控 - ObserveOn

比如Unity,你绝对不希望你包含Unity API的函数在非主线程被调用,因为log都没有

Observable
.Interval(TimeSpan.FromSeconds(1))
.SelectMany(tick => Observable.Return(tick, NewThreadScheduler.Default))
.Subscribe(tick =>
Console.WriteLine($"{DateTime.Now}-{Environment.CurrentManagedThreadId}: Tick {tick}"));
OUTPUT:
Main thread: 1
21/07/2023 15:19:56-12: Tick 0
21/07/2023 15:19:57-13: Tick 1
21/07/2023 15:19:58-14: Tick 2
21/07/2023 15:19:59-15: Tick 3
...

Observable
.Interval(TimeSpan.FromSeconds(1))
.SelectMany(tick => Observable.Return(tick, NewThreadScheduler.Default))
.ObserveOn(new EventLoopScheduler())
.Subscribe(tick =>
Console.WriteLine($"{DateTime.Now}-{Environment.CurrentManagedThreadId}: Tick {tick}"));
OUTPUT:
Main thread: 1
21/07/2023 15:24:23-13: Tick 0
21/07/2023 15:24:24-13: Tick 1
21/07/2023 15:24:25-13: Tick 2
21/07/2023 15:24:26-13: Tick 3
...

结合SubscribeOn ObserveOn

  • 你可以用 SubscribeOn 确保繁重工作不在 主 线程上完成,
  • 然后用 ObserveOn 确保通知回到正确的线程上。

调度器高级功能

如果你需要编写一个 自己决定何时产生元素的 observable 源,那你可能就需要使用这些高级功能。

TState

调度器原封不动传给回调函数,以给回调函数提供上下文.

虽然更简单的方式是lambda捕获变量,但Rx基本不这么做

Tstate 参数的设计用途:提供每个工作项所需的状态,从而避免在每次迭代中都捕获变量,减少分配开销。

TIPS: **** 就算不需要传入state,也请传入this而不是null,防止隐式调用this

取消调度

对齐订阅的取消.以Interval为例的无限循环调度.如果没人订阅了,那Interval就该取消调度了.

基于时间的序列

时间在 Rx 中始终只是尽力而为(best effort)需要更加严格准确的时间点需要更高级的操作

Timestamp & TimeInterval

  • Timestamped 给元素一个到达时间
  • TimeInterval 给元素附加一个 TimeSpan(即与上一个元素(第一个元素是和订阅的时间间隔)的时间间隔)
‍Observable.Interval(TimeSpan.FromSeconds(1))
.Take(3)
.TimeInterval()
.Dump("TimeInterval");

Delay

值得注意的是, 被延迟的不是订阅本身, 而是转发到最终订阅者的过程

IObservable<Timestamped<long>> source = Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Timestamp();

IObservable<Timestamped<long>> delay = source.Delay(TimeSpan.FromSeconds(2));

delay.Subscribe(value =>
Console.WriteLine(
$"Item {value.Value} with timestamp {value.Timestamp} received at {DateTimeOffset.Now}"),
() => Console.WriteLine("delay Completed"));

所以以上代码输出会发现时间戳与订阅接受时间相差2s

Item 0 with timestamp 09/11/2023 17:32:20 +00:00 received at 09/11/2023 17:32:22 +00:00
Item 1 with timestamp 09/11/2023 17:32:21 +00:00 received at 09/11/2023 17:32:23 +00:00
Item 2 with timestamp 09/11/2023 17:32:22 +00:00 received at 09/11/2023 17:32:24 +00:00
Item 3 with timestamp 09/11/2023 17:32:23 +00:00 received at 09/11/2023 17:32:25 +00:00
Item 4 with timestamp 09/11/2023 17:32:24 +00:00 received at 09/11/2023 17:32:26 +00:00
delay Completed

Sample(采样)

Sample 方法 会按照指定的时间间隔输出值.每次输出时,它都会报告源序列中最后产生的那个值.

  • Sample 并不会做任何插值,它只会返回源中最后一个产生的值。
  • 如果采样间隔比源产生值的间隔还短,Sample 会重复返回同一个值。

Throttle(节流)

运算符 行为
Throttle 保留安静期后的最后一个值(防抖的效果)
Sample 每隔固定时间取最近一个值(不管有没有事件)

Timeout(超时)

提供一个 TimeSpan,且在该时间段内没有任何值产生,则序列会失败并抛出 TimeoutException

Timeout 还支持在超时时返回备用序列

// 如果超过 dueTime,则切换到 other 序列
public static IObservable<TSource> Timeout<TSource>(
this IObservable<TSource> source,
TimeSpan dueTime,
IObservable<TSource> other)
{...}

public static IObservable<TSource> Timeout<TSource>(
this IObservable<TSource> source,
TimeSpan dueTime,
IObservable<TSource> other,
IScheduler scheduler)
{...}

public static IObservable<TSource> Timeout<TSource>(
this IObservable<TSource> source,
DateTimeOffset dueTime,
IObservable<TSource> other)
{...}

public static IObservable<TSource> Timeout<TSource>(
this IObservable<TSource> source,
DateTimeOffset dueTime,
IObservable<TSource> other,
IScheduler scheduler)
{...}

集成Rx

通常最好将所有 Rx 逻辑集中处理,这样与外部世界的交互只需两次:一次输入,一次输出。

async & await

  • 可以对任何 IObservable<T> 使用 C# 的 await 关键字

ForEachAsync

ToEnumerable

  • 源可观察序列将在你开始枚举序列时订阅(即延迟订阅)
  • foreach 调用枚举器的 MoveNext 会阻塞,直到源产生一个元素
  • 如果源报告错误,该错误会被抛出

这意味着在使用 ToEnumerable 时,技术上有可能在同一个线程上同时消费和生成项目,但这依赖于生产者始终领先。这是一种危险的方法,因为如果 foreach 循环赶上了生产者,就会发生死锁

转为单一集合

  • 等调用了OnComplete后才会转换为单一集合

  • 如果源在生成值后出现错误,你将无法接收到任何这些值

APIs:

  • ToArray
  • ToList
  • ToDictionary
  • ToLookup

ToTask

  • 当任务完成时,该任务的结果就是序列的最终输出

  • 如果源序列完成但未产生任何元素,则任务将进入 faulted 状态,并抛出 InvalidOperationException

  • 如果在 observable 序列完成前传入一个取消令牌,它会取消对源的订阅,并将任务置为已取消状态

  • 源序列调用 OnError,Rx 会使用提供的异常将任务置于 faulted 状态

ToEvent

最简单的方式

var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5); 
var result = source.ToEvent();
result.OnNext += val => Console.WriteLine(val);

ToEvent 方法返回 IEventSource<T>,它只有一个成员:OnNext 事件。

public interface IEventSource<T> 
{
event Action<T> OnNext;
}

转换成.Net事件

  • ToEventPattern

为什么应该直接使用Rx

事件有以下局限性:

  • 难以组合
  • 不能作为参数传递或存储在字段中
  • 难以随时间轻松查询
  • 没有标准的错误报告模式
  • 没有标准的序列结束指示模式
  • 对并发或多线程应用几乎没有帮助

Do - 注入副作用

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
IObservable<long> loggedSource = source.Do(
i => Log(i),
ex => Log(ex),
() => Log());

loggedSource.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));

AsObservable - 封装

public class UltraLeakyLetterRepo
{
public ReplaySubject<string> Letters { get; }

public UltraLeakyLetterRepo()
{
Letters = new ReplaySubject<string>();
Letters.OnNext("A");
Letters.OnNext("B");
Letters.OnNext("C");
}
}

这段代码最大的问题: 使用者可以调用 OnNext/OnError/OnCompleted

你可以更改为:

public class ObscuredLeakinessLetterRepo
{
public IObservable<string> Letters { get; }

public ObscuredLeakinessLetterRepo()
{
var letters = new ReplaySubject<string>();
letters.OnNext("A");
letters.OnNext("B");
letters.OnNext("C");
this.Letters = letters;
}
}

但是这里Letters的实际类型仍然是ReplaySubject

导致你可以使用以下代码捣乱

var repo = new ObscuredLeakinessLetterRepo();
IObservable<string> good = repo.GetLetters();

good.Subscribe(Console.WriteLine);

// 调皮行为
if (good is ISubject<string> evil)
{
// 调皮,"1" 不是字母!
evil.OnNext("1");
}
else
{
Console.WriteLine("could not sabotage");
}

因此最好的方式应该是:

this.Letters = letters.AsObservable();

错误处理

Cathc()

using System.Reactive.Linq;

// 模拟一个可能抛出异常的 Observable
var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnError(new Exception("发生错误!"));
observer.OnNext(4); // 不会被执行
return System.Reactive.Disposables.Disposable.Empty;
});

// 使用 Catch 捕获异常
var handled = source.Catch<int, Exception>(ex =>
{
Console.WriteLine($"捕获异常: {ex.Message}");
// 提供备用 Observable
return Observable.Return(999);
});

handled.Subscribe(
x => Console.WriteLine($"接收到: {x}"),
ex => Console.WriteLine($"流终止: {ex.Message}"),
() => Console.WriteLine("流完成")
);

var first = Observable.Throw<int>(new Exception("第一个流出错"));
var second = Observable.Return(42);

var result = first.Catch(second);

result.Subscribe(
x => Console.WriteLine($"接收到: {x}"),
ex => Console.WriteLine($"流终止: {ex.Message}"),
() => Console.WriteLine("流完成")
);

输出:

接收到: 1
接收到: 2
接收到: 3
捕获异常: 发生错误!
接收到: 999
流完成
接收到: 42
流完成

Finally()

var source = Observable.Create<int>(observer =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnError(new Exception("出错了!"));
return System.Reactive.Disposables.Disposable.Empty;
});

var handled = source.Finally(() => Console.WriteLine("流结束,执行 Finally 操作!"));

handled.Subscribe(
x => Console.WriteLine($"接收到: {x}"),
ex => Console.WriteLine($"流发生异常: {ex.Message}"),
() => Console.WriteLine("流正常完成")
);

输出

接收到: 1
接收到: 2
流发生异常: 出错了!
流结束,执行 Finally 操作!

Using

class MyResource(string name) : IDisposable
{
private string _name = name;

public void Use(int value) => Console.WriteLine($"{_name} 使用中: {value}");

public void Dispose() => Console.WriteLine($"{_name} 已释放");
}

class Program
{
static void Main()
{
var observable = Observable.Using(
resourceFactory: () => new MyResource("资源A"),
observableFactory: res => Observable.Range(1, 3).Do(x => res.Use(x))
);

observable.Subscribe(
x => Console.WriteLine($"接收到: {x}"),
ex => Console.WriteLine($"异常: {ex.Message}"),
() => Console.WriteLine("流完成")
);
}
}

输出

资源A 使用中: 1
接收到: 1
资源A 使用中: 2
接收到: 2
资源A 使用中: 3
接收到: 3
流完成
资源A 已释放

测试

TestScheduler

TestScheduler定义了允许我们控制和监控虚拟时间的方法

public class TestScheduler : // ...
{
public bool IsEnabled { get; private set; }
public TAbsolute Clock { get; protected set; }
public void Start()
public void Stop()
public void AdvanceTo(long time)
public void AdvanceBy(long time)

...
}

TestScheduler 使用 TimeSpan.Ticks 作为时间单位。如果你想让时间前进 1 秒,可以调用:

scheduler.AdvanceBy(TimeSpan.FromSeconds(1).Ticks);

1 个 tick 对应 100ns,因此 1 秒等于 10,000,000 tick。

  • AdvanceTo(long) 方法将虚拟时间设置为指定的 tick 数量

  • AdvanceBy(long) 方法允许我们将时钟向前移动指定时间量

Part4 - 最佳实践

  1. 返回序列的成员永远不应返回 null
    适用于 IEnumerableIObservable 序列。应返回空序列而非 null。

  2. 仅在需要提前取消订阅时才调用 Dispose

  3. 始终提供 OnError 处理程序

  4. 避免阻塞操作符
    如 ​First, FirstOrDefault, Last, LastOrDefault, Single, SingleOrDefault, ForEach
    可使用非阻塞替代方案,例如 ​FirstAsync

  5. 避免在 IObservable 与 IEnumerable 之间频繁转换

  6. 优先使用延迟求值而非立即求值

  7. 将大型查询拆分成多个部分
    大型查询的关键标志包括:

    • 嵌套查询
    • 查询表达式语法超过 10 行
    • 使用 into 关键字
  8. 为 Observable 命名规范
    避免使用模糊变量名,如 query, q, xs, ys, subject 等。

  9. 避免副作用
    如果无法避免,不要将副作用隐藏在函数式操作符(如 Select 或 ​Where)的回调中。应使用 Do 操作符明确表示副作用。

  10. 尽可能使用 Observable.Create 定义新的 Rx 源
    避免直接使用 Subject,除非确实需要。

  11. 避免自己实现 IObservable 接口
    使用 ​Observable.Create(或必要时使用 Subject)。

  12. 避免自己实现 IObserver 接口
    优先使用 Subscribe 扩展方法的重载。

  13. 应用程序应定义并管理并发模型

  14. 如需调度延迟工作,请使用调度器(Schedulers)

  15. SubscribeOn 和 ObserveOn 应始终紧接 Subscribe 方法
    避免夹在其他操作符中,例如:

    source.SubscribeOn(s).Where(x => x.Foo) // 不推荐

Reference