From e1a8215394f84702cd5461c72b74f36edf3d8c22 Mon Sep 17 00:00:00 2001
From: discord9
Date: Tue, 7 May 2024 21:24:41 +0800
Subject: [PATCH] tests(WIP): get demo working

---
 src/cmd/src/standalone.rs           |  2 +
 src/flow/src/adapter.rs             | 91 ++++++++++++++++++++++-------
 src/flow/src/adapter/server.rs      |  4 +-
 src/flow/src/adapter/worker.rs      |  1 +
 src/flow/src/expr/relation/func.rs  |  8 ++-
 src/flow/src/transform.rs           |  6 +-
 src/flow/src/transform/expr.rs      |  3 +-
 src/flow/src/transform/plan.rs      |  7 ++-
 tests-integration/src/standalone.rs | 18 +++---
 9 files changed, 101 insertions(+), 39 deletions(-)

diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 10714d88e5..03d18f1cbf 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -413,6 +413,7 @@ impl StartCommand {
             Default::default(),
             fe_plugins.clone(),
             table_metadata_manager.clone(),
+            catalog_manager.clone(),
         )
         .with_kv_backend(kv_backend.clone());
         let flownode = Arc::new(flow_builder.build().await);
@@ -473,6 +474,7 @@ impl StartCommand {
         flownode
             .set_frontend_invoker(Box::new(frontend.clone()))
             .await;
+        let _handle = flownode.clone().run_background();
 
         let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
             .build()
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index c27854d22e..9f798d3b1c 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -19,6 +19,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
 use std::sync::Arc;
 
 use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests};
+use catalog::kvbackend::KvBackendCatalogManager;
 use catalog::memory::MemoryCatalogManager;
 use common_base::Plugins;
 use common_error::ext::BoxedError;
@@ -27,6 +28,7 @@ use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
 use common_meta::key::table_name::{TableNameKey, TableNameManager};
 use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
 use common_meta::kv_backend::KvBackendRef;
+use common_runtime::JoinHandle;
 use common_telemetry::info;
 use greptime_proto::v1;
 use hydroflow::scheduled::graph::Hydroflow;
@@ -87,6 +89,7 @@ pub struct FlownodeBuilder {
     plugins: Plugins,
     kv_backend: Option<KvBackendRef>,
     table_meta: TableMetadataManagerRef,
+    catalog_manager: Arc<KvBackendCatalogManager>,
 }
 
 impl FlownodeBuilder {
@@ -95,12 +98,14 @@ impl FlownodeBuilder {
         opts: FlownodeOptions,
         plugins: Plugins,
         table_meta: TableMetadataManagerRef,
+        catalog_manager: Arc<KvBackendCatalogManager>,
     ) -> Self {
         Self {
             opts,
             plugins,
             kv_backend: None,
             table_meta,
+            catalog_manager,
         }
     }
 
@@ -116,7 +121,7 @@ impl FlownodeBuilder {
     pub async fn build(self) -> FlownodeManager {
         let query_engine_factory = QueryEngineFactory::new_with_plugins(
             // query engine in flownode only translate plan with resolved table source.
-            MemoryCatalogManager::with_default_setup(),
+            self.catalog_manager.clone(),
             None,
             None,
             None,
@@ -182,11 +187,19 @@ pub struct FlownodeManager {
 }
 
 impl FlownodeManager {
+    /// Run the manager's event loop on the common_runtime background runtime
+    pub fn run_background(self: Arc<Self>) -> JoinHandle<()> {
+        info!("Starting flownode manager task");
+        common_runtime::spawn_bg(async move {
+            self.run().await;
+        })
+    }
     /// Trigger dataflow running, and then send writeback request to the source sender
     ///
     /// note that this method didn't handle input mirror request, as this should be handled by grpc server
     pub async fn run(&self) {
         loop {
+            info!("Triggering one run of flownode");
             self.run_available().await;
             // TODO(discord9): error handling
             let _ = self.send_writeback_requests().await;
@@ -195,7 +208,10 @@ impl FlownodeManager {
     }
 
     /// set frontend invoker
-    pub async fn set_frontend_invoker(&self, frontend: Box<dyn FrontendInvoker + Send + Sync>) {
+    pub async fn set_frontend_invoker(
+        self: &Arc<Self>,
+        frontend: Box<dyn FrontendInvoker + Send + Sync>,
+    ) {
         *self.frontend_invoker.write().await = Some(frontend);
     }
 
@@ -318,13 +334,24 @@ impl TableInfoSource {
             .map(|v| v.into_inner()))
     }
 
-    pub async fn get_table_schema(&self, table_id: &TableId) -> Result<RelationType, Error> {
+    pub async fn get_table_name_schema(
+        &self,
+        table_id: &TableId,
+    ) -> Result<(TableName, RelationType), Error> {
         let table_info_value = self
             .get_table_info_value(table_id)
             .await?
             .with_context(|| TableNotFoundSnafu {
                 name: format!("TableId = {:?}, Can't found table info", table_id),
             })?;
+
+        let table_name = table_info_value.table_name();
+        let table_name = vec![
+            table_name.catalog_name,
+            table_name.schema_name,
+            table_name.table_name,
+        ];
+
         let raw_schema = table_info_value.table_info.meta.schema;
         let column_types = raw_schema
             .column_schemas
@@ -339,11 +366,14 @@ impl TableInfoSource {
         let keys = vec![repr::Key::from(key)];
 
         let time_index = raw_schema.timestamp_index;
-        Ok(RelationType {
-            column_types,
-            keys,
-            time_index,
-        })
+        Ok((
+            table_name,
+            RelationType {
+                column_types,
+                keys,
+                time_index,
+            },
+        ))
     }
 }
@@ -432,7 +462,11 @@ impl FlownodeManager {
         let schema = table_info.table_info.meta.schema.column_schemas;
         let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
-
+        info!(
+            "Sending {} writeback requests for table {}",
+            reqs.len(),
+            table_name.join(".")
+        );
         for req in reqs {
             match req {
                 DiffRequest::Insert(insert) => {
@@ -552,13 +586,16 @@ impl FlownodeManager {
         for source in source_table_ids {
             node_ctx
                 .assign_global_id_to_table(&self.table_info_source, None, Some(*source))
-                .await;
+                .await?;
         }
         node_ctx
             .assign_global_id_to_table(&self.table_info_source, Some(sink_table_name.clone()), None)
-            .await;
+            .await?;
+        node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
 
+        // TODO(discord9): pass the actual `QueryContext` in here
+        node_ctx.query_context = Some(QueryContext::with("greptime", "public"));
         // construct a active dataflow state with it
         let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
@@ -592,7 +629,7 @@ impl FlownodeManager {
                 create_if_not_exist,
             )
             .await?;
-
+        info!("Successfully created flow with id={}", flow_id);
         Ok(Some(flow_id))
     }
 }
@@ -625,6 +662,7 @@ pub struct FlowNodeContext {
     pub schema: HashMap<GlobalId, RelationType>,
     /// All the tables that have been registered in the worker
     pub table_repr: TriMap,
+    pub query_context: Option<Arc<QueryContext>>,
 }
 
 impl FlowNodeContext {
@@ -764,6 +802,8 @@ impl FlowNodeContext {
     /// Assign a global id to a table, if already assigned, return the existing global id
     ///
+    /// requires at least one of `table_name` or `table_id` to be `Some`
+    ///
     /// and will try to fetch the schema from table info manager(if table exist now)
     ///
     /// NOTE: this will not actually render the table into collection refered as GlobalId
     ///
@@ -771,23 +811,32 @@ impl FlowNodeContext {
     pub async fn assign_global_id_to_table(
         &mut self,
         srv_map: &TableInfoSource,
-        table_name: Option<TableName>,
+        mut table_name: Option<TableName>,
         table_id: Option<TableId>,
-    ) -> GlobalId {
-        if let Some((_name, gid)) = table_name
+    ) -> Result<GlobalId, Error> {
+        // if we can find it by table name or table id, don't assign a new one
+        if let Some(gid) = table_name
             .as_ref()
             .and_then(|table_name| self.table_repr.get_by_name(table_name))
+            .map(|(_, gid)| gid)
+            .or_else(|| {
+                table_id
+                    .and_then(|id| self.table_repr.get_by_table_id(&id))
+                    .map(|(_, gid)| gid)
+            })
         {
-            gid
+            Ok(gid)
         } else {
             let global_id = self.new_global_id();
+
             if let Some(table_id) = table_id {
-                let schema = srv_map.get_table_schema(&table_id).await;
-                let _ = schema.map(|schema| self.schema.insert(global_id, schema));
-            }
+                let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
+                table_name = table_name.or(Some(known_table_name));
+                self.schema.insert(global_id, schema);
+            } // if we don't have a table id, the database hasn't assigned one yet or we don't need it
             self.table_repr.insert(table_name, table_id, global_id);
-            global_id
+            Ok(global_id)
         }
     }
@@ -798,7 +847,7 @@ impl FlowNodeContext {
 }
 
 /// A tri-directional map that maps table name, table id, and global id
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct TriMap {
     name_to_global_id: HashMap<TableName, GlobalId>,
     id_to_global_id: HashMap<TableId, GlobalId>,
diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs
index 3989a16d79..4fa7a5dc4c 100644
--- a/src/flow/src/adapter/server.rs
+++ b/src/flow/src/adapter/server.rs
@@ -182,9 +182,7 @@ impl servers::server::Server for FlowNodeServer {
 
         // TODO(discord9): better place for dataflow to run per second
         let manager_ref = self.flow_service.manager.clone();
-        let _handle_trigger_run = common_runtime::spawn_bg(async move {
-            manager_ref.run().await;
-        });
+        let _handle = manager_ref.clone().run_background();
 
         Ok(addr)
     }
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index 8f3b459afa..985f27bdd8 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -18,6 +18,7 @@ use std::borrow::BorrowMut;
 use std::collections::{BTreeMap, VecDeque};
 use std::sync::Arc;
 
+use common_telemetry::info;
 use hydroflow::scheduled::graph::Hydroflow;
 use snafu::ResultExt;
 use tokio::sync::{broadcast, mpsc, Mutex};
diff --git a/src/flow/src/expr/relation/func.rs b/src/flow/src/expr/relation/func.rs
index bcee991d64..4506bf7a55 100644
--- a/src/flow/src/expr/relation/func.rs
+++ b/src/flow/src/expr/relation/func.rs
@@ -206,12 +206,16 @@ impl AggregateFunc {
                 .fail();
             }
         };
-        let input_type = arg_type.unwrap_or_else(ConcreteDataType::null_datatype);
+        let input_type = if matches!(generic_fn, GenericFn::Count) {
+            ConcreteDataType::null_datatype()
+        } else {
+            arg_type.unwrap_or_else(ConcreteDataType::null_datatype)
+        };
         rule.get(&(generic_fn, input_type.clone()))
             .cloned()
             .with_context(|| InvalidQuerySnafu {
                 reason: format!(
-                    "No specialization found for binary function {:?} with input type {:?}",
+                    "No specialization found for aggregate function {:?} with input type {:?}",
                     generic_fn, input_type
                 ),
             })
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index 718e826415..98393b7368 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -101,11 +101,11 @@ pub async fn sql_to_flow_plan(
     engine: &Arc<dyn QueryEngine>,
     sql: &str,
 ) -> Result<TypedPlan, Error> {
-    let stmt =
-        QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).context(InvalidQueryPlanSnafu)?;
+    let query_ctx = ctx.query_context.clone().unwrap();
+    let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).context(InvalidQueryPlanSnafu)?;
     let plan = engine
         .planner()
-        .plan(stmt, QueryContext::arc())
+        .plan(stmt, query_ctx)
         .await
         .context(InvalidQueryPlanSnafu)?;
     let LogicalPlan::DfPlan(plan) = plan;
diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs
index c8bff7da5c..068f7159bb 100644
--- a/src/flow/src/transform/expr.rs
+++ b/src/flow/src/transform/expr.rs
@@ -14,6 +14,7 @@
 
 #![warn(unused_imports)]
 
+use common_telemetry::info;
 use datatypes::data_type::ConcreteDataType as CDT;
 use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
@@ -99,7 +100,7 @@ impl TypedExpr {
                 },
             )
             .unzip();
-
+        info!("Function: {:?}", f);
         match arg_len {
             // because variadic function can also have 1 arguments, we need to check if it's a variadic function first
             1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => {
diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs
index 9422c67569..d6b2c7af3d 100644
--- a/src/flow/src/transform/plan.rs
+++ b/src/flow/src/transform/plan.rs
@@ -114,7 +114,12 @@ impl TypedPlan {
             }
             Some(RelType::Read(read)) => {
                 if let Some(ReadType::NamedTable(nt)) = &read.as_ref().read_type {
-                    let table_reference = nt.names.clone();
+                    let query_ctx = ctx.query_context.clone().unwrap();
+                    let mut table_reference = vec![
+                        query_ctx.current_catalog().to_string(),
+                        query_ctx.current_schema().to_string(),
+                    ];
+                    table_reference.extend(nt.names.clone());
                     let table = ctx.table(&table_reference)?;
                     let get_table = Plan::Get {
                         id: crate::expr::Id::Global(table.0),
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 4fad9e4b2e..ec4cdb74bb 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -131,14 +131,6 @@ impl GreptimeDbStandaloneBuilder {
         let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
         table_metadata_manager.init().await.unwrap();
 
-        let flow_builder = FlownodeBuilder::new(
-            Default::default(),
-            plugins.clone(),
-            table_metadata_manager.clone(),
-        )
-        .with_kv_backend(kv_backend.clone());
-        let flownode = Arc::new(flow_builder.build().await);
-
         let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
         let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
         let catalog_manager = KvBackendCatalogManager::new(
@@ -149,6 +141,15 @@ impl GreptimeDbStandaloneBuilder {
         )
         .await;
 
+        let flow_builder = FlownodeBuilder::new(
+            Default::default(),
+            plugins.clone(),
+            table_metadata_manager.clone(),
+            catalog_manager.clone(),
+        )
+        .with_kv_backend(kv_backend.clone());
+        let flownode = Arc::new(flow_builder.build().await);
+
         let node_manager = Arc::new(StandaloneDatanodeManager {
             region_server: datanode.region_server(),
             flow_server: flownode.clone(),
@@ -209,6 +210,7 @@ impl GreptimeDbStandaloneBuilder {
         flownode
             .set_frontend_invoker(Box::new(instance.clone()))
             .await;
+        let _ = flownode.run_background();
 
         procedure_manager.start().await.unwrap();
         wal_options_allocator.start().await.unwrap();
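
Note on wiring: the order this patch establishes is build-with-catalog, install the
frontend invoker, then start the background loop. The minimal sketch below only
restates the hunks above; `frontend`, `plugins`, `table_metadata_manager`,
`catalog_manager`, and `kv_backend` stand in for values the embedding binary
already holds.

    // The builder now takes the catalog manager so the flownode's query engine
    // resolves real tables instead of an empty MemoryCatalogManager.
    let flow_builder = FlownodeBuilder::new(
        Default::default(),
        plugins.clone(),
        table_metadata_manager.clone(),
        catalog_manager.clone(),
    )
    .with_kv_backend(kv_backend.clone());
    let flownode = Arc::new(flow_builder.build().await);

    // Install the invoker first so the loop has somewhere to send writeback
    // requests, then spawn the periodic run()/send_writeback_requests() loop.
    flownode
        .set_frontend_invoker(Box::new(frontend.clone()))
        .await;
    // The JoinHandle may be dropped: the task is expected to live for the
    // whole process.
    let _handle = flownode.clone().run_background();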
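
A second precondition worth showing: `sql_to_flow_plan` now unwraps
`FlowNodeContext::query_context`, so the context must carry a `QueryContext`
before planning. A sketch of the call site as this patch leaves it (the
hardcoded catalog/schema pair mirrors the TODO above):

    // Without this line, the unwrap() inside sql_to_flow_plan panics.
    node_ctx.query_context = Some(QueryContext::with("greptime", "public"));
    let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;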