diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index bbd4abcb54..aed05f4f41 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -14,7 +14,7 @@ //! for getting data from source and sending results to sink //! and communicating with other parts of the database -use std::borrow::BorrowMut; +use std::borrow::{Borrow, BorrowMut}; use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; @@ -530,7 +530,7 @@ impl FlownodeManager { pub async fn create_flow( &self, flow_id: FlowId, - sink_table_id: TableId, + sink_table_name: TableName, source_table_ids: &[TableId], create_if_not_exist: bool, expire_when: Option, @@ -549,15 +549,15 @@ impl FlownodeManager { let mut node_ctx = self.node_context.lock().await; // assign global id to source and sink table - for source in source_table_ids - .iter() - .chain(std::iter::once(&sink_table_id)) - { + for source in source_table_ids { node_ctx - .assign_global_id_to_table(&self.table_info_source, *source) + .assign_global_id_to_table(&self.table_info_source, None, Some(*source)) .await; } - node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_id); + node_ctx + .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None) + .await; + node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?; @@ -568,11 +568,7 @@ impl FlownodeManager { let _ = flow_options; // TODO(discord9): add more than one handles - let sink_id = node_ctx - .table_repr - .get_by_table_id(&sink_table_id) - .unwrap() - .1; + let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1; let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?; let source_ids = source_table_ids @@ -672,7 +668,7 @@ impl FlowNodeContext { &mut self, task_id: FlowId, source_table_ids: &[TableId], - sink_table_id: TableId, + sink_table_name: TableName, ) { for source_table_id in source_table_ids { self.add_source_sender(*source_table_id); @@ -682,7 +678,6 @@ impl FlowNodeContext { .insert(task_id); } - let sink_table_name = self.table_repr.get_by_table_id(&sink_table_id).unwrap().0; self.add_sink_receiver(sink_table_name.clone()); self.flow_to_sink.insert(task_id, sink_table_name); } @@ -707,9 +702,12 @@ impl FlowNodeContext { .table_repr .get_by_global_id(id) .with_context(|| TableNotFoundSnafu { - name: format!("{:?}", id), + name: format!("Global Id = {:?}", id), })? - .1; + .1 + .with_context(|| TableNotFoundSnafu { + name: format!("Table Id = {:?}", id), + })?; self.source_sender .get(&table_id) .with_context(|| TableNotFoundSnafu { @@ -727,7 +725,10 @@ impl FlowNodeContext { .with_context(|| TableNotFoundSnafu { name: format!("{:?}", id), })? - .0; + .0 + .with_context(|| TableNotFoundSnafu { + name: format!("Global Id = {:?}", id), + })?; self.sink_receiver .get(&table_name) .map(|(s, _r)| s.clone()) @@ -761,21 +762,27 @@ impl FlowNodeContext { /// Assign a global id to a table, if already assigned, return the existing global id /// + /// and will try to fetch the schema from table info manager(if table exist now) + /// /// NOTE: this will not actually render the table into collection refered as GlobalId /// merely creating a mapping from table id to global id pub async fn assign_global_id_to_table( &mut self, srv_map: &TableInfoSource, - table_id: TableId, + table_name: Option, + table_id: Option, ) -> GlobalId { - if let Some((_name, gid)) = self.table_repr.get_by_table_id(&table_id) { + if let Some((_name, gid)) = table_name + .as_ref() + .and_then(|table_name| self.table_repr.get_by_name(table_name)) + { gid } else { let global_id = self.new_global_id(); - - let table_name = srv_map.get_table_name(&table_id).await.unwrap(); - let schema = srv_map.get_table_schema(&table_id).await.unwrap(); - self.schema.insert(global_id, schema); + if let Some(table_id) = table_id { + let schema = srv_map.get_table_schema(&table_id).await.unwrap(); + self.schema.insert(global_id, schema); + } self.table_repr.insert(table_name, table_id, global_id); global_id @@ -793,7 +800,7 @@ impl FlowNodeContext { pub struct TriMap { name_to_global_id: HashMap, id_to_global_id: HashMap, - global_id_to_name_id: BTreeMap, + global_id_to_name_id: BTreeMap, Option)>, } impl TriMap { @@ -801,32 +808,32 @@ impl TriMap { Default::default() } - pub fn insert(&mut self, name: TableName, id: TableId, global_id: GlobalId) { - self.name_to_global_id.insert(name.clone(), global_id); - self.id_to_global_id.insert(id, global_id); + pub fn insert(&mut self, name: Option, id: Option, global_id: GlobalId) { + name.clone() + .and_then(|name| self.name_to_global_id.insert(name.clone(), global_id)); + id.and_then(|id| self.id_to_global_id.insert(id, global_id)); self.global_id_to_name_id.insert(global_id, (name, id)); } - pub fn get_by_name(&self, name: &TableName) -> Option<(TableId, GlobalId)> { - self.name_to_global_id.get(name).and_then(|global_id| { - self.global_id_to_name_id - .get(global_id) - .map(|(_name, id)| (*id, *global_id)) + pub fn get_by_name(&self, name: &TableName) -> Option<(Option, GlobalId)> { + self.name_to_global_id.get(name).map(|global_id| { + let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap(); + (*id, *global_id) }) } - pub fn get_by_table_id(&self, id: &TableId) -> Option<(TableName, GlobalId)> { - self.id_to_global_id.get(id).and_then(|global_id| { - self.global_id_to_name_id - .get(global_id) - .map(|(name, _id)| (name.clone(), *global_id)) + pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option, GlobalId)> { + self.id_to_global_id.get(id).map(|global_id| { + let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap(); + (name.clone(), *global_id) }) } - pub fn get_by_global_id(&self, global_id: &GlobalId) -> Option<(TableName, TableId)> { - self.global_id_to_name_id - .get(global_id) - .map(|(name, id)| (name.clone(), *id)) + pub fn get_by_global_id( + &self, + global_id: &GlobalId, + ) -> Option<(Option, Option)> { + self.global_id_to_name_id.get(global_id).cloned() } } diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs index 35ad79b040..3989a16d79 100644 --- a/src/flow/src/adapter/server.rs +++ b/src/flow/src/adapter/server.rs @@ -60,16 +60,16 @@ impl flow_server::Flow for FlowService { flow_options, })) => { let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); - let sink_table_id = self - .manager - .table_info_source - .get_table_id_from_proto_name(&sink_table_name) - .await?; + let sink_table_name = vec![ + sink_table_name.catalog_name, + sink_table_name.schema_name, + sink_table_name.table_name, + ]; let ret = self .manager .create_flow( task_id.id as u64, - sink_table_id, + sink_table_name, &source_table_ids, create_if_not_exists, Some(expire_when), diff --git a/src/flow/src/adapter/standalone.rs b/src/flow/src/adapter/standalone.rs index 4a1a8762ad..3fec54a726 100644 --- a/src/flow/src/adapter/standalone.rs +++ b/src/flow/src/adapter/standalone.rs @@ -46,15 +46,15 @@ impl Flownode for FlownodeManager { flow_options, })) => { let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec(); - let sink_table_id = self - .table_info_source - .get_table_id_from_proto_name(&sink_table_name) - .await - .map_err(to_meta_err)?; + let sink_table_name = vec![ + sink_table_name.catalog_name, + sink_table_name.schema_name, + sink_table_name.table_name, + ]; let ret = self .create_flow( task_id.id as u64, - sink_table_id, + sink_table_name, &source_table_ids, create_if_not_exists, Some(expire_when), diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 6159e02d62..b302e0ae5d 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -145,7 +145,7 @@ mod test { let name = vec!["numbers".to_string()]; let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); let mut tri_map = TriMap::new(); - tri_map.insert(name.clone(), 0, gid); + tri_map.insert(Some(name.clone()), Some(0), gid); FlowNodeContext { schema: HashMap::from([(gid, schema)]), table_repr: tri_map,