feat: get table schema

This commit is contained in:
discord9
2024-04-26 16:17:44 +08:00
parent 3fb3fb18c2
commit 640674b9bc
2 changed files with 53 additions and 5 deletions

View File

@@ -17,9 +17,10 @@
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use common_meta::key::table_info::TableInfoManager;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameManager;
use hydroflow::scheduled::graph::Hydroflow;
use itertools::Itertools;
use minstant::Anchor;
use prost::bytes::buf;
use query::QueryEngine;
@@ -36,10 +37,12 @@ use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, DiffRow, RelationType, Row, BOARDCAST_CAP};
use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BOARDCAST_CAP};
use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
#[cfg(test)]
mod tests;
use error::Error;
@@ -54,8 +57,7 @@ pub struct FlowNodeManager<'subgraph> {
/// The state of all tasks in the flow node
/// This is also the only field that's not `Send` in the struct
pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
// TODO: catalog/tableinfo manager for query schema and translate sql to plan
/// The query engine that will be used to parse the query and convert it to a dataflow plan
query_engine: Arc<dyn QueryEngine>,
srv_map: TableIdNameMapper,
/// contains mapping from table name to global id, and table schema
@@ -91,6 +93,48 @@ impl TableIdNameMapper {
.map(|name| name.unwrap().table_name())
.map(|name| vec![name.catalog_name, name.schema_name, name.table_name])
}
/// Query metasrv for the table info value registered under `table_id`.
///
/// Returns `Ok(None)` when the lookup itself succeeds but yields no entry for this id.
///
/// # Errors
///
/// Returns the error built from `TableNotFoundMetaSnafu` when the table info manager's
/// lookup fails.
pub async fn get_table_info_value(
    &self,
    table_id: &TableId,
) -> Result<Option<TableInfoValue>, Error> {
    let maybe_info = self
        .table_info_manager
        .get(*table_id)
        .await
        .with_context(|_| TableNotFoundMetaSnafu {
            // This lookup is for table info, not the table name, so say so in the message.
            msg: format!("TableId = {:?}, couldn't find table info", table_id),
        })?;
    // Strip the wrapper returned by the manager and hand back the plain value.
    Ok(maybe_info.map(|v| v.into_inner()))
}
/// Derive the flow-side [`RelationType`] for the table behind `table_id`.
///
/// Each column schema becomes a [`ColumnType`] carrying its nullability and data type, the
/// primary key indices form the single entry of `keys`, and the schema's timestamp index is
/// used as `time_index`.
///
/// # Errors
///
/// Propagates any error from [`Self::get_table_info_value`], and fails with the error built
/// from `TableNotFoundSnafu` when no table info exists for `table_id`.
pub async fn get_table_schema(&self, table_id: &TableId) -> Result<RelationType, Error> {
    let info = self
        .get_table_info_value(table_id)
        .await?
        .with_context(|| TableNotFoundSnafu {
            name: format!("TableId = {:?}, Can't found table info", table_id),
        })?;

    // Take ownership of the metadata once; the pieces below are moved out of it field by field.
    let meta = info.table_info.meta;

    let time_index = meta.schema.timestamp_index;

    // NOTE(review): a key is always emitted, even if the primary key index list is empty —
    // confirm that an empty `Key` has the intended meaning for downstream plans.
    let keys = vec![repr::Key::from(meta.primary_key_indices)];

    let column_types = meta
        .schema
        .column_schemas
        .into_iter()
        .map(|col| ColumnType {
            nullable: col.is_nullable(),
            scalar_type: col.data_type,
        })
        .collect_vec();

    Ok(RelationType {
        column_types,
        keys,
        time_index,
    })
}
}
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
@@ -477,7 +521,11 @@ impl FlowWorkerContext {
gid
} else {
let global_id = self.new_global_id();
let table_name = srv_map.get_table_name(&table_id).await.unwrap();
let schema = srv_map.get_table_schema(&table_id).await.unwrap();
self.schema.insert(global_id, schema);
self.table_repr.insert(table_name, table_id, global_id);
global_id
}

View File

@@ -27,7 +27,7 @@ use datatypes::types::cast;
use datatypes::types::cast::CastOption;
use datatypes::value::Value;
use itertools::Itertools;
pub(crate) use relation::{ColumnType, RelationDesc, RelationType};
pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;