chore: keep remote dyn filter docs local

This commit is contained in:
discord9
2026-05-18 17:20:20 +08:00
parent 7e505e8e12
commit 930b090061
12 changed files with 0 additions and 1900 deletions

2
.gitignore vendored
View File

@@ -76,5 +76,3 @@ AGENTS.md
# local design docs
docs/specs/
.vs/
.tmp/

View File

@@ -1,38 +0,0 @@
I hereby agree to the terms of the [GreptimeDB CLA](https://github.com/GreptimeTeam/.github/blob/main/CLA.md).
## Refer to a related PR or issue link (optional)
- RFC PR for remote dynamic filter propagation design
## What's changed and what's your intention?
This PR adds a new RFC for **remote dynamic filter propagation**.
The RFC explains why frontend-produced dynamic filters should be propagated to remote datanode scans in distributed queries, and proposes a minimal design that keeps remote dynamic filters as an **optimization only**, never a correctness dependency.
The document covers the main design points:
- end-to-end flow from join-produced alive dynamic filters to remote scan updates
- the identity model based on `query_id + filter_id`
- reusing the existing region unary RPC path for `update / unregister`
- a frontend query-engine runtime registry keyed by `query_id`
- registry lifecycle and cleanup semantics
- failure handling and safe degradation behavior
- alternatives that were considered and rejected
This is a documentation / design PR only. It does **not** implement the feature itself, and it does not change the current API, schema, or runtime behavior.
Current limitations described by the RFC:
- only the current minimal design is covered
- larger build-side membership propagation is left for later work
- some runtime policies, such as handling updates that arrive before scan registration, are still open questions
## PR Checklist
Please convert it to a draft if some of the following conditions are not met.
- [ ] I have written the necessary rustdoc comments.
- [ ] I have added the necessary unit tests and integration tests.
- [x] This PR requires documentation updates.
- [x] API changes are backward compatible.
- [x] Schema or data changes are backward compatible.

View File

@@ -1,315 +0,0 @@
# RFC: Remote Dynamic Filter Propagation (Phase 1)
## 状态
- Status: Draft
- Scope: Phase 1 最小可用版本
- Related:
- `remote-dyn-filter-task-index.md`
- `remote-dyn-filter-task-01-wire-abi.md`
- `remote-dyn-filter-task-02-region-rpc.md`
- `remote-dyn-filter-task-03-frontend-producer.md`
- `remote-dyn-filter-task-04-datanode-apply.md`
- `remote-dyn-filter-task-05-observability-fallback.md`
- `remote-dyn-filter-task-06-validation.md`
- `remote-dyn-filter-task-07-large-build-membership.md`
## 背景
GreptimeDB 现有动态过滤在单节点或本地执行链路中可以减少 probe side 扫描量,但在 distributed query 下join build side 产生的动态过滤不会自动传播到远端 datanode scan。结果是
- frontend 已经知道某个 dyn filter 可以收紧 probe side
- 远端 datanode scan 仍按较宽 predicate 扫描
- 优化收益局限在 frontend 本地,不能转化为分布式 pruning 收益
本 RFC 的目标是把 frontend 产生的 dyn filter 增量传播到 datanode table scan同时保持一个关键约束
> Remote dyn filter 是优化项,不是查询正确性的前提。
任何编码失败、RPC 失败、状态丢失、乱序、清理异常,都只能关闭优化,不能破坏查询结果正确性。
## 目标
Phase 1 目标:
1. 建立 frontend -> datanode 的最小控制面,使 dyn filter update 能跨节点传播。
2. 在 frontend 为 join 产生的 alive dyn filter 建立 query-scoped registry并持续观察其更新。
3. 在 datanode 维护 `query_id + filter_id` 对应的 query-scoped remote filter state并把 update 应用到 scan predicate。
4. 先支持分布式 `IN` filter 语义,为后续 `MIN_MAX` / `BLOOM` 预留协议与实现扩展位。
5. 在异常路径下保证安全降级与可观测。
## 非目标
Phase 1 不做:
- 独立 gRPC service 或 Arrow Flight 控制面
- batched update / streaming update / ack / heartbeat
- Bloom 或大 build-side membership 的最终方案
- 以 remote dyn filter 为前提的查询级阻塞或一致性协议
- 把 routing 元数据混入 filter state identity
## 端到端主链路
Phase 1 的主链路如下:
1. **join 产生 alive dyn filter**
- join 是 dyn filter producer。
- frontend 持有 alive `DynamicFilterPhysicalExpr`,后续 update 由它驱动。
2. **MergeScanExec 建立桥接关系**
- `MergeScanExec` 识别哪些 alive dyn filter 需要传播到哪些 remote subscriber regions/scans。
- `MergeScanExec` 为这些 dyn filter 生成稳定 `filter_id`
- `MergeScanExec` 把 alive dyn filter 注册到 query-scoped registry并建立 `query_id + filter_id -> subscribers` 的映射。
3. **初始 remote read 完成 register / 建链**
- `MergeScanExec::to_stream` 发起 remote read。
- 初始 remote read 负责把“该 query 下会存在这个 remote dyn filter”带到 datanode。
- datanode 在 scan 构建阶段建立本地注册项,并安装 placeholder / remote wrapper。
4. **registry 持续观察 dyn filter 更新**
- frontend registry 通过 `wait_update` 或 generation 变化监听 alive dyn filter。
- 每次有新快照时registry 生成新 epoch并构造 `DynFilterUpdate`
5. **后续 update / unregister 通过 unary RPC 下发**
- frontend registry 使用 Task 02 定义的 unary RPC 下发 `update / unregister`
- datanode 根据 `query_id + filter_id` 找到共享 remote filter state。
- 若 epoch 更新更大则应用;重复/过期则幂等忽略。
6. **query 结束或无人使用时清理**
- frontend `is_used()` 可作为“本 query 内已无人使用”的正常注销信号。
- query finish / cancel 触发 registry 进入 closing并执行 unregister / cleanup tail。
- datanode TTL 和 cancel hook 作为兜底回收,不替代显式 unregister。
## 核心设计决策
### 1. ABI 与 payload
Phase 1 采用 Task 01 已定义的最小 ABI
- `DynFilterUpdate`
- `protocol_version`
- `query_id`
- `filter_id`
- `epoch`
- `is_complete`
- `payload`
- `DynFilterPayload`
- Phase 1 当前只正式落地 `Datafusion(Vec<u8>)`
- 最小可交付以 `InListExpr` 为主
- 不支持的 runtime-only expr`HashTableLookupExpr`)必须静默降级为“不远端发送”
### 2. 控制面通道
Phase 1 复用现有 `greptime.v1.region.Region` unary gRPC
- `RegionRequest.body.remote_dyn_filter`
- `RemoteDynFilterRequest.oneof action`
- `update`
- `unregister`
重要边界:
- 初始 remote read 负责 register / scan 建链
- unary RPC 只负责后续 `update / unregister`
### 3. Identity model
Phase 1 固定:
- 逻辑 identity 为 `query_id + filter_id`
- region / scan 只承担 routing 与注册元数据,不是 filter state identity 的一部分
`filter_id` 只要求在单个 query 内稳定且唯一,不要求跨 query 全局唯一。
当前推荐规则:
- `region_id`
- `producer-local ordinal`
- `canonicalized children fingerprint`
明确不放入 identity
- `partition`
- datanode transport 信息
- 内存地址 / 临时对象 id
### 4. Frontend registry placement
Frontend query-scoped registry 采用 **方案 1**
- **实现位置**`src/query/src/dist_plan/remote_dyn_filter_registry.rs` 或等价邻近模块
- **物理存放**query-engine runtime map
- **管理方式**`query_id -> Arc<RemoteDynFilterRegistry>`
这样做的原因:
- registry 是 query execution runtime object而不是 session metadata
- 它包含 watcher task、cleanup tail、fanout 状态
- 不适合直接塞进 `QueryContext.mutable_session_data`
- 也不应挂在单个 `MergeScanExec` 上,否则 watcher 与 cleanup 责任会分散
### 5. Registry lifecycle
registry 生命周期分三态:
- `Active`
- 允许注册 subscriber
- watcher 正常运行
- 允许发送 update
- `Closing`
- query finish / cancel 后进入
- 停止新注册
- 发送最终 unregister / complete
- 等待少量 in-flight RPC 收尾
- `Closed`
- watcher 停止
- entry 与 subscriber map 清理完成
- 可从 query-engine runtime map 中移除
Phase 1 允许 registry 比查询主执行略长一点,但只用于善后,不作为长期全局状态。
## 组件职责
### Frontend
#### Join producer
- 产生 alive dyn filter
- 驱动本地 dyn filter 更新
#### MergeScanExec
- 建立 producer -> remote subscriber 的桥接关系
- 生成稳定 `filter_id`
- 注册 alive dyn filter 到 query-scoped registry
-`to_stream()` 时完成初始 register / remote read 建链
#### QueryRemoteDynFilterRegistryManager
- 维护 `query_id -> Arc<RemoteDynFilterRegistry>` map
- 提供:
- `get_or_init(query_id)`
- `begin_closing(query_id)`
- `reap_closed(query_id)`
#### RemoteDynFilterRegistry
- 持有 query 级状态:
- `query_id`
- lifecycle state
- `entries: filter_id -> RegisteredRemoteDynFilter`
- cleanup tail deadline
- 负责 watcher 启停、fanout、cleanup
#### RegisteredRemoteDynFilter
最小字段建议:
- `filter_id`
- `alive_dyn_filter: Arc<DynamicFilterPhysicalExpr>`
- `last_epoch`
- `last_observed_generation`
- `subscribers`
- `watch_task_handle`
### Datanode
#### Region unary RPC handler
- 接收 `RemoteDynFilterRequest`
- 校验并按 `action` 分发
- 真实 query-scoped state apply 逻辑由 datanode runtime 实现承接
#### Query-scoped remote filter state
-`query_id + filter_id` 建共享 state
- 处理 epoch 幂等与乱序
- 挂接本地 remote wrapper / scan consumer
#### Remote wrapper / apply path
- 将远端 update 转为本地 dyn filter snapshot
- 应用到现有 predicate / scan 更新路径
- 支持 remap / stable children / generation 语义
## 失败与降级语义
所有错误都必须保持“优化失败但查询继续”:
- payload 编码失败 -> 不远端发送,仅本地 dyn filter
- RPC 失败 -> 记录并降级,不中断查询
- update 早到 / 目标缺失 -> 明确为缓冲、丢弃+指标、或前端重试中的一种
- payload 解码失败 / remap 失败 -> 只关闭远端优化
- registry 或 datanode 状态异常 -> 允许失去 pruning 收益,但不能影响正确结果
## 观测与保护
Phase 1 需要至少具备:
- metrics
- update send/apply/drop
- stale epoch drop
- decode fail
- frontend/datanode registry register/unregister
- cleanup / complete / TTL
- tracing fields
- `query_id`
- `filter_id`
- `epoch`
- 必要时的 region / subscriber 元数据
- resource guards
- payload size budget
- cardinality threshold
- 节流 / 去抖
## Alternatives considered
### A. 把 registry 直接挂在 `MergeScanExec`
不选,原因:
- 同一 query 可能存在多个 bridge / exec 实例
- watcher 与 cleanup 会分散
- 不利于 query 结束后的统一善后
### B. 把 registry 直接塞进 `QueryContext.mutable_session_data`
不选,原因:
- 语义不对registry 是 execution runtime object不是 session metadata
- 并发状态、watcher task、cleanup tail 不适合放进 session-style rwlock 数据
### C. 直接上长期全局 manager
Phase 1 不选,原因:
- 过重
- 容易提前把 Task 05 的 TTL / 跨 query sweep / 全局观测复杂度引进来
- 目前 query-engine runtime map 已足够满足 query-scoped 生命周期管理
## 分阶段任务映射
- **Task 01**:定义 ABI 与 payload 边界
- **Task 02**:打通 unary RPC 控制面
- **Task 03**frontend producer / registry / dispatch scheduling
- **Task 04**datanode apply / remote wrapper / query-scoped state
- **Task 05**metrics / tracing / cleanup guard / fallback polish
- **Task 06**:端到端验证与回归基线
- **Task 07**large build-side membership 的后续扩展Bloom 等)
## Open Questions
1. datanode 对“update 早于 scan register 到达”在 Phase 1 采用哪种固定策略?
2. `children fingerprint` 的 canonicalization 是否需要单独抽成公共 helper
3. `is_complete` 与最终 unregister 在 frontend 和 datanode 两侧的最佳时序如何定义得更严格?
4. query-engine runtime map 的 sweep/reap 是否在 Phase 1 就需要后台定期任务,还是按 query finish / explicit reap 足够?
5. Task 07 的 Bloom payload 是否与 `MIN_MAX` 组合发送,还是独立演进?
## Rollout 建议
1. 先冻结 RFC 中的 identity、registry placement 和 lifecycle 决议。
2. 按 Task 01 -> 06 顺序落地 Phase 1。
3. 用 Task 06 建立回归基线后,再讨论 batched update、ack、heartbeat、Bloom membership 等 Phase 2/3 演进。

View File

