您的位置:首页 >MongoDB 变更流暂停与恢复实战教程
发布于2026-03-01 阅读(0)
扫一扫,手机访问

本文详解如何在 Spring Reactive MongoDB 应用中动态停止并安全恢复变更流,利用 Disposable 控制订阅生命周期,并结合 Resume Token 实现断点续传,适用于数据库维护等场景。
本文详解如何在 Spring Reactive MongoDB 应用中动态停止并安全恢复变更流,利用 Disposable 控制订阅生命周期,并结合 Resume Token 实现断点续传,适用于数据库维护等场景。
在响应式 MongoDB 开发中,变更流(Change Stream)是监听集合数据实时变更的核心机制。但生产环境中常需临时中断流(例如执行索引重建、批量迁移或备份),之后从中断位置精准续订,而非丢失事件或重放全量历史。Spring Data MongoDB 的 ReactiveMongoTemplate.changeStream(...) 返回的是 Flux<T>,其本质是冷流(cold stream),每次订阅都会新建一次服务端游标——因此“暂停”不能靠阻塞线程实现,而必须通过取消订阅 + 保存/复用 Resume Token 完成。
Flux.subscribe() 返回 Disposable,调用 .dispose() 即可立即终止当前流订阅,释放客户端资源(注意:它不会自动通知 MongoDB 服务端关闭游标,但后续无心跳将由服务端超时清理,符合预期):
// 启动变更流并持有 Disposable 引用
private volatile Disposable currentSubscription;
private volatile BsonValue lastResumeToken;
public void startWatching() {
currentSubscription = reactiveMongoTemplate
.changeStream("collection",
ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.build(),
Example.class)
.filter(event -> event.getOperationType() != null)
.doOnNext(event -> lastResumeToken = event.getResumeToken()) // 关键:持续更新 token
.mapNotNull(ChangeStreamEvent::getBody)
.subscribe(
example -> exampleService.doSomething(example),
error -> log.error("Change stream error", error),
() -> log.info("Change stream completed")
);
}
public void stopWatching() {
if (currentSubscription != null && !currentSubscription.isDisposed()) {
currentSubscription.dispose();
log.info("Change stream stopped. Last resume token: {}", lastResumeToken);
}
}MongoDB 要求恢复时传入 resumeAfter(非 startAfter),且该 token 必须来自同一流上下文(即同一集合、相同聚合管道)。恢复代码示例如下:
public void resumeWatching() {
if (lastResumeToken == null) {
log.warn("No valid resume token available; starting from latest");
startWatching(); // 退化为新流
return;
}
// 构建带 resumeAfter 的选项
ChangeStreamOptions options = ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.resumeAfter(lastResumeToken) // ⚠️ 核心参数
.build();
currentSubscription = reactiveMongoTemplate
.changeStream("collection", options, Example.class)
.filter(event -> event.getOperationType() != null)
.doOnNext(event -> lastResumeToken = event.getResumeToken())
.mapNotNull(ChangeStreamEvent::getBody)
.subscribe(
example -> exampleService.doSomething(example),
error -> {
log.error("Resume failed, falling back to new stream", error);
// 可选:自动降级为新流(如 token 过期)
startWatching();
}
);
}变更流的“暂停-恢复”不是客户端流控,而是服务端游标生命周期管理。核心路径为:
① 订阅时持续提取 event.getResumeToken() 并持久化(如内存缓存或 Redis);
② 停止时调用 Disposable.dispose();
③ 恢复时构造 ChangeStreamOptions.resumeAfter(token) 重建流。
只要 token 有效,即可实现毫秒级断点续传,完美支撑运维灰度与弹性扩缩容场景。
售后无忧
立即购买>office旗舰店
售后无忧
立即购买>office旗舰店
售后无忧
立即购买>office旗舰店
售后无忧
立即购买>office旗舰店
正版软件
正版软件
正版软件
正版软件
正版软件
1
2
3
7
9