From 8240a1ace1ebcbbe5d39cb50e8f62f484e3ccbf8 Mon Sep 17 00:00:00 2001
From: discord9
Date: Thu, 25 Apr 2024 15:06:02 +0800
Subject: [PATCH] feat: find all used collection

---
 src/flow/src/plan.rs | 47 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs
index 5e5723e44d..993666294b 100644
--- a/src/flow/src/plan.rs
+++ b/src/flow/src/plan.rs
@@ -18,12 +18,15 @@
 mod join;
 mod reduce;
 
+use std::collections::BTreeSet;
+
 use datatypes::arrow::ipc::Map;
 use serde::{Deserialize, Serialize};
 
 use crate::adapter::error::Error;
 use crate::expr::{
-    AggregateExpr, EvalError, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr, TypedExpr,
+    AggregateExpr, EvalError, GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, ScalarExpr,
+    TypedExpr,
 };
 use crate::plan::join::JoinPlan;
 pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
@@ -182,3 +185,45 @@ pub enum Plan {
         consolidate_output: bool,
     },
 }
+
+impl Plan {
+    /// Collect the [`GlobalId`] of every global collection referenced by a `Plan::Get`.
+    ///
+    /// Recurses through `Let`, `Mfp`, `Reduce`, `Join` and `Union` nodes and records each
+    /// `Plan::Get` whose id is `Id::Global`; `Id::Local` references are ignored.
+    /// The ids are returned as an ordered, deduplicated set.
+    pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
+        /// Add every global id found under `plan` to `used`, recursing into sub-plans.
+        fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
+            match plan {
+                Plan::Get { id } => {
+                    // Only global ids are collected; a local id adds nothing.
+                    if let Id::Global(g) = id {
+                        used.insert(*g);
+                    }
+                }
+                Plan::Let { value, body, .. } => {
+                    recur_find_use(value, used);
+                    recur_find_use(body, used);
+                }
+                Plan::Mfp { input, .. } => recur_find_use(input, used),
+                Plan::Reduce { input, .. } => recur_find_use(input, used),
+                Plan::Join { inputs, .. } => {
+                    for input in inputs {
+                        recur_find_use(input, used);
+                    }
+                }
+                Plan::Union { inputs, .. } => {
+                    for input in inputs {
+                        recur_find_use(input, used);
+                    }
+                }
+                // Any other variant is not descended into and contributes no ids.
+                _ => {}
+            }
+        }
+        let mut used = BTreeSet::new();
+        recur_find_use(self, &mut used);
+        used
+    }
+}