From 59afa70311ead38aa531f908ad3bcb1e93972143 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 8 Jul 2024 12:55:36 +0800 Subject: [PATCH] feat!: Set merge mode while creating table in influx handler (#4299) * feat: influxdb write auto set merge mode * chore: update logs * chore: address PR comments --- src/frontend/src/instance/grpc.rs | 12 ++ src/frontend/src/instance/influxdb.rs | 3 +- src/operator/src/insert.rs | 184 ++++++++++++----------- src/store-api/src/mito_engine_options.rs | 9 +- tests-integration/src/influxdb.rs | 31 ++++ 5 files changed, 150 insertions(+), 89 deletions(-) diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 6597e049aa..665fb211d8 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -252,6 +252,18 @@ impl Instance { .context(TableOperationSnafu) } + #[tracing::instrument(skip_all)] + pub async fn handle_influx_row_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + ) -> Result { + self.inserter + .handle_last_non_null_inserts(requests, ctx, self.statement_executor.as_ref()) + .await + .context(TableOperationSnafu) + } + #[tracing::instrument(skip_all)] pub async fn handle_metric_row_inserts( &self, diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 7985bab9f4..c337e41746 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -42,12 +42,11 @@ impl InfluxdbLineProtocolHandler for Instance { interceptor_ref.pre_execute(&request.lines, ctx.clone())?; let requests = request.try_into()?; - let requests = interceptor_ref .post_lines_conversion(requests, ctx.clone()) .await?; - self.handle_row_inserts(requests, ctx) + self.handle_influx_row_inserts(requests, ctx) .await .map_err(BoxedError::new) .context(servers::error::ExecuteGrpcQuerySnafu) diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 37d4d4440c..07d01e0d40 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -43,6 +43,7 @@ use sql::statements::insert::Insert; use store_api::metric_engine_consts::{ LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY, }; +use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY}; use store_api::storage::{RegionId, TableId}; use table::requests::InsertRequest as TableInsertRequest; use table::table_reference::TableReference; @@ -66,10 +67,17 @@ pub struct Inserter { pub type InserterRef = Arc; +/// Hint for the table type to create automatically. +#[derive(Clone)] enum AutoCreateTableType { + /// A logical table with the physical table name. Logical(String), + /// A physical table. Physical, + /// A log table which is append-only. Log, + /// A table that merges rows by `last_non_null` strategy. + LastNonNull, } impl AutoCreateTableType { @@ -78,6 +86,7 @@ impl AutoCreateTableType { AutoCreateTableType::Logical(_) => "logical", AutoCreateTableType::Physical => "physical", AutoCreateTableType::Log => "log", + AutoCreateTableType::LastNonNull => "last_non_null", } } } @@ -108,41 +117,61 @@ impl Inserter { .await } + /// Handles row inserts request and creates a physical table on demand. pub async fn handle_row_inserts( &self, - mut requests: RowInsertRequests, + requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, ) -> Result { - // remove empty requests - requests.inserts.retain(|req| { - req.rows - .as_ref() - .map(|r| !r.rows.is_empty()) - .unwrap_or_default() - }); - validate_column_count_match(&requests)?; - - let table_name_to_ids = self - .create_or_alter_tables_on_demand( - &requests, - &ctx, - AutoCreateTableType::Physical, - statement_executor, - ) - .await?; - let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref()) - .convert(requests) - .await?; - - self.do_request(inserts, &ctx).await + self.handle_row_inserts_with_create_type( + requests, + ctx, + statement_executor, + AutoCreateTableType::Physical, + ) + .await } + /// Handles row inserts request and creates a log table on demand. pub async fn handle_log_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + statement_executor: &StatementExecutor, + ) -> Result { + self.handle_row_inserts_with_create_type( + requests, + ctx, + statement_executor, + AutoCreateTableType::Log, + ) + .await + } + + /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand. + pub async fn handle_last_non_null_inserts( + &self, + requests: RowInsertRequests, + ctx: QueryContextRef, + statement_executor: &StatementExecutor, + ) -> Result { + self.handle_row_inserts_with_create_type( + requests, + ctx, + statement_executor, + AutoCreateTableType::LastNonNull, + ) + .await + } + + /// Handles row inserts request with specified [AutoCreateTableType]. + async fn handle_row_inserts_with_create_type( &self, mut requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, + create_type: AutoCreateTableType, ) -> Result { // remove empty requests requests.inserts.retain(|req| { @@ -154,12 +183,7 @@ impl Inserter { validate_column_count_match(&requests)?; let table_name_to_ids = self - .create_or_alter_tables_on_demand( - &requests, - &ctx, - AutoCreateTableType::Log, - statement_executor, - ) + .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor) .await?; let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref()) .convert(requests) @@ -168,7 +192,7 @@ impl Inserter { self.do_request(inserts, &ctx).await } - /// Handle row inserts request with metric engine. + /// Handles row inserts request with metric engine. pub async fn handle_metric_row_inserts( &self, mut requests: RowInsertRequests, @@ -486,21 +510,18 @@ impl Inserter { .await?; } } - AutoCreateTableType::Physical => { + AutoCreateTableType::Physical + | AutoCreateTableType::Log + | AutoCreateTableType::LastNonNull => { for req in create_tables { - let table = self.create_table(req, ctx, statement_executor).await?; - let table_info = table.table_info(); - table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); - } - for alter_expr in alter_tables.into_iter() { - statement_executor - .alter_table_inner(alter_expr, ctx.clone()) + let table = self + .create_non_logical_table( + req, + ctx, + statement_executor, + auto_create_table_type.clone(), + ) .await?; - } - } - AutoCreateTableType::Log => { - for req in create_tables { - let table = self.create_log_table(req, ctx, statement_executor).await?; let table_info = table.table_info(); table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); } @@ -612,31 +633,58 @@ impl Inserter { })) } - /// Create a table with schema from insert request. - /// - /// To create a metric engine logical table, specify the `on_physical_table` parameter. - async fn create_table( + /// Creates a non-logical table by create type. + /// # Panics + /// Panics if `create_type` is `AutoCreateTableType::Logical`. + async fn create_non_logical_table( &self, req: &RowInsertRequest, ctx: &QueryContextRef, statement_executor: &StatementExecutor, + create_type: AutoCreateTableType, + ) -> Result { + let options: &[(&str, &str)] = match create_type { + AutoCreateTableType::Logical(_) => unreachable!(), + AutoCreateTableType::Physical => &[], + // Set append_mode to true for log table. + // because log tables should keep rows with the same ts and tags. + AutoCreateTableType::Log => &[(APPEND_MODE_KEY, "true")], + AutoCreateTableType::LastNonNull => &[(MERGE_MODE_KEY, "last_non_null")], + }; + self.create_table_with_options(req, ctx, statement_executor, options) + .await + } + + /// Creates a table with options. + async fn create_table_with_options( + &self, + req: &RowInsertRequest, + ctx: &QueryContextRef, + statement_executor: &StatementExecutor, + options: &[(&str, &str)], ) -> Result { let schema = ctx.current_schema(); let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name); - + // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`. let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?; info!("Table `{table_ref}` does not exist, try creating table"); - - // TODO(weny): multiple regions table. + for (k, v) in options { + create_table_expr + .table_options + .insert(k.to_string(), v.to_string()); + } let res = statement_executor .create_table_inner(create_table_expr, None, ctx.clone()) .await; match res { Ok(table) => { - info!("Successfully created table {}", table_ref,); + info!( + "Successfully created table {} with options: {:?}", + table_ref, options + ); Ok(table) } Err(err) => { @@ -646,40 +694,6 @@ impl Inserter { } } - async fn create_log_table( - &self, - req: &RowInsertRequest, - ctx: &QueryContextRef, - statement_executor: &StatementExecutor, - ) -> Result { - let schema = ctx.current_schema(); - let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name); - // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`. - let request_schema = req.rows.as_ref().unwrap().schema.as_slice(); - let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?; - - info!("Table `{table_ref}` does not exist, try creating the log table"); - // Set append_mode to true for log table. - // because log tables should keep rows with the same ts and tags. - create_table_expr - .table_options - .insert("append_mode".to_string(), "true".to_string()); - let res = statement_executor - .create_table_inner(create_table_expr, None, ctx.clone()) - .await; - - match res { - Ok(table) => { - info!("Successfully created a log table {}", table_ref); - Ok(table) - } - Err(err) => { - error!(err; "Failed to create a log table {}", table_ref); - Err(err) - } - } - } - async fn create_logical_tables( &self, create_tables: Vec<&RowInsertRequest>, diff --git a/src/store-api/src/mito_engine_options.rs b/src/store-api/src/mito_engine_options.rs index 83af650b02..9252c970b3 100644 --- a/src/store-api/src/mito_engine_options.rs +++ b/src/store-api/src/mito_engine_options.rs @@ -17,6 +17,11 @@ use common_wal::options::WAL_OPTIONS_KEY; +/// Option key for append mode. +pub const APPEND_MODE_KEY: &str = "append_mode"; +/// Option key for merge mode. +pub const MERGE_MODE_KEY: &str = "merge_mode"; + /// Returns true if the `key` is a valid option key for the mito engine. pub fn is_mito_engine_option_key(key: &str) -> bool { [ @@ -34,8 +39,8 @@ pub fn is_mito_engine_option_key(key: &str) -> bool { "memtable.partition_tree.index_max_keys_per_shard", "memtable.partition_tree.data_freeze_threshold", "memtable.partition_tree.fork_dictionary_bytes", - "append_mode", - "merge_mode", + APPEND_MODE_KEY, + MERGE_MODE_KEY, ] .contains(&key) } diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index 9db64aec8f..17f29ff6c5 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -91,6 +91,37 @@ monitor1,host=host2 memory=1027 1663840496400340001"; +-------------------------------+-------+------+--------+ | 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 | | 2022-09-22T09:54:56.400340001 | host2 | | 1027.0 | ++-------------------------------+-------+------+--------+" + ); + + // Put the cpu column for host2. + let lines = r" +monitor1,host=host2 cpu=32 1663840496400340001"; + let request = InfluxdbRequest { + precision: None, + lines: lines.to_string(), + }; + instance.exec(request, QueryContext::arc()).await.unwrap(); + + let mut output = instance + .do_query( + "SELECT ts, host, cpu, memory FROM monitor1 ORDER BY ts", + QueryContext::arc(), + ) + .await; + let output = output.remove(0).unwrap(); + let OutputData::Stream(stream) = output.data else { + unreachable!() + }; + let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!( + recordbatches.pretty_print().unwrap(), + "\ ++-------------------------------+-------+------+--------+ +| ts | host | cpu | memory | ++-------------------------------+-------+------+--------+ +| 2022-09-22T09:54:56.100023100 | host1 | 66.6 | 1024.0 | +| 2022-09-22T09:54:56.400340001 | host2 | 32.0 | 1027.0 | +-------------------------------+-------+------+--------+" ); }