From 6659f3cc620fbbb5387c7a9a4caff9b23720fcf8 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 22 May 2024 14:55:33 +0800 Subject: [PATCH] fix: reorder write requests --- src/flow/src/adapter.rs | 10 ++++- src/flow/src/adapter/flownode_impl.rs | 53 ++++++++++++++++++++++++++- src/flow/src/adapter/node_context.rs | 8 ++-- src/flow/src/adapter/table_source.rs | 32 ++++++++++------ src/flow/src/repr/relation.rs | 8 +++- src/flow/src/transform.rs | 4 +- src/flow/src/transform/plan.rs | 2 +- src/flow/src/utils.rs | 2 - 8 files changed, 93 insertions(+), 26 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 712cd5cbf2..340ab6991a 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -303,6 +303,7 @@ impl FlownodeManager { .clone(); // TODO(discord9): use default key from schema let primary_keys = schema + .typ() .keys .first() .map(|v| { @@ -326,11 +327,18 @@ impl FlownodeManager { .with_time_index(true); let wout_ts = schema + .typ() .column_types + .clone() .into_iter() .enumerate() .map(|(idx, typ)| { - ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) + let name = schema + .names + .get(idx) + .cloned() + .unwrap_or(format!("Col_{}", idx)); + ColumnSchema::new(name, typ.scalar_type, typ.nullable) }) .collect_vec(); diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index e770bb5e4c..9b9b6d3626 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -14,13 +14,17 @@ //! impl `FlowNode` trait for FlowNodeManager so standalone can call them +use std::collections::HashMap; + use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; use common_error::ext::BoxedError; use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; +use common_telemetry::debug; use itertools::Itertools; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionId; use crate::adapter::FlownodeManager; use crate::repr::{self, DiffRow}; @@ -101,12 +105,57 @@ impl Flownode for FlownodeManager { async fn handle_inserts(&self, request: InsertRequests) -> Result { for write_request in request.requests { let region_id = write_request.region_id; - let rows_proto = write_request.rows.map(|r| r.rows).unwrap_or(vec![]); + let table_id = RegionId::from(region_id).table_id(); + + let (insert_schema, rows_proto) = write_request + .rows + .map(|r| (r.schema, r.rows)) + .unwrap_or_default(); + // TODO(discord9): reconsider time assignment mechanism let now = self.tick_manager.tick(); + + let fetch_order = { + let ctx = self.node_context.lock().await; + let table_col_names = ctx + .table_repr + .get_by_table_id(&table_id) + .map(|r| r.1) + .and_then(|id| ctx.schema.get(&id)) + .map(|desc| &desc.names) + .context(UnexpectedSnafu { + err_msg: format!("Table not found: {}", table_id), + })?; + let name_to_col = HashMap::<_, _>::from_iter( + insert_schema + .iter() + .enumerate() + .map(|(i, name)| (&name.column_name, i)), + ); + let fetch_order: Vec = table_col_names + .iter() + .map(|names| { + name_to_col.get(names).copied().context(UnexpectedSnafu { + err_msg: format!("Column not found: {}", names), + }) + }) + .try_collect()?; + if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) { + debug!("Reordering columns: {:?}", fetch_order) + } + fetch_order + }; + let rows: Vec = rows_proto .into_iter() .map(repr::Row::from) + .map(|r| { + let reordered = fetch_order + .iter() + .map(|&i| r.inner[i].clone()) + .collect_vec(); + repr::Row::new(reordered) + }) .map(|r| (r, now, 1)) .collect_vec(); self.handle_write_request(region_id.into(), rows) diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index b1d01373fb..bc88d2bbe5 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -27,7 +27,7 @@ use crate::adapter::error::{Error, EvalSnafu, TableNotFoundSnafu}; use crate::adapter::{FlowId, TableName, TableSource}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; -use crate::repr::{DiffRow, RelationType, BROADCAST_CAP}; +use crate::repr::{DiffRow, RelationDesc, RelationType, BROADCAST_CAP}; /// A context that holds the information of the dataflow #[derive(Default, Debug)] @@ -54,7 +54,7 @@ pub struct FlownodeContext { /// store source in buffer for each source table, in case broadcast channel is full pub send_buffer: BTreeMap>, /// the schema of the table, query from metasrv or inferred from TypedPlan - pub schema: HashMap, + pub schema: HashMap, /// All the tables that have been registered in the worker pub table_repr: IdToNameMap, pub query_context: Option>, @@ -226,7 +226,7 @@ impl FlownodeContext { /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. /// /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> { + pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationDesc), Error> { let id = self .table_repr .get_by_name(name) @@ -297,7 +297,7 @@ impl FlownodeContext { .get_by_name(table_name) .map(|(_, gid)| gid) .unwrap(); - self.schema.insert(gid, schema); + self.schema.insert(gid, schema.into_named(vec![])); Ok(()) } diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs index cfa41f785a..36e9cd7561 100644 --- a/src/flow/src/adapter/table_source.rs +++ b/src/flow/src/adapter/table_source.rs @@ -17,7 +17,6 @@ use common_error::ext::BoxedError; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; -use itertools::Itertools; use snafu::{OptionExt, ResultExt}; use table::metadata::TableId; @@ -25,7 +24,7 @@ use crate::adapter::error::{ Error, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; use crate::adapter::TableName; -use crate::repr::{self, ColumnType, RelationType}; +use crate::repr::{self, ColumnType, RelationDesc, RelationType}; /// mapping of table name <-> table id should be query from tableinfo manager pub struct TableSource { @@ -107,7 +106,7 @@ impl TableSource { pub async fn get_table_name_schema( &self, table_id: &TableId, - ) -> Result<(TableName, RelationType), Error> { + ) -> Result<(TableName, RelationDesc), Error> { let table_info_value = self .get_table_info_value(table_id) .await? @@ -123,14 +122,20 @@ impl TableSource { ]; let raw_schema = table_info_value.table_info.meta.schema; - let column_types = raw_schema + let (col_names, column_types): (Vec<_>, Vec<_>) = raw_schema .column_schemas + .clone() .into_iter() - .map(|col| ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, + .map(|col| { + ( + col.name.clone(), + ColumnType { + nullable: col.is_nullable(), + scalar_type: col.data_type, + }, + ) }) - .collect_vec(); + .unzip(); let key = table_info_value.table_info.meta.primary_key_indices; let keys = vec![repr::Key::from(key)]; @@ -138,10 +143,13 @@ impl TableSource { let time_index = raw_schema.timestamp_index; Ok(( table_name, - RelationType { - column_types, - keys, - time_index, + RelationDesc { + typ: RelationType { + column_types, + keys, + time_index, + }, + names: col_names, }, )) } diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 59edb31616..48f4de2894 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -262,6 +262,10 @@ impl RelationType { true } + + pub fn into_named(self, names: Vec) -> RelationDesc { + RelationDesc { typ: self, names } + } } /// The type of a `Value` @@ -325,8 +329,8 @@ fn return_true() -> bool { /// Individual column names are optional. #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] pub struct RelationDesc { - typ: RelationType, - names: Vec, + pub typ: RelationType, + pub names: Vec, } impl RelationDesc { diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 9fe0b73d36..03927f5169 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -211,7 +211,7 @@ mod test { let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); tri_map.insert(Some(name.clone()), Some(1024), gid); - schemas.insert(gid, schema); + schemas.insert(gid, schema.into_named(vec![])); } { @@ -225,7 +225,7 @@ mod test { ColumnType::new(CDT::uint32_datatype(), false), ColumnType::new(CDT::datetime_datatype(), false), ]); - schemas.insert(gid, schema); + schemas.insert(gid, schema.into_named(vec![])); tri_map.insert(Some(name.clone()), Some(1025), gid); } diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 0dedc9e535..337eba7eef 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -269,7 +269,7 @@ impl TypedPlan { id: crate::expr::Id::Global(table.0), }; let get_table = TypedPlan { - typ: table.1, + typ: table.1.typ().clone(), plan: get_table, }; diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs index 599dc4541a..93edf176e7 100644 --- a/src/flow/src/utils.rs +++ b/src/flow/src/utils.rs @@ -208,9 +208,7 @@ impl Arrangement { for ((key, val), update_ts, diff) in updates { // check if the key is expired if let Some(s) = &mut self.expire_state { - dbg!(now, &key, &s); if let Some(expired_by) = s.update_event_ts(now, &key)? { - dbg!(expired_by, &key); max_expired_by = max_expired_by.max(Some(expired_by)); continue; }