diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 532ea366a6..8d85ca8320 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -17,9 +17,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; -use common_meta::key::table_info::TableInfoManager; +use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::TableNameManager; use hydroflow::scheduled::graph::Hydroflow; +use itertools::Itertools; use minstant::Anchor; use prost::bytes::buf; use query::QueryEngine; @@ -36,10 +37,12 @@ use crate::compute::{Context, DataflowState, ErrCollector}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; use crate::plan::{Plan, TypedPlan}; -use crate::repr::{self, DiffRow, RelationType, Row, BOARDCAST_CAP}; +use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BOARDCAST_CAP}; use crate::transform::sql_to_flow_plan; pub(crate) mod error; +#[cfg(test)] +mod tests; use error::Error; @@ -54,8 +57,7 @@ pub struct FlowNodeManager<'subgraph> { /// The state of all tasks in the flow node /// This is also the only field that's not `Send` in the struct pub task_states: BTreeMap>, - - // TODO: catalog/tableinfo manager for query schema and translate sql to plan + /// The query engine that will be used to parse the query and convert it to a dataflow plan query_engine: Arc, srv_map: TableIdNameMapper, /// contains mapping from table name to global id, and table schema @@ -91,6 +93,48 @@ impl TableIdNameMapper { .map(|name| name.unwrap().table_name()) .map(|name| vec![name.catalog_name, name.schema_name, name.table_name]) } + /// query metasrv about the table name and table id + pub async fn get_table_info_value( + &self, + table_id: &TableId, + ) -> Result, Error> { + Ok(self + .table_info_manager + .get(*table_id) + .await + .with_context(|_| TableNotFoundMetaSnafu { + msg: format!("TableId = {:?}, couldn't found table name", table_id), + })? + .map(|v| v.into_inner())) + } + + pub async fn get_table_schema(&self, table_id: &TableId) -> Result { + let table_info_value = self + .get_table_info_value(table_id) + .await? + .with_context(|| TableNotFoundSnafu { + name: format!("TableId = {:?}, Can't found table info", table_id), + })?; + let raw_schema = table_info_value.table_info.meta.schema; + let column_types = raw_schema + .column_schemas + .into_iter() + .map(|col| ColumnType { + nullable: col.is_nullable(), + scalar_type: col.data_type, + }) + .collect_vec(); + + let key = table_info_value.table_info.meta.primary_key_indices; + let keys = vec![repr::Key::from(key)]; + + let time_index = raw_schema.timestamp_index; + Ok(RelationType { + column_types, + keys, + time_index, + }) + } } /// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState` @@ -477,7 +521,11 @@ impl FlowWorkerContext { gid } else { let global_id = self.new_global_id(); + let table_name = srv_map.get_table_name(&table_id).await.unwrap(); + let schema = srv_map.get_table_schema(&table_id).await.unwrap(); + self.schema.insert(global_id, schema); + self.table_repr.insert(table_name, table_id, global_id); global_id } diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs index 4b40b80522..6d3622f245 100644 --- a/src/flow/src/repr.rs +++ b/src/flow/src/repr.rs @@ -27,7 +27,7 @@ use datatypes::types::cast; use datatypes::types::cast::CastOption; use datatypes::value::Value; use itertools::Itertools; -pub(crate) use relation::{ColumnType, RelationDesc, RelationType}; +pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType}; use serde::{Deserialize, Serialize}; use snafu::ResultExt;