@@ -1,436 +0,0 @@
# Task 01 - Dyn Filter Wire ABI 与序列化边界
## 任务目标
定义一个可版本化、可幂等重放、可降级的 `DynFilterUpdate` 最小 wire contract让 frontend 能把运行时 dyn filter 状态投影为 datanode 可解释的消息体Task 01 只固定“传什么”和“什么情况下允许传”,不把 transport、producer 调度或 datanode runtime 混进同一任务。
## 主要落点
- `src/common/query/src/request.rs`
- 可能新增 `src/common/query/src/dyn_filter.rs` 或相近模块
- 可能涉及 protobuf / serde / flight request 相关公共定义
## 前置依赖
- 无,作为后续 RPC 和状态机实现的基础任务优先完成。
## 当前实现状态(截至本轮代码修改)
按当前已经收紧后的任务边界,**Task 01 在其自身范围内已经完成**。当前已落地的内容可归为三块:
- `query_id` / `remote_query_id` 的 Phase 1 contract 与传播基础设施
- `DynFilterUpdate` ABI 的核心共享类型骨架
- `DynFilterPayload::Datafusion(Vec<u8>)` 的共享编码边界与基础 helper
### 已完成
1. `remote_query_id` 作为 Phase 1 query-scoped execution id 的载体已经落地到 `QueryContext.extensions["remote_query_id"]`
2. fresh local `QueryContext` 创建路径现在都会自动补齐 `remote_query_id`
3. `QueryContext <-> api::v1::QueryContext` 现有序列化链路已经可以携带该 extension无需 proto schema 改动。
4. frontend 正常 distributed read 主路径会把 `query_context` 带入 `RegionRequestHeader`,因此 datanode 能恢复 `remote_query_id`
5. `src/common/query/src/request.rs` 中已经新增 `DynFilterUpdate``DynFilterPayload` 等 Task 01 ABI 核心类型。
6. ABI 当前采用:
- `#[non_exhaustive] enum DynFilterPayload { Datafusion(Vec<u8>) }`
作为 Phase 1 当前唯一 payload 变体,不再保留冗余的 `filter_kind` 字段。
7. `is_complete` 命名已经替代 `is_final`,与 DataFusion `mark_complete()` 语义对齐。
8. `DynFilterPayload::Datafusion(Vec<u8>)` 的共享 encode/decode helper 已经落地到 `common-query`
- 能把 `Arc<dyn PhysicalExpr>` 编码成 payload bytes
- 能在给定 `TaskContext + Schema` 的前提下解码回 `Arc<dyn PhysicalExpr>`
- 会拒绝 `HashTableLookupExpr` 这类不应进入 Phase 1 payload 的 runtime-only expr
- 会在解码后校验 `Column(name, index)` 与输入 schema 的一致性
- 会在 encode / decode 两侧都执行 payload bytes budget 检查
### 不再计入 Task 01 的后续实现
下面这些工作依赖 Task 01 已定义好的 contract但**不再属于 Task 01 自身的未完成项**
1. `filter_id` 的 concrete frontend helper 落地
- Task 01 只定义它必须在单个 `query_id` 内稳定且可重放;实际生成逻辑归 Task 03。
2. `epoch` / `is_complete` 的 producer 递增与 consumer 执行
- Task 01 只定义 contract-level 语义;真实发送与消费分别归 Task 03 / Task 04。
3. “可序列化 / 需静默降级”的 frontend 判定逻辑
- Task 01 只定义 Phase 1 支持边界;例如 `HashTableLookupExpr` 必须静默降级为“不远端发送”,具体判定逻辑归 Task 03。
以下能力依赖 Task 01 已定义好的 ABI但其真实实现归属后续任务
- **Task 02**`DynFilterUpdate` 的真实发送/接收控制面链路
- **Task 04**datanode 侧 remote dynamic wrapper / update apply / remap runtime
因此,当前代码状态更准确地说是:
- **Task 01 / query-id groundwork已完成**
- **Task 01 / DynFilterUpdate ABI 核心类型骨架:已完成**
- **Task 01 / DynFilterUpdate 共享消息 schema、编码边界与基础 encode/decode helper已完成**
- **Task 02 / DynFilterUpdate 真实发送链路:未开始或未实现**
- **Task 04 / datanode consumer runtime未开始或未实现**
## 实现范围
1. 定义 `DynFilterUpdate` 核心字段:
- `protocol_version`
- `query_id`(建议实现为 dedicated query-scoped execution id而不是复用现有 `process_id`
- `filter_id`
- `epoch`
- `is_complete`(语义上对应 DataFusion `mark_complete()`;表示后续不会再有新 update
- `payload`
2. 第一阶段只支持 `IN` payload但 ABI 必须保留 `MIN_MAX` / `BLOOM` 扩展位。
3. 对于 `DynFilterPayload::Datafusion(Vec<u8>)`,列引用由 expr tree 内部的 `Column` 节点承担,而不是额外的 top-level binding 字段。
4. 明确可序列化判定逻辑:仅底表列上的简单等值 join dyn filter 可下发。
5. 明确降级规则:无法投影为 ABI 的 filter 仅本地使用,不进入远端传播路径。
## Phase 1 默认落地的精确定义
为了让实现可以直接开始Phase 1 先把以下设计钉死,不再保持开放状态。
### 1. `DynFilterUpdate` 的 wire shape
Phase 1 先使用“结构化字段 + payload enum”的方案
- 路由、生命周期、关联字段继续放在 `DynFilterUpdate` 外层。
- 具体过滤内容放进 `payload`
- Phase 1 的 payload 先只承载 DataFusion physical expr protobuf bytes但类型本身保持为可扩展 enum。
推荐形式:
```rust
#[non_exhaustive]
enum DynFilterPayload {
Datafusion(Vec<u8>), // DataFusion physical expr protobuf bytes
}
```
建议最小结构:
- `protocol_version: u32`
- `query_id: String`(从 `QueryContext.extensions["remote_query_id"]` 读取)
- `filter_id: String`
- `epoch: u64`
- `is_complete: bool`
- `payload: DynFilterPayload`
其中 Phase 1 的约束是:
- `DynFilterPayload::Datafusion(Vec<u8>)`
- 只用于 **simple/snapshotted expr subset**
- 当前最小交付以 `InListExpr` 为主
- `MIN_MAX BinaryExpr` 与少量简单组合 expr 只代表后续可扩展编码空间,不视为本任务的端到端承诺
receiver 侧兼容性规则也在 Task 01 固定:
-`protocol_version` 不受支持,或 payload 变体对当前节点未知,则该 update 必须被安全拒绝/丢弃并触发优化降级
- 该类拒绝只能让 remote dyn filter 失效,不能影响查询结果正确性
`DynFilterPayload::Datafusion(Vec<u8>)` 来说,列信息的 authoritative source 是 payload 内部的 DataFusion expr tree
- `Column` 节点本身携带 `name + index`
- `InListExpr` / `BinaryExpr` / `CaseExpr` 等会递归携带其子 expr
- 多列引用(例如后续可能出现的 `struct(...)` key也应该由 expr tree 本身表达
因此Phase 1 先不在 `DynFilterUpdate` 顶层重复建模 `column_binding`,避免与 expr tree 内部列引用形成双重状态源。
### 1.0 Task 01 与 Task 02 的边界
为了避免和 Task 02 混淆Task 01 在这里固定边界如下:
- **Task 01 负责**
- `DynFilterUpdate` 长什么样
- `DynFilterPayload` 如何表达与如何编码
- `query_id` / `filter_id` / `epoch` / `is_complete` 的 contract-level 语义
- 哪些上游 expr 形态允许进入 payload哪些必须降级为“不远端发送”
- **Task 02 负责**
- 通过什么 unary RPC 把 `DynFilterUpdate` 发出去
- frontend/client 怎么调用该 RPC
- datanode `region_server` 怎么接收、返回错误、做鉴权/路由
也就是说:
- Task 01 更像“消息体和消息语义本身”
- Task 02 更像“承载这条消息体的控制面通道”
如果没有 Task 01Task 02 不知道该发什么;如果没有 Task 02Task 01 定义好的消息也没有真实控制面可走。
与 Task 04 的边界也需要同时明确:
- Task 01 只定义 consumer 必须能依赖的消息语义,例如:幂等 epoch 规则、payload 必须能解码为稳定 expr subset、以及不支持 payload 时必须安全降级。
- Task 04 负责把这些约束真正落到 datanode runtime包括 RemoteDynamicFilter wrapper、child remap、scan/apply 路径和状态生命周期。
结论:
- Phase 1 可以优先复用 `DynFilterPayload::Datafusion(Vec<u8>)` 来承载简单可序列化的 expr snapshot。
- 但不能把它理解成“arbitrary DataFusion physical expr 都能安全远端传输”。
- `HashTableLookupExpr` 等 runtime-only expr 仍然不能指望该分支解决。
- Phase 1 先不引入 `Custom`,避免在 wire 尚未真正稳定前过早扩展 payload 面。
### 1.1 `DynFilterPayload::Datafusion` 的边界
这个分支的主要价值,是最大程度复用 DataFusion 已有的 physical expr protobuf 编解码逻辑,减少我们自己维护 expr AST wire format 的成本。
但 Phase 1 必须明确限制:
1. 它传输的是 snapshot 后的 physical expr bytes不是远端可持续 update 的动态 wrapper。
2. `HashTableLookupExpr` 在 DataFusion proto 路径上会退化掉,因此不能被视作有效 payload。
3. `DynFilterPayload::Datafusion(Vec<u8>)` 在当前最小交付里主要承载小 build side `InListExpr``MIN_MAX` 或简单组合 expr 只作为后续扩展时可复用的编码空间,不是本任务的端到端承诺。
4. payload 内部若引用多个 `Column`,这是 expr tree 自身的语义,不应被错误压扁成单列 binding。
5.`HashTableLookupExpr` 等运行时专属 exprPhase 1 采用静默降级:不远端发送,而不是把它当成查询级错误。
### 1.2 为什么 Phase 1 先不引入 `Custom`
在 wire 还没有真正投入跨节点兼容场景之前,先只保留 `Datafusion` 变体更简单。
但需要明确兼容性边界:
- **当前阶段**`Custom` 等新变体可以先不实现,因为还没有稳定的跨版本/跨节点协议承诺。
- **未来阶段**:一旦真实 RPC/wire 稳定并开始跨版本传输,再新增 `Custom` 或 Bloom 一类 payload就应视为协议扩展事件。
- 这意味着后续若新增 `Custom`,需要配合:
- `protocol_version` 演进;或
- 显式 capability / feature gating
- mixed-version 节点兼容策略。
### 2. `filter_id` 的生成与唯一性范围
Phase 1 先规定:
- `filter_id` 由 frontend 生成。
- 唯一性范围是 **单个 `query_id` 内唯一**,不要求跨 query 全局唯一。
- 同一个 dyn filter 在整个查询生命周期内复用同一个 `filter_id`
- 重试、重复发送、final update 都必须沿用同一个 `filter_id`
推荐生成规则:
- 使用 plan/build 阶段稳定信息生成字符串 id例如
- `<source_stage_or_plan_node>/<join_side>/<target_region_group>/<join_key_ordinal>`
- 不要使用内存地址、裸指针、临时 expr pointer、随机数作为 `filter_id`
- Phase 1 优先把它做成 frontend-side helper输入是 MergeScan / distributed plan 中稳定可见的信息,加上对应 dyn filter expr 的稳定位置,而不是依赖 datanode 物理计划生成后的临时节点身份。
### 3. `filter_id` 是逻辑 filter identitytarget 只保留为路由/注册元数据
Phase 1 改为固定以下约束:
- `query_id + filter_id` 唯一标识一条 remote dyn filter 逻辑更新流。
- `target_region_id` / `target_scan_id` 若存在,只承担 transport routing 或 consumer registration 元数据,不再是 filter state identity 的一部分。
- 同一个 datanode 上若有多个 scan/region consumer 订阅同一 remote dyn filter应共享同一个 query-local filter state并各自持有 remapped consumer 视图。
因此Phase 1 的核心关联键正式固定为:
- `query_id + filter_id`
### 4. `epoch` / `is_complete` 的幂等语义
Phase 1 先规定以下规则:
1. `epoch` 由 frontend 对同一个 `query_id + filter_id` 单调递增生成。
2. datanode 处理规则:
- `incoming_epoch > current_epoch` -> 应用更新并覆盖状态
- `incoming_epoch == current_epoch` -> 视为幂等重放,允许忽略或视为成功
- `incoming_epoch < current_epoch` -> 丢弃为过期 update
3. `is_complete = true` 的含义仅是:
- 不再期待更高 epoch 的后续 update
- **不等于立即删除 filter state**
4. 若收到 `is_complete = true` 后又收到更高 epoch
- Phase 1 视为协议错误或异常重放,记录指标并丢弃
- 不要尝试重新打开已 complete 的 filter state
### 5. Phase 1 `DynFilterPayload::Datafusion`(承载 `InListExpr`)规范
为了避免两端归一化不一致Phase 1 先锁定:
1. **NULL 语义**
-`DynFilterPayload::Datafusion` 承载的是 `InListExpr`,其值集合默认不包含 NULL。
- 若 build side key 含 NULLPhase 1 直接忽略这些 NULL不把它们编码进 remote payload。
2. **类型归一化**
- `DynFilterPayload::Datafusion` 中承载的 expr 必须在 frontend 侧已经完成类型归一化。
- datanode 不做“猜测式”隐式转换,而是依赖 expr 自身与本地 schema / wrapper remap 的一致性。
3. **去重**
- 若底层语义是 `IN`frontend 发送前先做去重。
- payload 中 values 不要求保留原始顺序。
4. **顺序**
- 可选地按稳定编码排序,便于测试和重放;至少不要让语义依赖顺序。
5. **大小上限**
- 单次 `DynFilterPayload::Datafusion` update 必须受 cardinality / bytes budget 限制。
- 一旦超过 Phase 1 上限,直接降级为“不远端发送”,而不是发送超大 payload。
这意味着 Phase 1 的 `DynFilterPayload::Datafusion` remote payload 是:
- 非 NULL
- 已归一化
- 已去重
- 有大小上限
的稳定集合,而不是原始 build-side 明细流。
## 关于 `query_id` 字段的现实约束
当前仓库里并没有一个已经稳定打通 frontend -> datanode 远端请求链路的现成 `query_id` 字段,可以直接拿来当 remote dyn filter 的 query-scoped identity。
已确认的相近概念:
1. `QueryContext.process_id`
- `src/session/src/context.rs` 中确实存在 `process_id: u32`
- 它主要用于 frontend process manager / kill query / process list。
2. `api::v1::QueryContext`
- 当前 protobuf 转换并没有把 `process_id` 编进 `api::v1::QueryContext`
- 也就是说它今天不是一个天然能跨 frontend -> datanode 传播的 query-scoped id。
3. `request_id`
- 现有 `request_id` 更多用于 Flight DoPut 这类请求/响应配对,不等于一次查询生命周期 id。
因此Task 01 里写的 `query_id` 不应被理解成“代码里已经有这个字段”。当前更合理的方向是:
- 引入一个 dedicated query-scoped execution identifier
- 不把现有 `process_id` 当作 primary wire identity 复用;
- `process_id` 继续保留给 process-list / kill / connection 语义。
在真正落实现有代码前,文档里的 `query_id` 应当被视为“设计占位符”,不是现成 API 名称。
## 推荐的 `query_id` 方案
推荐把 `query_id` 实现为 frontend 在一次分布式查询执行开始时生成的全局唯一字符串 ID优先选择
1. **UUIDv7首选**
- 优点:
- 多 frontend 下天然全局唯一,不依赖 `server_addr + process_id` 拼接。
- 时间有序,日志和调试时更容易按查询开始时间排序。
- Rust 生态成熟,实现风险低。
- 适合作为 protobuf / RPC header 中的 `string query_id`
2. **UUIDv4可接受 fallback**
- 若当前依赖或运行环境不便引入 v7可先用 v4 起步。
- 缺点是无时间有序性,日志可读性和索引局部性较差。
当前不推荐作为 primary query id 的方案:
-`u32 process_id`
- `server_addr + process_id` 复合字符串
- DataFusion `task_id`
- Flight `request_id`
原因分别是:
- `process_id` 语义偏 connection / process 管理,且今天并未统一跨协议、跨节点传播。
- `server_addr + process_id` 更适合 kill/process-list 显示,不适合作为长期内部协议主键。
- `task_id``request_id` 都不是 query-scoped distributed execution identity。
## `query_id` 的生成与传播建议
推荐的实现路径:
1. **生成时机**
- 在 frontend 侧、一次真实查询执行开始时生成一次。
- 同一 distributed query 的所有 remote dyn filter update 与 region read 请求共用同一个 `query_id`
2. **Phase 1 主传播路径:先走 `QueryContext.extensions`**
- 暂时不修改 protobuf schema也不引入新的 typed proto field。
-`query_id` 放进 `QueryContext.extensions`,例如预留一个稳定 key`remote_query_id`
- 通过现有 `QueryContext -> api::v1::QueryContext -> RegionRequestHeader.query_context` 路径传到 datanode。
- 因为当前 `extensions` 已经走现有序列化/反序列化链路,所以这条路径可以先落地、后续再决定是否升级成显式字段。
3. **消费侧**
- datanode 通过 `RegionRequestHeader` 重建 `QueryContext` 时读取同一个 `query_id`
- remote dyn filter state 以 `query_id + filter_id` 作为核心逻辑关联键target 只承担路由/注册元数据。
4. **与其他 id 的关系**
- `process_id`:继续用于 kill/process-list/connection 语义。
- `filter_id`:在单个 query 内标识某个 dyn filter。
- `query_id`:跨 frontend/datanode 关联一次 distributed query execution。
## 为什么 Phase 1 先放在 `QueryContext.extensions`
当前只有 join dyn filter 这一个场景急需 query-scoped distributed identity而直接改 protobuf schema 会引入兼容性和演进成本。对于这个阶段,更合理的权衡是:
1. **先复用已存在的可传播通道**
- `QueryContext.extensions` 已经被现有 QueryContext protobuf 转换保留。
2. **把兼容性风险降到最低**
- 不需要立刻修改 proto、生成代码、处理 mixed-version 节点兼容问题。
3. **为后续收敛保留空间**
- 若最终 remote dyn filter 只有少数场景使用该字段,可以继续保留在 extensions。
- 若后续 query-scoped execution id 被更多功能复用,再从 extensions 提升为 typed field 也更有依据。
因此Task 01 当前推荐的是“两阶段策略”而不是一步到位:
- **Phase 1**`QueryContext.extensions["remote_query_id"] = <uuidv7>`
- **Phase 2+(可选)**:待功能稳定后,再评估是否把它迁移为显式 proto 字段。
到这里为止Task 01 对 `query_id` 只保留 contract 层结论:
- carrier 固定为 `QueryContext.extensions["remote_query_id"]`
- 同一 distributed query 的 remote read 与后续 dyn filter update 必须复用同一个值
- datanode 只消费该值,不回退到 `process_id`,也不在缺失时补生成新 id
- `remote_query_id + filter_id` 是 Phase 1 的最小逻辑关联键target 信息若存在,只用于路由/注册元数据
- header 传播、请求构造、runtime 读取、清理和相关测试的实现细节,归后续 Task 02/03/04/06
## Equal Join 需要先识别的上游表达式形态
基于当前 GreptimeDB 接入方式和 DataFusion hash join 实现equal join 场景里真正传到 scan 侧的并不是一个抽象的“join filter”标签而是 `DynamicFilterPhysicalExpr` 持有的运行时 physical expr 快照。对 ABI 设计最关键的是以下几种形态:
1. 单列等值 join小 build side
- membership 形态是 `InListExpr`
- 典型语义是 `col IN (...)`
2. 单列等值 join大 build side
- membership 形态是 `HashTableLookupExpr`
- 这是对 build side 哈希表的运行时引用,不适合跨节点直接序列化。
3. 单列等值 join带统计边界
- DataFusion 还会附加 `BinaryExpr` 形式的范围条件,即 `col >= min AND col <= max`
- 最终常见形态是 `bounds AND membership`
4. 多列等值 join
- DataFusion 会先把 probe side key 组装成 `struct(...)`,再做 `InListExpr``HashTableLookupExpr` membership 判定。
- 这种复合 key 形态不适合直接作为 Phase 1 wire ABI。
5. partitioned hash join
- 可能再外包一层 `CaseExpr`,按分区路由到不同 partition 的 bounds + membership 组合。
- 这同样超出 Phase 1 可直接远端运输的稳定边界。
因此Task 01 的 ABI 设计要以“从这些上游 physical expr 形态中投影出稳定子集”为目标,而不是假设 equal join 天然只会生成简单 `IN` 列表。
## 关于 DataFusion physical expr protobuf 的使用边界
DataFusion 现有的 physical expr protobuf 序列化能力,可以作为 Task 01 的一个**窄范围实现辅助**,但不应该成为 remote dyn filter 的正式 wire contract。
可以考虑直接复用 protobuf 的场景:
- 已经 snapshot 成静态 expr 的简单过滤表达式
- `InListExpr`
- `BinaryExpr` 形式的 `MIN_MAX` 范围条件
- 不依赖 runtime-only 对象的 `CaseExpr` / `HashExpr` 组合
不应把 protobuf 当成主协议的原因:
1. 它序列化的是 expr snapshot而不是可持续 update 的 dynamic wrapper。
2. `HashTableLookupExpr` 在 DataFusion proto 层会被显式替换成 `lit(true)`,不会保留 large build-side membership 优化语义。
3. expr tree 本身不携带 remote update 所需的生命周期与路由信息,例如 `query_id``filter_id``epoch``is_complete` 以及必要的路由/注册元数据等。
因此更合适的分工是:
- remote wire contract 仍然由 `DynFilterUpdate` ABI 负责。
- 若某个 payload 已经被约束在“simple/snapshotted expr subset”内可以把 DataFusion protobuf 当成内部编码捷径,而不是外部协议本体。
## Large Build-Side Equal Join 的 Phase 1 支持边界
Task 01 只需要把 Phase 1 的 contract 边界固定清楚:
- 小 build side、可 snapshot 的 `InListExpr` / simple expr subset 可以进入 `DynFilterPayload::Datafusion`
- `HashTableLookupExpr` 等 large build-side membership 形态必须降级为“不远端发送”
- large build-side membership 的精确阈值、上游触发条件和后续远端替代表达,归 Task 07 处理
## 推荐子步骤
1. 抽象 wire 层枚举和 payload 结构,避免 datanode 依赖 frontend 内部表达式结构。
2.`query_id` 选定最终格式和生成策略,优先采用 UUIDv7 字符串。
3. 明确 Phase 1 使用 `QueryContext.extensions["remote_query_id"]` 作为传播载体,而不是先改 proto schema。
4.`filter_id``target_region_id``epoch``is_complete` 制定文档化语义,保证消息关联、重复发送和乱序处理可实现。
5. 编写 ABI 编解码单测验证版本号、payload bytes budget、malformed bytes 与类型不匹配时的行为。
6. 增加“能下发 / 不能下发”的判定测试,锁定 `InListExpr` / simple snapshot expr 可发送,而 `HashTableLookupExpr` / 多列复杂形态默认降级。
7. 把 DataFusion physical-expr protobuf 的使用边界写清楚:它只是 simple snapshot expr 的内部编码 helper不是远端 dyn filter 协议本体。
## 验收标准
- wire 消息类型稳定且不泄漏物理表达式实现细节。
- query-scoped execution id 的来源和传播方式已被明确Phase 1 先通过 `QueryContext.extensions` 引入 dedicated `query_id`,而不是直接复用 `process_id` 或先改 proto schema。
- `IN` filter 能完整表达 payload 内部的列引用与值集合,且不会额外复制一份可能失真的 top-level binding。
- equal join 的上游 physical expr 形态已经被归类清楚,并且只有可稳定投影的子集会进入远端 ABI。
- DataFusion physical-expr protobuf 是否可复用的边界已被文档化:它只适合作为 simple snapshot expr 的内部编码 helper。
- 不支持的复杂 dyn filter 会被明确降级,而不是在远端执行时报错。
- ABI 结构能够无破坏扩展到 `MIN_MAX` / `BLOOM`
## 风险与注意点
- 不要把物理列索引当作唯一绑定键,否则跨节点重建后容易错位。
- 不要为了“通用性”做成通用表达式传输系统Phase 1 必须收敛范围。
- 不要假设 equal join 天然只会得到可序列化的 `InListExpr`;对 large build-side、复杂多列或 partition 路由形态必须安全降级。
- 不要误把 DataFusion expr protobuf 当成“统一远端 dyn filter 协议”:它对 simple expr 有帮助,但会丢失动态更新语义,并且无法保留 `HashTableLookupExpr`
- 不要为了省字段而复用现有 `process_id` 充当 remote query id二者生命周期和跨节点语义不同。
- 不要在 Phase 1 为了 typed field“整洁性”过早修改 proto schema先用 `extensions` 落地,再按实际复用范围决定是否提升为显式字段。
- payload 大小、值类型归一化和序列化成本要在接口层提前限制。

View File

@@ -1,171 +0,0 @@
# Task 02 - Region Unary RPC 控制面链路
## Goal
为远程 dyn filter 建立最小可用的 frontend -> datanode unary RPC 控制面链路。Phase 1 复用现有 `greptime.v1.region.Region` unary gRPC 服务,不新增独立 gRPC service也不使用 Arrow Flight。
## Touchpoints
- `src/client/src/region.rs`
- `src/datanode/src/region_server.rs`
- Region gRPC / service 定义所在模块
-`src/common/query/src/request.rs` 相邻的请求结构定义
## Dependencies
- 依赖 Task 01 提供稳定的 `DynFilterUpdate` ABI。
## Scope
### In Scope
1.`greptime.v1.region.RegionRequest.body` 中新增单一总入口:`remote_dyn_filter: RemoteDynFilterRequest`
2. `RemoteDynFilterRequest` 作为远程 dyn filter 控制面的统一 envelope内部使用 `oneof action` 区分具体操作Phase 1 仅定义:
- `update`
- `unregister`
3. 在 frontend/client 侧补齐 `RemoteDynFilterRequest` 的封装、序列化与发送逻辑。
4. 在 datanode `region_server` 中新增 special-case 接收入口、鉴权/校验与错误映射,处理方式与现有 `Sync` / `ListMetadata` 一致。
5. 明确 RPC 返回语义:成功、幂等重放、安全忽略、缺失 `query_id`、可降级失败。
6. 确保 RPC 失败不会让主查询失败,调用方只能记录并降级。
7. `query_id` 在 Phase 1 中显式放入 `RemoteDynFilterRequest.query_id`,不依赖 `RegionRequestHeader.query_context` 透传。
8. 初始 remote read 继续负责建立 datanode 侧 scan/consumer 注册关系Task 2 的 unary RPC 只负责后续 `update / unregister` 控制面。
9. `DynFilterUpdate` 的 payload 继续承接 Task 01 已定义的最小 ABITask 02 只负责其真实发送/接收通路。
10. `unregister` 语义必须与 `is_complete` 明确区分:
- `is_complete`:不会再有更强更新
- `unregister`:该 query 在该远端已不再需要该 filter
### Out of Scope
- dedicated gRPC service
- Arrow Flight 控制面
- datanode 侧真实 dyn filter 状态管理与 apply 语义
- scan wrapper 注入
- plan remap
- DN 本地消费侧替换逻辑
- batched / streaming update
## Protocol / Design Decisions
- `query_id` 在 Phase 1 中显式放入 `RemoteDynFilterRequest.query_id`,不依赖 `RegionRequestHeader.query_context` 透传。
- `query_id + filter_id` 是远程 dyn filter state 的逻辑 identity。
- `RegionRequest.body` 顶层只增加一个统一入口 `remote_dyn_filter`
- 具体操作通过 `RemoteDynFilterRequest.oneof action` 扩展。
## Execution Order
1. 对齐现有 Region RPC 风格,保持错误处理模式一致;由于 `RegionRequest` 现有处理链路主要消费 bodyPhase 1 的 `query_id` 直接放在 `RemoteDynFilterRequest` 中。
2.`RegionRequest.body` 顶层扩展固定为单一入口 `remote_dyn_filter`,避免后续每新增一个 dyn filter 操作都重复污染顶层 `body`
3. 将 dyn filter 的操作扩展点收敛到 `RemoteDynFilterRequest.oneof action`,而不是使用“单 message + op enum + 大量共享 optional 字段”的设计。
4. 固定 scan 注册契约:由初始 remote read 显式建立 datanode 注册,后续 unary RPC 只负责 `update / unregister`
- 不再保留“首次 update 懒注册”作为 Phase 1 选项,避免 frontend 与 datanode 对 consumer 生命周期理解不一致。
- region RPC 仍可作为 transport 路由入口,但 `target_region_id` 不再是 dyn filter state identity 的一部分。
5. datanode 侧抽出独立 handler使后续从 unary 演进到 batched 或 stream 时可以复用状态处理逻辑Phase 1 允许先以 placeholder 形式接线。
6. 补客户端重试与超时策略,但默认保持保守,避免控制面风暴。
7. 增加 RPC 层单测 / 集成测试,覆盖解码失败、权限失败、`query_id` 缺失等路径。
8.`query_id` 明确 extensions / header 级测试,至少验证:
- 多 frontend 同时发起查询时 `query_id` 不冲突。
- `QueryContext.extensions["remote_query_id"]` 能随 remote read 和后续 dyn filter update 一致传到 datanode。
- `process_id` 仍保留原有 kill/process-list 语义,不与新的 distributed `query_id` 混淆。
## Expected Proto Shape
```proto
message RegionRequest {
RegionRequestHeader header = 1;
oneof body {
// existing variants...
RemoteDynFilterRequest remote_dyn_filter = N;
}
}
message RemoteDynFilterRequest {
string query_id = 1;
oneof action {
RemoteDynFilterUpdate update = 2;
RemoteDynFilterUnregister unregister = 3;
}
}
message RemoteDynFilterUpdate {
string filter_id = 1;
bytes payload = 2;
uint64 generation = 3;
bool is_complete = 4;
}
message RemoteDynFilterUnregister {
string filter_id = 1;
}
```
### 字段约定
- `query_id` 在 Phase 1 中显式放入 `RemoteDynFilterRequest.query_id`,作为控制面状态的 query 维度标识。
- `filter_id` 在 Phase 1 先保持为稳定、简单的字符串标识,不把 proto 直接绑定到某个 Rust 内部 id 类型。
- `payload` 直接承接 Task 01 已定义的 `DynFilterPayload::Datafusion(Vec<u8>)`
- `generation` 用于幂等与乱序保护。
- `is_complete` 只表示该 filter 不会再有更强更新,不表示 consumer 生命周期结束。
## Datanode Handler Shape
`remote_dyn_filter` 应作为 `RegionServer` 的 special-case request 处理,方式与现有 `Sync` / `ListMetadata` 类似。
推荐分发逻辑:
- `Body::RemoteDynFilter(req)` -> `handle_remote_dyn_filter_request(req)`
总入口 handler 负责:
1. 提取 `query_id`
2. 根据 `action` 分发到:
- `handle_remote_dyn_filter_update`
- `handle_remote_dyn_filter_unregister`
其中:
- `update` handler 在 Phase 1 至少负责校验 `query_id / filter_id / payload / generation` 与接线;真实写入 DN shared filter state 由后续状态管理任务负责。
- `unregister` handler 在 Phase 1 至少负责校验与占位接线;真实删除/注销 `query_id + filter_id` 状态由后续状态管理任务负责。
Task 2 只要求打通最小接收、分发和状态更新入口,不在本任务内引入 scan wrapper 注入、plan remap 或 DN 本地消费侧替换逻辑。
## Cross-Repo Update Flow
由于 Task 2 需要修改 `greptime.v1.region.RegionRequest.body`,因此通常不能只在当前仓库内完成;需要先更新 `greptime-proto` 仓库,再把当前仓库的 proto 依赖切到新的 commit。
推荐流程如下:
1.`greptime-proto` 仓库中创建新分支。
2. 修改 region proto
-`RegionRequest.body` 中新增 `remote_dyn_filter: RemoteDynFilterRequest`
- 新增 `RemoteDynFilterRequest`
- 新增 `RemoteDynFilterUpdate`
- 新增 `RemoteDynFilterUnregister`
3.`greptime-proto` 仓库中执行生成流程(例如 `make all`),确保生成代码与校验全部通过。
4. 提交并 push `greptime-proto` 分支,得到新的 proto commit。
5. 回到当前仓库,在 `Cargo.toml` 中将 `greptime-proto` 依赖更新到新的 commit必要时同步更新 lockfile。
6. 之后再在当前仓库内实现 Task 2 的 client / server 逻辑。
### 任务边界说明
- `greptime-proto` 仓库负责协议定义与生成代码。
- 当前仓库负责消费新 proto并实现 frontend -> datanode 的 unary RPC 收发与分发逻辑。
- 若 proto 尚未合入或依赖 commit 尚未更新,则不要在当前仓库中先行写死临时本地结构体替代正式 proto。
## Validation
- frontend 能向指定 datanode 发送 dyn filter update并在 `RemoteDynFilterRequest` 中稳定携带 `query_id`
- frontend 能通过同一总入口发送 dyn filter unregister。
- datanode 能识别并 special-case 分发 `RemoteDynFilter` 请求。
- 缺失 `query_id` / `action` 等基础字段时能返回明确错误。
- 当前阶段若真实状态管理尚未引入handler 可以明确返回 placeholder / `NotYetImplemented`,但链路与协议形状必须稳定。
- RPC 故障不会影响查询结果正确性。
- 后续演进到 batched update 或 stream 时无需推翻 handler 核心语义。
- 后续若新增 `register / cancel / heartbeat / batched update` 等操作,只需扩展 `RemoteDynFilterRequest.oneof action`,不需要再次修改 `RegionRequest.body` 顶层设计。
## Risks / Caveats
- 不要把更新通道塞回一次性 `QueryRequest`,否则生命周期和职责会继续耦合。
- 错误码必须能表达“优化失败但查询继续”的语义。
- 不要使用“单 message + op enum + 大量共享 optional 字段”的设计,否则后续会把不同 action 的字段语义搅在一起。
- 若同一 query 面向多个 region高频单发请求会放大开销接口设计要为批量化留口子。

View File

@@ -1,252 +0,0 @@
# Task 03 - Frontend Dyn Filter 生产与建链
## Goal
在 frontend 执行侧识别 join 产生的 alive dyn filter生成稳定 `filter_id`,并通过 `MergeScanExec -> query-scoped state -> initial remote read metadata` 建立最小 frontend producer / bridge 闭环。
Task 03 收缩后只负责:
- producer 识别
- stable `filter_id`
- frontend query-scoped state
- 首次 remote read 的 initial-register handoff
持续 update / unregister runtime、datanode apply、successful-path lifecycle cleanup 不再由本任务承诺闭环。
## Touchpoints
- `src/query/src/dist_plan/merge_scan.rs`
- dyn filter 生产者相关执行模块join / topk 第一阶段至少覆盖 join
- frontend query 执行生命周期管理模块
## Subtask 01 conclusion - join producer and alive dyn filter access points
- Phase 1 的 producer 关注范围固定为 **distributed join**,不在本任务内扩展到 TopK。
- 从当前测试与接线情况看alive dyn filter 的真正 owner 在 **DataFusion join dynamic filter path**,而不是 `MergeScanExec` 本身:
- `src/query/src/datafusion.rs` 中的 `test_join_dynamic_filter_pushdown_reaches_region_scan()` 证明 `HashJoinExec` 在 optimizer 之后会把 dyn filter pushdown 到 region scan。
- 在该测试里,`PartitionMode::CollectLeft` 下左侧 scan 没有收到 filter右侧 scan 收到了 dyn filter说明当前 Phase 1 join coverage 至少已经验证了 probe-side / remote scan 接收路径。
- `MergeScanExec` 的职责不是生产 dyn filter而是作为 **frontend producer 与 remote subscriber 之间的 bridge**
- 它持有 `regions``query_ctx``plan` 和 distributed scan fanout 信息。
- `to_stream()` 在发起 remote read 时已经把 `query_context` 与 tracing header 带入 `RegionRequestHeader`,是后续 register / remote read 建链的自然挂点。
- 因此Task 03 的 frontend 接入点可分为两层:
- **producer owner**DataFusion join path 持有 alive dyn filter
- **bridge layer**`MergeScanExec` 负责把该 alive dyn filter 与 remote subscriber regions/scans 建立映射,并接到 query-scoped registry
- 当前 subtask 01 只要求识别 owner 与 bridge不要求在本子任务内完成 registry、watcher 或 RPC fanout 的代码实现。
## Subtask 02 conclusion - stable `filter_id` helper
- 已在 `src/query/src/dist_plan/merge_scan.rs` 中补上 Phase 1 `filter_id` helper`build_remote_dyn_filter_id(...)`
- 当前 identity 继续遵循 Phase 1 边界:`region_id + producer-local ordinal + canonicalized children fingerprint`
- `children fingerprint` 的实现不依赖内存地址,也不直接使用 `Debug` / `Display` 字符串;而是对 child physical expr 做 DataFusion proto 序列化后再生成稳定 hash降低 alias / 格式化漂移风险。
- `partition` 没有进入 helper 的输入,因此不会被混入 `filter_id` identitypartition 仍应只留在后续 subscriber / fanout 映射中。
- 已补最小单测覆盖:等价 children 稳定、identity 输入变化会改变 `filter_id`、以及 identity 不包含 partition 维度。
## Subtask 03 conclusion - query-scoped registry skeleton
- 已新增 `src/query/src/dist_plan/remote_dyn_filter_registry.rs`,先把 Task 03 需要的 query-scoped registry 模型落成在 `dist_plan` 邻近模块。
- 当前 skeleton 已覆盖 Phase 1 最小结构:
- `QueryRemoteDynFilterRegistryManager`
- `RemoteDynFilterRegistry`
- `RegisteredRemoteDynFilter`
- `RemoteDynFilterSubscriber`
- `RemoteDynFilterRegistry` 现在显式表达三态生命周期:`Active -> Closing -> Closed`,并提供 `register_remote_dyn_filter(...)``register_subscriber(...)``begin_closing(...)``mark_closed()``reap_closed_entries()` 等最小入口,把 register / watcher / cleanup 的职责边界先固定下来。
- `RegisteredRemoteDynFilter` 先保存了 Phase 1 后续子任务会继续使用的核心状态alive `DynamicFilterPhysicalExpr` 引用、`last_epoch``last_observed_generation`、subscriber 列表,以及 watcher 是否已启动的本地标志。
- 当前实现还**没有**把 registry manager 挂到 query-engine runtime map也**没有**让 `MergeScanExec` 开始真实注册或启动 watcher这些仍留给 subtask 04/05 继续接线,避免在本子任务里把生命周期接入和 bridge 注册提前揉在一起。
- 已补 focused unit tests覆盖同一 `query_id` 复用同一个 registry、filter/subscriber 注册行为、closing 后拒绝新注册、watcher 启动去重,以及 closed registry 被 manager 回收。
## Subtask 04 conclusion - attach registry to query lifecycle
- 已在 `src/query/src/query_engine/state.rs` 上挂载 query-scoped `DynFilterRegistryManager`,把 registry 的 runtime ownership 从 `dist_plan` skeleton 接到 query-engine state。
- `QueryEngineState` 现已提供 subtask 04 约定的最小访问入口:
- `dyn_filter_registry_manager()`
- `get_or_init_remote_dyn_filter_registry(...)`
- `begin_closing_remote_dyn_filter_registry(...)`
- `reap_closed_remote_dyn_filter_registry(...)`
- 这些 helper 已统一通过 `QueryContext::remote_query_id_value()` 获取 typed `QueryId`,因此 registry placement 现在满足“**query-engine runtime map keyed by query_id**”,而不是挂在单个 `MergeScanExec``QueryContext.mutable_session_data` 上。
- 当前 subtask 04 的边界是**完成 runtime ownership 与 lifecycle access path**,而不是在本子任务里把 query finish/cancel cleanup hook 全量接入:
- closing / reap helper 已暴露
- 真正的 finish/cancel hook 仍留给后续子任务在 distributed query lifecycle 上统一接线
- 已补 focused unit tests覆盖
- 同一个 `QueryContext` 复用同一个 query-scoped registry
- 不同 `QueryContext` 拿到不同 registry
- `begin_closing(...)` / `reap_closed(...)` helper 能驱动 registry 生命周期切换与回收
- `QueryContext``remote_query_id` contract 与 registry key 绑定正确
- 结论subtask 04 的 acceptance criteria 已满足;后续子任务只需要在此基础上把 `MergeScanExec` register / watcher / RPC fanout 正式接入。
## Subtask 05 conclusion - register alive dyn filters and subscribers through `MergeScanExec`
- `MergeScanExec` 现在通过 `handle_child_pushdown_result(...)` 捕获 parent pushdown 下来的 alive dyn filter并保留它们在 `parent_filters` 里的原始 index 作为 `producer_local_ordinal`
- `MergeScanExec::to_stream` 在每个 remote region 的 read path 上都会把这些 captured filters bridge 到 query-scoped frontend registry
- 使用 `build_remote_dyn_filter_id(region_id, producer_local_ordinal, children)` 生成稳定 `filter_id`
-`register_remote_dyn_filter(...)` 注册 entry
-`register_subscriber(...)` 注册 subscriber region 映射
- 当前 subscriber metadata 仍保持 Phase 1 最小集:`region_id`。这里不额外发明 scan id`MergeScanExec` 只负责 register / bridge而不是后续长跑 update loop。
- 已补 focused tests覆盖
- 只捕获 `DynamicFilterPhysicalExpr`
- `producer_local_ordinal` 与 parent filter list index 一致
- 同一 `filter_id` 重复 bridge 时复用已有 registry entry而不是重复创建
- 结论subtask 05 的 acceptance criteria 已满足;后续子任务只需要把首次 remote read 的 initial register 一起带到 datanode 并完成 pre-scan build-link。
## Subtask 06 conclusion - send initial register during `MergeScanExec::to_stream`
- `MergeScanExec::to_stream` 可以为每个 remote region 构造 initial-register metadata并通过 per-region `QueryContext.extensions` 随首次 remote read 带到 datanode。
- 该 metadata 仍采用两层编码:
- 外层 envelopeJSON
- 内层 child exprDataFusion physical expr proto bytes
- datanode 在 Task 03 范围内只需要完成**最小 arrival/handoff**
- 能读取 initial-register metadata
- 能做最小 duplicate / 越界校验处理
- 能在 failed remote read 时清理本次 region 对应的 arrival state
- initial-register metadata 现已收敛到明确默认边界:
- 最多 `64` 条 registration
- 所有 child expr DataFusion proto bytes 总量最多 `64 KiB`
- 同一 payload 内 `filter_id` 不允许重复
- 边界外 payload 的 Task 03 语义已明确:
- frontend attach 阶段校验失败时,直接 drop 该 region 的 initial-register metadata 并记录 warning
- datanode arrival 阶段再次复用同一校验;若 payload 非法则忽略,不把它升级成长期 consumer/runtime 语义
- subtask 06 **不再宣称**以下责任已经由 Task 03 收敛:
- datanode placeholder state 的长期 ownership
- successful remote read / stream 正常结束后的 lifecycle cleanup
- scan-side consumer 已真正 build-link 完成
- remote wrapper / apply path 已准备就绪
- 已补 focused tests覆盖
- initial-register metadata JSON round-trip
- child expr proto round-trip
- region query context 确实携带 per-region initial-register metadata
- datanode 能从 query context 读取 initial-register metadata
- failed remote read 能清理本次 region 对应的 arrival state
- 结论subtask 06 只负责“首次 remote read 传输 initial-register metadata + datanode 最小 arrival/handoff + failed-read cleanup”真正的 datanode consumer/runtime/lifecycle 继续由 Task 04 接管。
## Subtask 07 rewrite target - compact frontend registry back to minimal state
- subtask 07 重写后只负责把 frontend query-scoped registry 收缩回 **state-first** 的最小边界。
- registry 可以继续持有当前边界真正需要的最小状态:
- `query_id -> filter_id` query-scoped ownership
- alive dyn filter 引用
- subscriber region 映射
- 少量纯逻辑 helper若当前边界确实需要
- registry **不应**再在 subtask 07 内承担:
- 后台 watcher 主循环
- cleanup tail
- transport dispatcher
- 为未来任务预留但当前无法验证的 lifecycle 抽象
- 若 generation / throttle helper 仍保留,必须是**纯逻辑、可单测、与 transport 解耦**的最小组件,而不是 runtime scheduler 的半成品接口。
- 结论口径应改为subtask 07 的目标是**收缩 registry 边界**,而不是在 Task 03 中完成完整 watcher/runtime。
## Subtask 08 rewrite target - reduce transport scope to thin frontend plumbing
- subtask 08 重写后只负责 Task 03 需要的最薄 transport 接口面:
- frontend/query/client 层具备发送 remote dyn filter control request 的最小 plumbing
- update / unregister 的真正 runtime 闭环仍依赖 Task 04 的 datanode sink 与 lifecycle 边界
- subtask 08 **不应**再把以下内容写成已完成:
- bridge 主动 `spawn` fanout watcher
- registry 直接发送 update / unregister 并承担 retry 语义
- 以“degrade to local-only”为名的无限同代重试
- 在 datanode sink 仍是 placeholder 时就宣称 fanout 闭环已成立
- failure semantics 必须保持有限、清晰、可验证:失败只能表示“远端优化没有生效”,不能引入无界后台循环或误导性的成功语义。
- 结论口径应改为subtask 08 只收敛 frontend transport surface不在 Task 03 内提前实现完整 fanout runtime。
## Subtask 10 follow-up - repair multi-`MergeScanExec` cleanup and `filter_id` producer scope
- 触发原因:二次 review 发现当前修复仍有一个 query-scoped ownership 边界问题frontend registry 以 `QueryId` 为 key 存放,但 cleanup tracking 仍可能挂在单个 `MergeScanExec` / exec-local stream tracker 上。
- 需要修复的 cleanup 边界:
- 同一个 query 内存在多个独立构造的 `MergeScanExec` 时,任意一个 exec 的 stream drop 都不能提前 `remove(query_id)` 删除共享 registry。
- cleanup ownership 必须提升到 query / manager scope或采用等价的 query-scoped lease/refcount不能继续依赖单个 `MergeScanExec` 的局部 tracker 判断“最后一个使用者”。
- 该修复仍属于 Task 03 frontend query-scoped state 的 leak / premature-remove 修复,不应重新引入 watcher loop、fanout scheduler、transport retry 或 datanode apply/runtime lifecycle。
- 需要修复的 `filter_id` identity 边界:
- 当前 Phase 1 规则 `region_id + producer-local ordinal + canonicalized children fingerprint` 对单个 logical producer scope 是稳定的,但不足以区分同一 query 中多个独立 `MergeScanExec` producer scope。
- 后续实现必须引入 query 内稳定的 producer scope 维度,使不同独立 `MergeScanExec` 即使拥有相同 `region_id`、producer-local ordinal 与 children fingerprint也不会生成同一个 `filter_id`
- clone / rewrite 出来的同一个 logical `MergeScanExec` 必须保留同一个 producer scope因此仍应生成相同 `filter_id`
- producer scope 不能来自内存地址、runtime object id、partition 或 datanode transport metadata这些仍不属于 `filter_id` identity。
- 必补验证:
- 同一 `QueryId` 下两个独立 cleanup leasedrop 第一个不删除 registrydrop 最后一个才删除。
- 同一 producer scope + 相同 `region_id` / ordinal / children`filter_id` 稳定相同。
- 不同 producer scope + 相同 `region_id` / ordinal / children`filter_id` 必须不同。
- cloned / replanned `MergeScanExec` 保留 producer scope`filter_id` 不漂移。
- 结论subtask 10 是 Task 03 的 review follow-up用来修正 query-scoped registry 与 producer identity 的边界;它不扩大 Task 03 到 Task 04 的 datanode apply/runtime/lifecycle 范围。
## Dependencies
- Task 01: ABI 已定义。
- Task 02: unary RPC 已能发送 update。
## Scope
### In Scope
1. 在 distributed join 场景中识别简单等值 join key 的 dyn filter 生产点。
2. 为每个可分发 filter 生成稳定 `filter_id`,优先通过 frontend-side helper 基于 MergeScan / distributed plan 中稳定可见的信息生成。
3.`MergeScanExec` 侧把 alive dyn filter 注册到 frontend query-scoped registry并建立 `query_id + filter_id -> {remote subscriber regions/scans}` 的映射关系。
4.`MergeScanExec::to_stream` 发起首次 remote read 时,为每个 remote region 携带 per-region initial-register metadata。
5. 在 Task 03 范围内,只要求 datanode 能完成 initial-register metadata 的最小 arrival/handoff而不要求正式 apply/runtime 闭环。
6. 保留 frontend query-scoped registry 的最小 state 形态;它不应在本任务内继续长成 watcher/runtime/transport 组合体。
7. 对不可序列化、超边界或不支持远端传播的 filter 直接降级为本地 dyn filter不影响 query correctness。
### Out of Scope
- datanode apply 侧状态写入与消费逻辑
- datanode successful-path cleanup / query finish-cancel lifecycle
- 后台 watcher 主循环、fanout runtime、cleanup tail、retry/backoff 策略
- registry 直接承担 transport dispatcher 或完整 control-plane 调度
- 基于 `is_used()` 的完整 unregister runtime 闭环
- join/projection/alias remap 的完整远端 consumer 语义
- dedicated control-plane transport beyond Task 02 unary RPC
- bloom / other non-IN payload encodings in Phase 1
- 全量 observability/fallback 策略收尾
## Protocol / Design Decisions
- frontend 负责识别可远端下发的 dyn filter producer。
- join 是 dyn filter producer`MergeScanExec` 是 producer 与 remote subscriber 之间的注册/建链桥接层,而不是后台调度层。
- query-scoped registry 采用 **方案 1**:实现放在 `src/query/src/dist_plan/remote_dyn_filter_registry.rs`(或等价 `dist_plan` 邻近模块),实例物理存放在 query-engine runtime map 中,并以 `query_id` 为 key 管理;它是 query-scoped state但不直接塞进 `QueryContext.mutable_session_data`
- Task 03 中的 query-scoped registry 应保持 **state-first**:持有最小 query/filter/subscriber state但不在本任务内承担完整 watcher/runtime/transport 责任。
- `filter_id` 必须由 frontend 基于稳定 plan/runtime metadata 生成,不能依赖内存地址。
- Phase 1 中 `filter_id` 采用 query 内稳定、局部唯一的规则:`region_id + producer-local ordinal + canonicalized children fingerprint`
- `partition` 不进入 `filter_id`。如果同一 remote subscriber 下多个 partition 可共享同一个 dyn filter 状态,`partition` 只应留在 subscriber / fanout 映射中,而不应拆分 identity。
- `children fingerprint` 必须来自 canonicalized children 表示,而不是直接使用 `Debug` / `Display` 输出字符串,避免 alias、格式化或 rewrite 导致漂移。
- `MergeScanExec::to_stream` 只负责初始 remote read / register handoff不在本任务内承担后续持续 update / unregister 的主循环。
- initial-register metadata 是 Task 03 到 Task 04 的 handoff不等于 datanode consumer/runtime 已闭环。
- Task 03 中任何失败都只能导致“远端优化未生效”,不能引入误导性的 success 语义或无界后台行为。
- 不可序列化或超阈值 filter 必须降级为本地 dyn filter而不是升级为查询失败。
### Decision Note - `filter_id` identity boundary
- `filter_id` 在 Phase 1 中只要求 **query 内稳定且唯一**,不追求跨 query 或跨实现版本的全局稳定。
- `filter_id` 用于标识“同一个远端共享 dyn filter 状态”,而不是表达完整 fanout 路径。
- 因此:
- 放入 identity`region_id`、producer-local ordinal、canonicalized children fingerprint
- 不放入 identity`partition`、datanode transport 信息、内存地址、runtime 临时对象 id
- 如果未来确认远端状态需要跨 region 共享,再重新收敛 identityPhase 1 先优先保证稳定、可实现、低碰撞。
### Decision Note - Task 03 / Task 04 boundary
- Task 03 保留 frontend producer / bridge / initial handoff。
- Task 04 接管 datanode consumer/runtime、apply path、successful-path cleanup 与后续 lifecycle。
- 若某个实现只有在 datanode sink、remote wrapper 或 query cleanup 已存在时才成立,则它不应继续留在 Task 03 作为“半成品 placeholder”。
## Execution Order
1. 在 plan/build 阶段识别 join 产生的可远端下发 alive dyn filter并确定其对应的 remote subscriber regionsproducer owner 在 DataFusion join path`MergeScanExec` 只承担 bridge 职责。
2.`MergeScanExec` 侧为这些 alive dyn filter 生成稳定 `filter_id`,并把它们注册到 frontend query-scoped state。
3.`MergeScanExec::to_stream` 发起首次 remote read 时,携带 per-region initial-register metadata把“该 query 下存在这个 remote dyn filter”这件事带到 datanode。
4. 让 datanode 在 Task 03 范围内只完成最小 arrival/handoff而不提前宣称 apply/runtime/lifecycle 已闭环。
5. 对 frontend registry、bridge、transport surface 做最小化收缩:删除越界 watcher/runtime/fanout/lifecycle 责任。
6. 对 unsupported / oversized / runtime-only expr 维持 local-only degradation不影响 query correctness。
## Validation
- `filter_id` identity 继续稳定,且不把 `partition` 等 fanout 维度混进 identity。
- frontend 能在首次 remote read 中稳定携带 per-region initial-register metadata。
- datanode 能在 Task 03 范围内读取 initial-register metadata并完成最小 arrival/handoff 行为。
- frontend query-scoped registry 保持 state-first而不是继续承担 watcher/runtime/transport 组合责任。
- 不可下发的 filter 会自动降级,不影响本地执行路径。
- Task 03 文档不再把 datanode apply、successful-path cleanup、完整 fanout runtime 描述成已在本任务中收敛。
## Risks / Caveats
- `filter_id` 不能依赖不稳定的内存地址或临时物理表达式指针;若不同逻辑 filter 可能碰撞,修正 `filter_id` 生成规则,而不是把 region 再塞回 identity key。
- 不要把 `partition` 混入 `filter_id`;如果 partition 只是执行态 fanout 维度,把它放进 identity 会错误拆分本应共享的远端状态。
- 目标 region 绑定必须来自分布式 plan 的确定信息,不能靠运行时模糊匹配。
- 不要把无法序列化的 runtime-only expr 升级成查询错误Phase 1 的默认策略是静默放弃远端传播并保留本地正确性。
- 不要在 datanode sink 仍未成型时,把 frontend 侧 transport/plumbing 包装成“完整 fanout 闭环已成立”。
- 如果某段实现只有在 Task 04 的 datanode runtime/lifecycle 存在时才有意义,应优先移动或删除,而不是继续以 placeholder 形式留在 Task 03。

View File

@@ -1,188 +0,0 @@
# Task 03A - Frontend Dyn Filter 最小化收缩与 subtask 06-08 重写
## Goal
把当前已经失控的 Task 03 收缩回“**frontend producer / bridge 最小闭环**”边界:
- 先重写 `remote-dyn-filter-task-03-frontend-producer.md`**subtask 06 / 07 / 08** 的目标、结论与验收口径;
- 再按新的最小边界整理代码;
- 明确哪些责任必须留在 Task 03哪些应移回 Task 04 / Task 05。
这个任务的核心不是“继续堆实现”,而是**先纠偏,再收口,再继续**。
## Why this task exists
- subtask 06 已经把 datanode placeholder arrival/state 提前拉进了 Task 03但真正的 datanode apply / remote wrapper / successful-path lifecycle 仍属于 Task 04。
- 当前未提交的 subtask 07-08 又进一步把 `bridge + registry + watcher + fanout + transport + failure semantics` 混在一起,导致 Task 03 从“bridge 层”滑成“半成品 runtime/control-plane”。
- 如果不先收缩边界,后续 Task 04/05 只会建立在一个更难收拾的前提上。
## Touchpoints
- `remote-dyn-filter-task-03-frontend-producer.md`
- `remote-dyn-filter-task-04-datanode-apply.md`
- `src/query/src/dist_plan/merge_scan.rs`
- `src/query/src/dist_plan/dyn_filter_bridge.rs`
- `src/query/src/dist_plan/remote_dyn_filter_registry.rs`
- `src/query/src/query_engine/state.rs`
- `src/query/src/region_query.rs`
- `src/frontend/src/instance/region_query.rs`
- `src/client/src/region.rs`
- `src/datanode/src/region_server.rs`
- `src/datanode/src/region_server/registrations.rs`
- `src/common/query/src/request.rs`
- `src/common/query/src/request/initial_remote_dyn_filter_reg.rs`
## Scope
### In Scope
1. 重写 Task 03 的 subtask 06-08 文档边界与验收标准。
2. 将 Task 03 收缩为“frontend producer / bridge 最小闭环”:
- 识别 distributed join producer
- 生成稳定 `filter_id`
- 在 frontend 侧维护最小 query-scoped registry state
- 在首次 remote read 中携带 per-region initial-register metadata
3. 清理或下沉当前已经越界的实现:
- bridge 启动 watcher
- registry 直接做 RPC fanout
- registry 直接承担 transport/payload/runtime lifecycle
- datanode placeholder state 的过早 ownership
4. 把需要继续存在的 helper 收缩为最小、被当前边界真正需要的形态。
5. 为后续 Task 04/05 留出清晰 handoff而不是继续在 Task 03 内偷长功能。
### Out of Scope
- 在这个任务里补全 datanode apply / remote wrapper / physical-plan hook
- 在这个任务里补全 query finish/cancel cleanup tail
- 在这个任务里落地真正的后台 watcher / throttle / retry / backoff runtime
- 在这个任务里推进 `HashTableLookupExpr` / large membership / bloom 等后续能力
- 为了“以后可能会用到”继续增加 speculative lifecycle 或 transport 抽象
## Minimal Boundary After Shrink
Task 03 收缩后只保留以下责任:
1. **producer 识别**
- frontend 只在 distributed join 场景识别可传播的 alive dyn filter。
2. **identity 生成**
- `filter_id` 继续由 frontend 基于稳定 metadata 生成;
- 不依赖内存地址;
- 不把 `partition`、transport 信息、runtime 临时对象 id 混进 identity。
3. **frontend bridge / state**
- `MergeScanExec` 只负责 capture、register、attach initial metadata
- query-scoped registry 只保留当前边界真正需要的 state
- registry 不再直接长成 runtime scheduler / transport dispatcher。
4. **initial register handoff**
- initial-register metadata 仍可随首次 remote read 带到 datanode
- 但 Task 03 不再宣称自己拥有 datanode successful-path lifecycle 或 apply/runtime state。
5. **optimization-only safety**
- 任何失败都只能导致“远端优化未生效”,不能影响 query correctness
- 不能用“degrade to local-only”的名义偷偷引入无界重试或后台循环。
## Subtask Rewrite Targets
### Rewrite subtask 06
重写后的 subtask 06 只负责:
- 定义并发送 per-region initial-register metadata
- 允许 datanode 解码并保存**最小 arrival 信息**,仅作为 Task 04 的 handoff
- 为 metadata 增加明确边界count / bytes / duplicate 处理口径;
- 当前默认边界已落地为:最多 `64` 条 registration、最多 `64 KiB` child-expr proto bytes、同一 payload 内 `filter_id` 不可重复;
- frontend attach 与 datanode arrival 复用同一校验,越界 payload 统一按 drop-and-warn 处理;
- 明确写清:
- successful-path cleanup **不是** subtask 06 责任;
- placeholder / wrapper 的正式 lifecycle **不是** Task 03 责任。
不应再把以下内容写成 subtask 06 已完成:
- datanode placeholder registry 的长期 ownership
- query finish/cancel/TTL cleanup 设计已定
- scan-side consumer 已经 build-link 完成
### Rewrite subtask 07
重写后的 subtask 07 只负责:
- 收缩 frontend registry 到 state-first、可测试的最小形态
- 如果 generation / throttle helper 仍需要存在,必须保持为**纯逻辑 helper**
- 不再让 registry 自己承担 watcher runtime、cleanup tail、transport fanout。
不应再把以下内容写成 subtask 07 范围:
- query 运行期后台 watcher 主循环
- `Closing -> Closed` 的完整 lifecycle 语义
- query finish/cancel 后的 cleanup tail
- 依赖远端 sink 已存在的 fanout 行为
### Rewrite subtask 08
重写后的 subtask 08 只负责:
- 收敛 Task 03 需要的最薄 transport 接口面;
- 明确 update / unregister 真正落地前提Task 04 必须已有可消费的 datanode sink 与生命周期边界;
- 把 failure semantics 定义成**有限、可验证、不会误导的行为**。
不应再把以下内容写成 subtask 08 已完成:
- bridge 主动 `spawn` fanout watcher
- registry 直接发送 update / unregister 并承担 retry 语义
- 以“degrade to local-only”为名的无限同代重试
- 在 datanode sink 仍是 placeholder 时就宣称 fanout 闭环成立
## Code Cleanup Principles
1. **先删职责,再谈重构**
- 优先删除越界责任,而不是继续包装它。
2. **registry 保持 state-first**
- registry 可以有最小 state 与纯逻辑 helper
- 不应同时拥有 runtime task management、transport、cleanup tail。
3. **bridge 保持 bridge**
- `MergeScanExec` / bridge 负责 register 与 initial handoff
- 不负责后台调度。
4. **Task 04 拥有 datanode consumer/runtime**
- 任何涉及 remote wrapper、apply path、successful-path lifecycle 的实现,都应回到 Task 04 文档和代码中定义。
5. **删掉当前边界无法被真实验证的抽象**
- 如果一个接口目前只在测试里自洽、在生产路径里并不成立,应优先删掉或下沉,而不是保留“以后可能会用”。
## Suggested Execution Order
1. 先重写 `remote-dyn-filter-task-03-frontend-producer.md` 中 subtask 06-08 的目标、范围、验收标准与结论口径。
2. 再对照新边界整理代码:
- 删除越界 watcher / fanout / lifecycle
- 收缩 registry / bridge / state ownership
- 把 datanode 侧提前落入 Task 03 的责任退回 Task 04 handoff
3. 补齐新的 focused tests只验证收缩后仍成立的最小边界。
4. 最后再决定:哪些能力进入 Task 04哪些推迟到 Task 05。
## Deliverables
- 更新后的 `remote-dyn-filter-task-03-frontend-producer.md`(重点是 subtask 06-08
- 必要时同步调整 `.tmp/tasks/task-03-frontend-producer/subtask_06.json`
- 必要时同步调整 `.tmp/tasks/task-03-frontend-producer/subtask_07.json`
- 必要时同步调整 `.tmp/tasks/task-03-frontend-producer/subtask_08.json`
- 收缩后的 frontend-side code changes
- 与 Task 04 对齐后的责任边界说明
## Validation
- Task 03 文档不再把 watcher/runtime/fanout/lifecycle 混成一个任务。
- `MergeScanExec` / bridge 不再直接启动后台 fanout loop。
- frontend registry 不再直接承担 transport dispatcher 责任。
- datanode 侧在 Task 03 范围内不再宣称自己已拥有正式 apply/runtime lifecycle。
- 保留下来的 helper 都能被当前边界真实验证,而不是依赖未来任务兜底。
- 所有失败路径继续满足“只影响优化收益,不影响 query correctness”。
## Risks / Notes
- 这个任务会让 Task 03 暂时“能力变少”,但这是预期结果;目标是先恢复边界一致性。
- 如果发现某段代码只有在 Task 04 存在时才有意义,应优先移动或删除,而不是继续在 Task 03 保留 placeholder。
- 文档重写必须先于代码整理,否则很容易一边删代码、一边沿用旧边界,导致第二轮返工。

View File

@@ -1,71 +0,0 @@
# Task 04 - Datanode Filter State 管理与 Scan 应用
## 任务目标
在 datanode 侧维护 query-scoped dyn filter state`query_id + filter_id` 做幂等更新,并把远端 update 安全应用到现有 scan predicate 更新路径region/scan 只承担本地 subscriber registration 元数据。
Task 04 必须把“remote dyn filter consumer 侧的 remap 能力”视为正式实现要求而不是附带优化datanode 侧需要一个类似 `RemoteDynamicFilterPhysicalExpr` 的 runtime wrapper由它承接来自 frontend 的 update并向本地物理执行计划暴露 `current()` / `update()` / `mark_complete()` 风格的动态 expr 语义,同时支持 `with_new_children` / child remap保证同一个远端 filter 能在不同 scan / projection / file reader 视图下被安全复用。
## 主要落点
- `src/datanode/src/region_server.rs`
- `src/table/src/predicate.rs`
- `src/table/src/table/scan.rs`
- `src/mito2/src/read/scan_region.rs`
- 可能新增 query-scoped runtime state 模块
## 前置依赖
- Task 01: ABI 与 payload / remote-wrapper 语义稳定。
- Task 02: datanode 已能接收 RPC。
## 实现范围
1. 建立两层注册结构:
- 共享 filter state键为 `query_id + filter_id`
- 本地 subscriber 注册:记录哪些 region/scan consumer 正在订阅这个共享 filter state
- 该注册结构应优先在初始 remote read / scan 构建阶段建立,使后续 unary update 只需要查共享 state 并更新现有 wrapper。
2. 校验 epoch 语义:更大则应用、相同则幂等、过小则丢弃。
3.`IN` wire payload 解码为本地 predicate 可识别的 dyn filter expr snapshot并挂入 remote dynamic wrapper。
4. 将更新接入现有 `predicate` / `scan_region` 路径,而不是重写扫描裁剪机制。
5. 明确定义“update 先于 scan 注册到达”时的策略Phase 1 至少固定一种行为:缓冲等待、显式丢弃并打指标、或要求前端按可重试错误重发。
6. 处理异常路径query 已结束、scan 不存在、`query_id` 缺失或关联键不完整、payload 解码失败、wrapper remap 失败、类型不兼容。
7. 在 query 正常结束、取消或 TTL 超时后回收状态,并保留 complete 后的最终 filter state 直到确实没有本地 consumer 使用它。
- 若 frontend 明确发来“该 dyn filter 已无人使用”的注销信号datanode 也应及时移除对应注册项。
- 若 datanode 本地 remote wrapper / `DynamicFilterPhysicalExpr` 已通过 `is_used()` 类语义确认没有 scan consumer也可作为 eager local cleanup 信号。
8. 确保 remote wrapper 固定原始 children 集合,并在不同 consumer 调用 `with_new_children` 时生成各自 remapped 视图,而不是共享一个会被后续 remap 覆盖的单一实例。
9. 在 datanode 将接收到的逻辑计划转成物理计划时,为预期接收远端 dyn filter 的 scan 安装 placeholder / remote wrapper使后续 update 到达时能够挂接到正确的 consumer 上。
- **这一步是 subtask 06 之后的正式落点**subtask 06 只保证 initial-register metadata 到达 datanode 并落入 placeholder state**不**提前声明 scan-side consumer 已完成 build-link。
10. 接管 placeholder registry 的 successful-path lifecycleremote read 正常完成、stream drop、query cancel、query finish、TTL 超时都必须能回收 query-scoped placeholder / wrapper state避免 datanode 常驻泄漏。
11. 为 remote wrapper 保持 generation / snapshot version 语义,使 scan 侧能判断 update 是否改变了当前过滤快照。
12. 保证 remote wrapper 暴露的 dyn filter 在 update 前后维持稳定的 boolean 返回类型与 nullability 契约;不兼容 update 只能安全降级。
## 推荐子步骤
1. 先复用现有本地 dyn filter 容器能力,只在最外层补 query-scoped 状态管理和远端写入口。
- 把“初始 remote read 注册 scan / 后续 unary RPC 更新已有注册项”固定为首选契约,避免在首次 update 时才临时创建 consumer。
2. 把 epoch 校验与状态更新封装成纯逻辑单元,便于单测覆盖乱序和重复更新。
3. 先实现 remote wrapper固定 children、支持 `current()``update()``mark_complete()`,并验证 remap 后的 expr 仍与本地 schema 一致。
4. 为 scan 提供“收到远端 update 后刷新 predicate 快照”的清晰边界,避免并发下直接改底层状态导致竞态。
5.`is_complete` 只承担“不会再有后续 update”的语义不在收到 complete marker 时立即回收最终 filter state。
6. 将 query finish / cancel / TTL 作为兜底状态回收入口,并把 registry 生命周期接口固定下来;同时补上 successful remote read / stream 正常结束 / stream drop 的 cleanup避免 subtask 06 留下的 placeholder state 长驻。
7. 在 datanode physical-plan optimizer / scan build hook 上完成真正的 consumer 安装,而不是依赖 `table_provider(...)` 一类过早的弱信号。
8. 为 remote wrapper 增加 generation/version 与类型不变式检查,确保 remap 或后续 update 不会把 bool filter 变成不兼容 expr。
## 验收标准
- datanode 能接收并应用远端 `IN` filter 更新,且实际扫描量下降。
- 乱序和重复 update 不会破坏状态机。
- scan 可以在没有 dyn filter 时先启动update 到达后继续渐进增强 pruning不会因为等待控制面而阻塞执行。
- “update 早到”路径有明确且可测试的行为定义。
- frontend 发来的 unregister / end-of-use 信号能让 datanode 及时移除对应 subscriber当共享 filter state 已无本地 consumer 时datanode 也能借助本地 `is_used()` 类语义主动回收,而 query cancel / TTL 仍可作为兜底。
- remote wrapper 能在多个 scan consumer 上正确 remap children不会因为 projection / schema 变化把 filter 应用到错误列索引。
- remote wrapper 能稳定暴露 generation/version 变化,并保持 boolean / nullability 契约不被后续 update 破坏。
- 找不到目标 scan 或解码失败时只降级,不影响查询正确性。
- query 结束后不会遗留 filter state。
## 风险与注意点
- 不要绕开已有 predicate/scan 更新能力重新造一套 scan 侧状态系统。
- 状态表生命周期必须和 query 生命周期绑定,否则会出现内存泄漏。
- 不要把 remote wrapper 做成“单实例共享 remapped children”的可变对象不同 consumer 必须拥有各自 remapped 视图,否则极易出现“更新成功但裁剪错误”的隐蔽问题。

View File

@@ -1,57 +0,0 @@
# Task 05 - 观测性、降级与生命周期保护
## 任务目标
补齐 remote dyn filter 机制的可观测性、资源保护和错误降级能力,确保它始终是“可失效的优化项”,而不是新的稳定性风险源。
说明query-scoped filter state 的生命周期契约与实际清理由 Task 04 主持Task 05 只负责为这些行为补充指标、日志、预算和兜底策略。
## 主要落点
- frontend / datanode metrics 模块
- tracing / logging 模块
- query lifecycle cleanup 相关逻辑
- RPC 调用保护、内存预算与节流配置模块
## 前置依赖
- Task 02-04 至少具备基本端到端语义。
## 实现范围
1. 增加 metrics:
- update 发送次数
- update 应用次数
- 过期 epoch 丢弃次数
- payload 解码失败次数
- complete marker / cleanup 次数
- frontend registry 注册 / 注销次数
- datanode registry 注册 / 注销次数
2. 增加 tracing 字段:`query_id``filter_id``epoch`,以及必要时的 route/subscriber region 元数据。
3. 为 Task 04 已定义的 TTL 清理和 query cancel cleanup 补充指标、日志、超时配置和告警信号。
4. 增加 cardinality、payload 大小、内存预算保护。
5. 明确降级语义:控制面失败、目标缺失、类型不匹配、状态丢失都只关闭优化,不中断查询。
6. 为 batched update / heartbeat / ack 预留配置和抽象边界。
7. 为 frontend 基于 `is_used()` 触发的 unregister 路径,以及 datanode 基于本地 `is_used()` 类语义触发的 eager cleanup 路径,补充日志、指标和异常回收兜底,避免“逻辑上已无人使用但状态未清理”不可观测。
## 推荐子步骤
1. 先定义最关键的 metrics 名称和标签,避免后续改动观测口径。
2. 在 frontend 和 datanode 两端都打 tracing便于串联一次 update 的全链路。
3. 将清理逻辑做成统一入口,避免 query finish、cancel、timeout 各自分叉。
4. 把降级事件写入日志或指标,让“优化没生效”的原因可追踪。
5. 为“update 早到被缓冲/丢弃/重试”的策略增加可观测性字段,方便后续判断是否需要演进到 stream 或更强注册机制。
6. 区分“frontend 正常 unregister”与“query cancel / TTL 兜底回收”,避免把两种清理路径混为一谈。
## 验收标准
- 能看见每个 filter 的发送、接收、应用、丢弃和清理情况。
- query 结束、取消、异常断连后,状态能被可靠回收。
- 当控制面出错时,查询结果保持正确且问题可观测。
- 内存与频率保护能阻止异常 workload 打爆控制面或状态表。
## 风险与注意点
- 没有指标的优化很难上线调参Task 05 不能被视作“后补文档工作”。
- TTL 只是兜底,不应替代 complete marker 和 query cancel hook。
- tracing 不能打印过大的 payload 内容,避免日志膨胀与敏感数据泄露。

View File

@@ -1,58 +0,0 @@
# Task 06 - 端到端验证与回归基线
## 任务目标
建立 remote dyn filter 的功能、一致性、可靠性和性能验证矩阵,证明该机制在带来 pruning 收益的同时,不会因异常路径破坏查询正确性。
## 主要落点
- `tests/`
- `tests-integration/`
- 分布式 query / datanode / region 相关测试目录
- 基准或 benchmark 脚本目录
## 前置依赖
- Task 01-05 完成最小可用实现。
## 实现范围
1. 功能测试:
- distributed hash join小 build side 对大 probe side 产生远端 pruning
- 无法序列化表达式时自动降级且结果正确
- 初始 remote read 与后续 dyn filter update 使用同一个 `remote_query_id + filter_id` 逻辑 identity并稳定绑定到对应 subscriber scan 集合
2. 一致性测试:
- update 乱序
- 重复 epoch
- complete marker 早到 / 晚到
- 仅部分 region 收到 update
- `remote_query_id` / `filter_id` 缺失或关联键不完整时安全降级,不串线到其他 query state
- datanode 本地 `is_used()` 类 eager cleanup 与 frontend unregister / TTL 兜底不会互相打架或造成误删
3. 可靠性测试:
- frontend 中途停止发送
- datanode 重启或 scan 提前结束
- query cancel 后状态释放
4. 性能验证:
- 高频小 update 与低频批量 update 对比
- 不同 `IN` cardinality 阈值收益曲线
5. 建立回归基线,为后续 Phase 2/3 演进提供对照数据。
## 推荐子步骤
1. 先补最小集成测试,证明“能工作且失败不影响结果”。
2. 再补乱序 / 幂等 / 清理类单测,把状态机语义锁住。
3. 最后增加 benchmark 或 profiling 脚本,比较不同更新策略的控制面开销。
4. 为日志和 metrics 增加断言,确保观测口径不会回归。
## 验收标准
- 至少一个 distributed join 集成测试证明远端 pruning 生效。
- 所有异常路径测试都证明查询结果仍然正确。
- 状态机相关测试覆盖乱序、重复、complete marker、超时回收。
- 有可重复运行的性能对照基线,能支撑是否继续推进 stream/batch 演进。
## 风险与注意点
- 只测 happy path 没有意义remote dyn filter 的价值在于异常时也必须安全退化。
- 性能测试必须同时看收益和控制面成本,不能只看扫描量下降。
- Phase 2/3 若没有基线,后续优化很难判断是真提升还是只是在搬移开销。

View File

@@ -1,238 +0,0 @@
# Task 07 - Large Build-Side Equal Join Membership 远端表达
## 任务目标
为“大 build side 的等值 join dyn filter”设计一个可跨 frontend / datanode 传输的 membership 表达方案,覆盖当前 `HashTableLookupExpr` 无法远端序列化的问题,并明确它与 Phase 1 `IN` ABI 的关系。
## 为什么值得独立成任务
Phase 1 只支持把小 build side 的 `InListExpr` 投影为远端 `IN` payload但 DataFusion 对 equal join 的 membership 形态会在 build side 稍大时自动切到 `HashTableLookupExpr`。如果不单独处理这一类场景,分布式 dyn filter 在很多“build side 不算很大、但已经超出 InList 阈值”的 join 上会直接失效。
当前代码里的触发条件是按 partition 计算的双阈值:
- `hash_join_inlist_pushdown_max_size`,默认 128KB / partition。
- `hash_join_inlist_pushdown_max_distinct_values`,默认 150 distinct values / partition。
因此 Task 07 要解决的不是极端超大 build side 的罕见场景,而是“中等规模等值 join 一旦越过 InList 边界就缺少远端表达”的普遍问题。
## 已澄清的上游事实
### 1. build side 不是边读边多次下发 membership 更新
对当前 DataFusion hash join 实现来说dynamic filter 的关键 membership 更新并不是随着 build side 每个 batch 逐步下发的高频流,而是:
- 先完成 build side 的 hash table / bounds 收集。
- 每个 build partition 上报一次 `PartitionBuildData`
- `SharedBuildAccumulator` 等待所有相关 partition 都上报后,统一构造最终 filter expr。
- 之后执行一次 `dynamic_filter.update(...)`,并立即 `mark_complete()`
也就是说:
- 从“单个 query 的最终 consumer”视角看hash join dyn filter 更接近“一次最终更新”而不是持续增量流。
- 在 partitioned join 下,内部会有多个 partition 各自上报一次,但对 probe-side scan 可见的是 barrier 汇总后的最终 filter。
这使得“大 build side membership 走一次性远端 payload”成为现实可行的设计方向。
### 2. `HashTableLookupExpr` 不能直接远端序列化
`HashTableLookupExpr` 持有 build-side runtime hash table 引用。DataFusion 自己在 physical expr proto 序列化时都无法直接传它,当前做法是把它降级成 `lit(true)` 跳过这个优化。
因此,后续任务的目标不应该是“直接序列化 `HashTableLookupExpr`”,而应该是“为它代表的大 build side membership 语义设计一个稳定、可传输的替代表达”。
## 主要问题陈述
当前 equal join dyn filter 的 membership 形态分裂为:
- 小 build side: `InListExpr`
- 大 build side: `HashTableLookupExpr`
这两种形态在本地单机执行都成立但远端传播只适合前者。Task 07 需要为后者定义远端替代表示,并让 datanode scan 能消费它。
## 初步方向Bloom-Based Membership Payload
一个合理的后续方向是:
- 不运输 `HashTableLookupExpr` 本身。
- 不继续强行塞进 `Payload::Datafusion(Vec<u8>)` 分支。
- 而是把它落到 Task 01 预留的 `Payload::Custom(...)` 分支里。
- 在 frontend / build side 完成 hash join build 阶段后,把最终 membership 状态投影成 `BLOOM` 类型的稳定 payload。
- 将该 payload 一次性发送给 datanode scan作为远端 dyn filter 的 membership 部分。
这个方向成立的前提是:
1. Bloom 只承担“近似 membership 过滤”的职责,允许 false positive但绝不能有 false negative。
2. 查询正确性仍由 join 本身保证Bloom 只是优化项。
3. scan 侧需要新的 apply 路径来消费 `BLOOM` payload而不是继续假设上游一定给它 DataFusion 的 `DynamicFilterPhysicalExpr`
## 外部实现先例(用于校准方向,不直接照搬)
- Spark、Velox、Doris 等系统都存在“用 Bloom filter 表达大 build-side 等值 join membership”或相近 runtime filter 思路,说明 Bloom 作为远端 membership payload 是有现实先例的。
- Trino、TiDB 一类实现更偏向“小集合走离散值,大集合退化到范围 / 其他保守表示”,说明 Bloom 不是唯一答案,但它是大集合 membership 过滤里最常见的折中选择之一。
- 这些系统的共同点是Bloom 都被当成独立的过滤表示,而不是去序列化执行器内部的 hash table 引用对象。
## 但不能直接简化成“转成 Bloom 就完了”的原因
### 1. 语义并不完全等价
`HashTableLookupExpr` 是精确 membershipBloom 是近似 membership。它可以作为远端替代方案但不是等价序列化。
### 2. 多列等值 join 需要稳定的复合 key 编码
DataFusion 多列等值 join 会先构造 `struct(...)` 再做 lookup。若要生成 Bloom payload必须先定义
- 多列 key 的序列化顺序
- 每列类型编码规则
- null 值处理策略
- schema / encoding version
否则 frontend 构造的 key 与 datanode probe side 计算出的 key 无法一致命中。
### 3. partitioned join 需要处理分区路由语义
DataFusion 在 partitioned 模式下可能构造带 `CaseExpr` 的分区路由 filter。Task 07 需要明确:
- 是生成一个全局 Bloom牺牲部分选择率换简单协议
- 还是生成按 partition 分片的 Bloom并要求 datanode 复现同样的 routing 逻辑;
- 还是直接先从 CollectLeft / 非分区方案切入。
### 4. 仅有 Bloom 不一定覆盖全部收益
当前 hash join dyn filter 还可能叠加 `min/max` bounds。若只传 Bloom不传 bounds就会失去一部分 statistics pruning 收益。因此更合理的长期方案可能是:
- membership: `BLOOM`
- bounds: `MIN_MAX`
两者组合使用,而不是二选一。
### 5. datanode 当前没有“远端 Bloom dyn filter payload -> scan pruning”现成路径
GreptimeDB 当前 scan 侧虽然有 bloom filter index applier但它是从逻辑 `Expr`(如 `IN` / `=` / `OR-of-EQ`)构建的,不是直接接收一个远端传来的 Bloom payload。
所以 Task 07 必须包含 datanode apply 层设计,而不是只定义 wire message。
## 主要落点
- `src/common/query/src/request.rs`
- 可能新增 `src/common/query/src/dyn_filter.rs`
- `src/query/src/dist_plan/merge_scan.rs`
- `src/client/src/region.rs`
- `src/datanode/src/region_server.rs`
- `src/table/src/predicate.rs`
- `src/mito2/src/read/scan_region.rs`
- `src/mito2/src/sst/index/bloom_filter/applier/builder.rs`
## 可复用的现有模块
Task 07 不需要从零手写 Bloom 算法或位图 probing 逻辑,以下模块值得优先复用:
1. `src/index/src/bloom_filter/applier.rs`
- 已有基于 `fastbloom::BloomFilter` 的 probe 逻辑。
- 其中 `InListPredicate``contains(...)` 风格的匹配方式可作为 runtime Bloom dyn filter 的低层参考。
2. `src/index/src/bloom_filter/creator.rs`
- 已有 Bloom 构建参数、误判率、编码格式和内存控制经验。
- 即便最终不直接复用文件格式,也应复用其参数选型与 codec 思路。
3. `src/index/src/bloom_filter/reader.rs`
- 若后续决定让远端 payload 与本地 Bloom bytes 表示保持兼容,可借用 reader/metadata 约定。
4. `fastbloom::BloomFilter`
- 当前代码栈已经在使用,第一版无需再引入新的 Bloom 实现。
复用原则优先复用“Bloom 数据结构、编码、probe 逻辑”,不要直接复用“面向 SST 持久化索引的整条业务流程”。
## 不建议直接复用的现有模块
以下模块和当前 SST bloom index 工作流绑定很深,不适合被直接拿来当 remote dyn filter 路径:
1. `src/mito2/src/sst/index/bloom_filter/applier/builder.rs`
- 它的职责是从逻辑 `Expr` 中提取 `Eq` / `InList` / `OR-of-EQ`,再转成 `InListPredicate`
- remote dyn filter 场景里,上游输入将是 query-scoped 的远端 `BLOOM` payload而不是逻辑 SQL 表达式。
2. `src/mito2/src/sst/index/bloom_filter/applier.rs`
- 它的职责是读取 SST / puffin 中的 bloom index blob并对 row-group 搜索范围做裁剪。
- remote dyn filter 需要的是“内存中的 query-scoped filter state + scan apply 路径”,不是读取持久化索引文件。
3. `src/mito2/src/sst/index/bloom_filter/creator.rs`
- 这是文件级索引构建流程的一部分,关注点是落盘和 segment 化,不是 runtime membership payload 的生成。
边界原则:
- 可复用的是 low-level Bloom primitive。
- 不应直接复用的是 SST index builder / applier 的文件级工作流。
## 第一版最小实现路径
建议第一版把 remote Bloom dyn filter 做成一条独立、简单、query-scoped 的 runtime 路径,而不是硬接入现有 SST bloom index 框架。
### Step 1 - 定义独立的 `BLOOM` wire payload
-`src/common/query/src/request.rs` 或新建 `src/common/query/src/dyn_filter.rs` 中定义:
- bloom bytes
- hash / seed 参数
- false positive rate 或构建参数
- key encoding version
- null policy
- scopesingle-column / composite / optional partition scope
### Step 2 - frontend 只做“一次最终 Bloom 更新”
- 在 hash join build 完成后,把最终 membership 投影为 Bloom payload。
- Phase 1.5/2 的最小实现先只覆盖单列等值 join。
- 暂不尝试复用 DataFusion 的 `HashTableLookupExpr` 本体,也不做增量 Bloom update。
### Step 3 - datanode 维护独立的 runtime Bloom state
-`src/datanode/src/region_server.rs` / query-scoped state 模块中保存远端 Bloom dyn filter。
- 它和现有 `DynamicFilterPhysicalExpr` / SST bloom index 都保持解耦,只共享底层 Bloom bytes / probing primitive。
### Step 4 - scan 增加专门的 apply / evaluate 路径
-`src/table/src/predicate.rs``src/mito2/src/read/scan_region.rs` 增加 remote Bloom dyn filter 的应用入口。
- 第一版可以只要求它参与 runtime filtering不强求直接接入所有现有 pruning optimizer。
- 若需要与统计裁剪协同,则继续保留并组合 `MIN_MAX` payload。
### Step 5 - 再考虑与现有 bloom index 的协同
- 等 runtime Bloom dyn filter 路径稳定后,再评估是否能与 `src/index/src/bloom_filter/*` 的表示格式进一步统一。
- 不要在第一版就把 query-scoped runtime filter 和 file-scoped persistent bloom index 强行合流。
## 前置依赖
- Task 01 完成 `IN` / `MIN_MAX` / `BLOOM` ABI 扩展位定义。
- Task 02-04 完成 query-scoped dyn filter control plane 和 datanode apply 基础能力。
## 实现范围
1. 明确“大 build side equal join membership”的远端目标语义。
2. 设计 `BLOOM` payload 结构,至少包括:
- bitset bytes
- hash function / seeds
- key encoding version
- false positive rate 或构建参数
- null policy
- 是否为单列 / 多列 key
- 可选 partition scope
3. 设计从 DataFusion build-side membership 状态投影到远端 `BLOOM` payload 的 frontend 逻辑。
4. 设计 datanode 侧的 apply 逻辑,让 table scan 能把远端 `BLOOM` membership 用于 pruning / filtering。
5. 决定 Bloom 与 `MIN_MAX` 是否组合下发,以及组合后的应用顺序。
6. 明确 Phase 2/3 的支持边界:单列优先,还是直接覆盖多列等值 join。
## 推荐子步骤
1. 先锁定“单列等值 join + 一次最终 Bloom 更新”的最小可行路径。
2. 明确 Bloom key 编码规范,并做 frontend / datanode 一致性测试。
3. 增加 per-partition 与 global Bloom 的方案比较,选择 Phase 1.5/2 的默认方向。
4. 评估是否需要保留 fallback若 Bloom 构建开销或 bitset 太大,则继续降级为不下发。
5. 评估如何与现有 bloom filter index / parquet bloom filter 能力协同,而不是重复实现两套近似过滤机制。
## 验收标准
- 已有证据证明 hash join 大 build side membership 只需要一次最终远端更新即可表达主要收益。
- `HashTableLookupExpr` 的远端替代表达已定义,且不依赖传输 DataFusion runtime hash table 本体。
- 单列等值 join 场景下,远端 `BLOOM` dyn filter 能在 datanode scan 侧带来可观过滤收益。
- false positive 不影响查询正确性false negative 被设计和测试明确禁止。
- 对多列 key、partitioned join、null 语义、bounds 组合的限制被文档化。
## 风险与注意点
- 不要把 Bloom 方案描述成“序列化 HashTableLookupExpr”它只是一个可运输的近似替代表达。
- per-partition 阈值不等于全局 build side 大小,分区 join 下可能需要不同策略。
- 多列 key 的编码协议如果不先锁定,后续很容易出现 frontend / datanode 不一致。
- 只做 Bloom 而不考虑 `MIN_MAX`,可能拿不到 hash join dyn filter 的完整收益。
- 现有 scan 侧 bloom 能力主要围绕逻辑表达式和索引 applierTask 07 需要补新的远端 payload 消费路径。

View File

@@ -1,74 +0,0 @@
# Remote Dyn Filter Implementation Tasks
基于 `/home/discord9/greptimedb_for_build/docs/dyn-filter-propagation-plan-zh.md` 拆分的实现任务文档集合。
统一设计说明见:`remote-dyn-filter-rfc.md`
## 目标
- 将 frontend 产生的 dyn filter 增量传播到 datanode table scan。
- 先用 Phase 1 跑通分布式 `IN` filter 语义,再为后续传输优化与过滤类型扩展留出协议空间。
- 所有任务都以“不影响查询正确性,只影响优化收益”为硬约束。
## 任务清单
1. `remote-dyn-filter-task-01-wire-abi.md`
- 定义 Dyn Filter 最小 wire contractidentity、payload 边界、版本/epoch 语义与降级规则。
2. `remote-dyn-filter-task-02-region-rpc.md`
-`RegionRequest.body` 中新增 `remote_dyn_filter` 总入口,并用 `oneof action` 承载 `update / unregister`,打通 frontend -> datanode unary 控制面。
3. `remote-dyn-filter-task-03-frontend-producer.md`
- 在 frontend 侧识别可分发 dyn filter完成 filter_id、epoch、目标 region 的生成与下发。
4. `remote-dyn-filter-task-03-minimal-shrink.md`
- 收缩已经越界的 Task 03先重写 subtask 06-08 文档,再按最小边界整理 frontend/datanode 相关代码责任。
5. `remote-dyn-filter-task-04-datanode-apply.md`
- 在 datanode 侧维护 query-scoped filter state并将 update 应用到 scan predicate。
6. `remote-dyn-filter-task-05-observability-fallback.md`
- 增加 metrics、tracing、TTL 清理、错误降级与控制面保护。
7. `remote-dyn-filter-task-06-validation.md`
- 完成功能、一致性、可靠性、性能验证,并为 Phase 2/3 演进保留回归基线。
8. `remote-dyn-filter-task-07-large-build-membership.md`
- 为 large build-side equal join 的 lookup-like membership 设计远端可传输表示,重点覆盖 `HashTableLookupExpr` 无法直接序列化的问题。
## 建议执行顺序
1. 先完成 ABI 定义,避免 RPC 和状态机实现反复返工。
2. 然后基于 `RegionRequest.body.remote_dyn_filter` 打通 unary RPC 请求链路,让 frontend 和 datanode 拥有最小控制面,并把 dyn filter 操作扩展点统一收敛到 `RemoteDynFilterRequest.oneof action`
3. 若 Task 03 的实现已经漂出最小边界,先执行 `remote-dyn-filter-task-03-minimal-shrink.md`,再继续推进 Task 03 / Task 04。
4. 接着分别实现 frontend 生产者和 datanode 消费者,完成端到端语义闭环。
5. 最后补齐观测性、清理、降级和测试,确保该优化在失败时能自动失效而不是破坏查询。
## 完整工作流程Phase 1
1. **frontend 发现可下推 dyn filter**
-`MergeScanExec` 所在查询树上DataFusion 调用 `gather_filters_for_pushdown` 时会把当前可下推的 dyn filters 暴露给下游 scan。
- 这一刻是 frontend 侧为 remote dyn filter 建立注册关系的最佳时机:记录 `query_id + filter_id` 这一逻辑 filter identity并维护它当前订阅了哪些 remote region/scan同时保存对应本地 `DynamicFilterPhysicalExpr` 的引用。
2. **初始 remote read 负责把“会有这个 dyn filter”这件事带到 datanode**
- 后续 `MergeScanExec::to_stream` 发起 remote read 时,仍然带着同一个 `query_id` 与目标 region/scan 身份。
- datanode 在接收初始请求并构建 scan 时,为这些即将接收远端更新的 dyn filters 建立注册项,并安装 placeholder / remote wrapper。
3. **frontend 持续观察 dyn filter 更新并转成远端 update**
- frontend 注册中心通过本地 dyn filter 的更新通知(例如 `wait_update` / generation 变化)感知新快照。
- 每次有新快照时frontend 将其编码为 `DynFilterUpdate`,并通过 Task 02 定义的 unary RPC 发送给对应 datanode。
4. **datanode 注册中心接收 update 并刷新本地 dyn filter**
- datanode 根据 `query_id + filter_id` 找到共享的 remote filter state并把 update 分发给当前挂接在该 state 上的本地 consumer / wrapper 视图。
- 若 epoch 更大则应用更新,若重复/过期则按幂等规则安全忽略。
- scan 在后续 pruning / predicate 计算时读取 wrapper 当前快照,从而渐进增强裁剪收益。
5. **frontend 负责判定“这个 dyn filter 在本查询里是否已无人使用”**
- 对 frontend 本地持有的 `DynamicFilterPhysicalExpr``is_used()` 可作为“该查询内是否仍有 consumer 在使用这个 dyn filter”的判据。
- 一旦 frontend 确认该 dyn filter 在本查询中已无人使用,就可以把它从 frontend 注册中心注销,并通知相关 datanode 注销对应注册项。
- datanode 侧也可以把本地 remote wrapper / `DynamicFilterPhysicalExpr``is_used()` 作为“当前节点上是否还有 scan consumer”的快速判据用于 eager local cleanup但它只是正常清理的补充信号不替代显式 unregister / cancel / TTL。
6. **异常和兜底清理**
- `is_used()` 触发的是正常路径下的前端注销信号异常断连、query cancel、frontend 崩溃等场景仍需依赖显式 cancel/complete 和 datanode TTL 兜底。
## 实施前先澄清的契约
- 初始 remote read 请求必须携带与后续 dyn filter update 相同的 `query_id + filter_id` 逻辑 identityregion/scan 只承担注册与路由元数据,不再是 filter state identity 的一部分。
- datanode 需要明确定义“update 早于 scan 注册到达”时的行为Phase 1 应至少固定为缓冲、显式丢弃加指标、或前端重试中的一种。
- `is_final` 仅表示后续不会再有更强 update不等于可立即删除最终 filter state真正回收仍以 query finish / cancel / TTL 为准。
- query-scoped filter state 的生命周期与清理责任以 Task 04 为主Task 05 只补充观测、预算和兜底机制。
## Phase 对应关系
- Task 01-06 覆盖 Phase 1 的最小可用版本。
- Task 05 和 Task 06 同时为 Phase 2 的 batched update、ack、watermark、heartbeat 做准备。
- Phase 3 的 `MIN_MAX` / `BLOOM` 只需要在 Task 01 预留 ABI 扩展点,再在后续增量任务中落地。
- Task 07 是一个独立的后续任务,面向“大 build side equal join membership”的远端表示设计更接近 Phase 2/3 的增强项。