refactor: use table name for sink table

discord9
2024-05-07 17:28:39 +08:00
parent d5bdbedcd6
commit a6727e2e8d
4 changed files with 62 additions and 55 deletions

View File

@@ -14,7 +14,7 @@
//! for getting data from source and sending results to sink
//! and communicating with other parts of the database
use std::borrow::BorrowMut;
use std::borrow::{Borrow, BorrowMut};
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
@@ -530,7 +530,7 @@ impl FlownodeManager {
pub async fn create_flow(
&self,
flow_id: FlowId,
sink_table_id: TableId,
sink_table_name: TableName,
source_table_ids: &[TableId],
create_if_not_exist: bool,
expire_when: Option<String>,
@@ -549,15 +549,15 @@ impl FlownodeManager {
let mut node_ctx = self.node_context.lock().await;
// assign global id to source and sink table
for source in source_table_ids
.iter()
.chain(std::iter::once(&sink_table_id))
{
for source in source_table_ids {
node_ctx
.assign_global_id_to_table(&self.table_info_source, *source)
.assign_global_id_to_table(&self.table_info_source, None, Some(*source))
.await;
}
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_id);
node_ctx
.assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
.await;
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
// construct an active dataflow state with it
let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
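Reassembled from the hunk above, the registration path in create_flow now reads as follows: sources are still registered by TableId, while the sink is registered by TableName alone, since its TableId may not exist yet. This is a condensed excerpt of the post-change code, not a standalone program; all names come from the diff itself:

// After this change: sources get global ids assigned by TableId ...
for source in source_table_ids {
    node_ctx
        .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
        .await;
}
// ... while the sink gets one by TableName only; no sink TableId is available here.
node_ctx
    .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
    .await;
// The flow's source ids and sink name are then recorded against the flow id.
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());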
@@ -568,11 +568,7 @@ impl FlownodeManager {
let _ = flow_options;
// TODO(discord9): add more than one handle
let sink_id = node_ctx
.table_repr
.get_by_table_id(&sink_table_id)
.unwrap()
.1;
let sink_id = node_ctx.table_repr.get_by_name(&sink_table_name).unwrap().1;
let sink_sender = node_ctx.get_sink_by_global_id(&sink_id)?;
let source_ids = source_table_ids
@@ -672,7 +668,7 @@ impl FlowNodeContext {
&mut self,
task_id: FlowId,
source_table_ids: &[TableId],
sink_table_id: TableId,
sink_table_name: TableName,
) {
for source_table_id in source_table_ids {
self.add_source_sender(*source_table_id);
@@ -682,7 +678,6 @@ impl FlowNodeContext {
.insert(task_id);
}
let sink_table_name = self.table_repr.get_by_table_id(&sink_table_id).unwrap().0;
self.add_sink_receiver(sink_table_name.clone());
self.flow_to_sink.insert(task_id, sink_table_name);
}
@@ -707,9 +702,12 @@ impl FlowNodeContext {
.table_repr
.get_by_global_id(id)
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
name: format!("Global Id = {:?}", id),
})?
.1;
.1
.with_context(|| TableNotFoundSnafu {
name: format!("Table Id = {:?}", id),
})?;
self.source_sender
.get(&table_id)
.with_context(|| TableNotFoundSnafu {
@@ -727,7 +725,10 @@ impl FlowNodeContext {
.with_context(|| TableNotFoundSnafu {
name: format!("{:?}", id),
})?
.0;
.0
.with_context(|| TableNotFoundSnafu {
name: format!("Global Id = {:?}", id),
})?;
self.sink_receiver
.get(&table_name)
.map(|(s, _r)| s.clone())
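Because get_by_global_id now yields Optional name and id components, both sender lookups above become two-stage: the global id must first resolve to an entry, and then the required component must actually be present, with each failure mapped to TableNotFoundSnafu under a more specific message. Condensed from the source-sender hunk, the resulting pattern reads:

// Two-stage lookup after this change: resolve the GlobalId to an entry, then
// require the TableId component, which can be None for name-only tables.
let table_id = self
    .table_repr
    .get_by_global_id(id)
    .with_context(|| TableNotFoundSnafu {
        name: format!("Global Id = {:?}", id),
    })?
    .1
    .with_context(|| TableNotFoundSnafu {
        name: format!("Table Id = {:?}", id),
    })?;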
@@ -761,21 +762,27 @@ impl FlowNodeContext {
/// Assign a global id to a table, if already assigned, return the existing global id
///
/// and will try to fetch the schema from the table info manager (if the table exists now)
///
/// NOTE: this will not actually render the table into the collection referred to by the GlobalId;
/// it merely creates a mapping from table id to global id
pub async fn assign_global_id_to_table(
&mut self,
srv_map: &TableInfoSource,
table_id: TableId,
table_name: Option<TableName>,
table_id: Option<TableId>,
) -> GlobalId {
if let Some((_name, gid)) = self.table_repr.get_by_table_id(&table_id) {
if let Some((_name, gid)) = table_name
.as_ref()
.and_then(|table_name| self.table_repr.get_by_name(table_name))
{
gid
} else {
let global_id = self.new_global_id();
let table_name = srv_map.get_table_name(&table_id).await.unwrap();
let schema = srv_map.get_table_schema(&table_id).await.unwrap();
self.schema.insert(global_id, schema);
if let Some(table_id) = table_id {
let schema = srv_map.get_table_schema(&table_id).await.unwrap();
self.schema.insert(global_id, schema);
}
self.table_repr.insert(table_name, table_id, global_id);
global_id
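In the branch that allocates a new id, the schema can only be fetched when a TableId is actually known, so a name-only sink receives a GlobalId with no schema entry for now, and both handles are stored as Options. A condensed sketch of that branch as shown in the added lines above (the unwraps are kept as in the diff; this is not the full method):

// Allocation branch after this change: schema lookup is conditional on having
// a TableId; the name/id pair is inserted as Options either way.
let global_id = self.new_global_id();
if let Some(table_id) = table_id {
    let schema = srv_map.get_table_schema(&table_id).await.unwrap();
    self.schema.insert(global_id, schema);
}
self.table_repr.insert(table_name, table_id, global_id);
global_id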
@@ -793,7 +800,7 @@ impl FlowNodeContext {
pub struct TriMap {
name_to_global_id: HashMap<TableName, GlobalId>,
id_to_global_id: HashMap<TableId, GlobalId>,
global_id_to_name_id: BTreeMap<GlobalId, (TableName, TableId)>,
global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
}
impl TriMap {
@@ -801,32 +808,32 @@ impl TriMap {
Default::default()
}
pub fn insert(&mut self, name: TableName, id: TableId, global_id: GlobalId) {
self.name_to_global_id.insert(name.clone(), global_id);
self.id_to_global_id.insert(id, global_id);
pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
name.clone()
.and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
id.and_then(|id| self.id_to_global_id.insert(id, global_id));
self.global_id_to_name_id.insert(global_id, (name, id));
}
pub fn get_by_name(&self, name: &TableName) -> Option<(TableId, GlobalId)> {
self.name_to_global_id.get(name).and_then(|global_id| {
self.global_id_to_name_id
.get(global_id)
.map(|(_name, id)| (*id, *global_id))
pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
self.name_to_global_id.get(name).map(|global_id| {
let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
(*id, *global_id)
})
}
pub fn get_by_table_id(&self, id: &TableId) -> Option<(TableName, GlobalId)> {
self.id_to_global_id.get(id).and_then(|global_id| {
self.global_id_to_name_id
.get(global_id)
.map(|(name, _id)| (name.clone(), *global_id))
pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
self.id_to_global_id.get(id).map(|global_id| {
let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
(name.clone(), *global_id)
})
}
pub fn get_by_global_id(&self, global_id: &GlobalId) -> Option<(TableName, TableId)> {
self.global_id_to_name_id
.get(global_id)
.map(|(name, id)| (name.clone(), *id))
pub fn get_by_global_id(
&self,
global_id: &GlobalId,
) -> Option<(Option<TableName>, Option<TableId>)> {
self.global_id_to_name_id.get(global_id).cloned()
}
}
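The practical effect of the Option fields is an asymmetry between the lookups: a table registered by name only is visible to get_by_name but not to get_by_table_id until a real TableId is inserted for it. The sketch below is a minimal, self-contained illustration of that behaviour; the type aliases (TableName = Vec<String>, TableId = u32, GlobalId = u64) and the sample name strings are stand-ins rather than the crate's actual definitions, and insert is written with plain if let instead of the and_then chain above:

use std::collections::{BTreeMap, HashMap};

// Stand-in aliases for illustration; the real types come from the flow crate
// and the dataflow layer.
type TableName = Vec<String>;
type TableId = u32;
type GlobalId = u64;

#[derive(Default)]
struct TriMap {
    name_to_global_id: HashMap<TableName, GlobalId>,
    id_to_global_id: HashMap<TableId, GlobalId>,
    global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
}

impl TriMap {
    fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
        if let Some(name) = &name {
            self.name_to_global_id.insert(name.clone(), global_id);
        }
        if let Some(id) = id {
            self.id_to_global_id.insert(id, global_id);
        }
        self.global_id_to_name_id.insert(global_id, (name, id));
    }

    fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
        self.name_to_global_id
            .get(name)
            .map(|gid| (self.global_id_to_name_id[gid].1, *gid))
    }

    fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
        self.id_to_global_id
            .get(id)
            .map(|gid| (self.global_id_to_name_id[gid].0.clone(), *gid))
    }
}

fn main() {
    let mut map = TriMap::default();
    // A sink table known only by name: no TableId has been assigned yet.
    let sink = vec!["catalog".to_string(), "schema".to_string(), "sink_table".to_string()];
    map.insert(Some(sink.clone()), None, 1);

    // Name lookup succeeds, but the TableId component is None ...
    assert_eq!(map.get_by_name(&sink), Some((None, 1)));
    // ... and no id-based lookup can reach the entry yet.
    assert!(map.get_by_table_id(&42).is_none());
}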

View File

@@ -60,16 +60,16 @@ impl flow_server::Flow for FlowService {
flow_options,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_id = self
.manager
.table_info_source
.get_table_id_from_proto_name(&sink_table_name)
.await?;
let sink_table_name = vec![
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let ret = self
.manager
.create_flow(
task_id.id as u64,
sink_table_id,
sink_table_name,
&source_table_ids,
create_if_not_exists,
Some(expire_when),
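Both this gRPC service and the Flownode implementation in the next file flatten the protobuf TableName into the [catalog, schema, table] Vec<String> that create_flow now takes. A small helper could name that repeated conversion; the function below is a hypothetical illustration only (proto_table_name_to_parts is not part of this change, and api::v1::TableName is assumed to be the protobuf type at these call sites):

// Hypothetical helper, not part of this commit: flatten the protobuf table
// name into the Vec<String> form ([catalog, schema, table]) used by create_flow.
fn proto_table_name_to_parts(name: api::v1::TableName) -> Vec<String> {
    vec![name.catalog_name, name.schema_name, name.table_name]
}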

View File

@@ -46,15 +46,15 @@ impl Flownode for FlownodeManager {
flow_options,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_id = self
.table_info_source
.get_table_id_from_proto_name(&sink_table_name)
.await
.map_err(to_meta_err)?;
let sink_table_name = vec![
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let ret = self
.create_flow(
task_id.id as u64,
sink_table_id,
sink_table_name,
&source_table_ids,
create_if_not_exists,
Some(expire_when),

View File

@@ -145,7 +145,7 @@ mod test {
let name = vec!["numbers".to_string()];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
let mut tri_map = TriMap::new();
tri_map.insert(name.clone(), 0, gid);
tri_map.insert(Some(name.clone()), Some(0), gid);
FlowNodeContext {
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,