Skip to main content

operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33    trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58    TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65    TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
/// Distributed inserter living in the frontend.
///
/// Responsibilities visible in this file: create or alter target tables on
/// demand, convert row/column/statement inserts into region requests, route
/// them to the datanode peer leading each region, and mirror source-table
/// inserts to flownodes.
pub struct Inserter {
    // Used to look up existing tables when deciding to create/alter on demand.
    catalog_manager: CatalogManagerRef,
    // Used to split requests by region and find each region's leader peer.
    pub(crate) partition_manager: PartitionRuleManagerRef,
    // Used to reach datanodes (writes) and flownodes (mirroring).
    pub(crate) node_manager: NodeManagerRef,
    // Cache of table -> flownode set, consulted when mirroring inserts to flows.
    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
}

/// Shared, thread-safe handle to an [`Inserter`].
pub type InserterRef = Arc<Inserter>;
89
/// Hint for the table type to create automatically.
#[derive(Clone)]
pub enum AutoCreateTableType {
    /// A logical table living on the named physical table.
    Logical(String),
    /// A plain physical table.
    Physical,
    /// An append-only log table.
    Log,
    /// A table merging rows with the `last_non_null` strategy.
    LastNonNull,
    /// A trace table, created with skipping indexes and default partition
    /// rules on `trace_id`.
    Trace,
}

impl AutoCreateTableType {
    /// Returns the static label for this table type (used e.g. as a metric
    /// label value).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Logical(_) => "logical",
            Self::Physical => "physical",
            Self::Log => "log",
            Self::LastNonNull => "last_non_null",
            Self::Trace => "trace",
        }
    }
}
116
/// Split insert requests into normal and instant requests.
///
/// Where instant requests are requests with ttl=instant,
/// and normal requests are requests with ttl set to other values.
///
/// This is used to split requests for different processing.
#[derive(Clone)]
pub struct InstantAndNormalInsertRequests {
    /// Requests with normal ttl. These are the ones actually written to
    /// datanodes.
    pub normal_requests: RegionInsertRequests,
    /// Requests with ttl=instant.
    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
    pub instant_requests: RegionInsertRequests,
}
131
132impl Inserter {
133    pub fn new(
134        catalog_manager: CatalogManagerRef,
135        partition_manager: PartitionRuleManagerRef,
136        node_manager: NodeManagerRef,
137        table_flownode_set_cache: TableFlownodeSetCacheRef,
138    ) -> Self {
139        Self {
140            catalog_manager,
141            partition_manager,
142            node_manager,
143            table_flownode_set_cache,
144        }
145    }
146
147    pub async fn handle_column_inserts(
148        &self,
149        requests: InsertRequests,
150        ctx: QueryContextRef,
151        statement_executor: &StatementExecutor,
152    ) -> Result<Output> {
153        let row_inserts = ColumnToRow::convert(requests)?;
154        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
155            .await
156    }
157
158    /// Handles row inserts request and creates a physical table on demand.
159    pub async fn handle_row_inserts(
160        &self,
161        mut requests: RowInsertRequests,
162        ctx: QueryContextRef,
163        statement_executor: &StatementExecutor,
164        accommodate_existing_schema: bool,
165        is_single_value: bool,
166    ) -> Result<Output> {
167        preprocess_row_insert_requests(&mut requests.inserts)?;
168        self.handle_row_inserts_with_create_type(
169            requests,
170            ctx,
171            statement_executor,
172            AutoCreateTableType::Physical,
173            accommodate_existing_schema,
174            is_single_value,
175        )
176        .await
177    }
178
179    /// Handles row inserts request and creates a log table on demand.
180    pub async fn handle_log_inserts(
181        &self,
182        requests: RowInsertRequests,
183        ctx: QueryContextRef,
184        statement_executor: &StatementExecutor,
185    ) -> Result<Output> {
186        self.handle_row_inserts_with_create_type(
187            requests,
188            ctx,
189            statement_executor,
190            AutoCreateTableType::Log,
191            false,
192            false,
193        )
194        .await
195    }
196
197    pub async fn handle_trace_inserts(
198        &self,
199        requests: RowInsertRequests,
200        ctx: QueryContextRef,
201        statement_executor: &StatementExecutor,
202    ) -> Result<Output> {
203        self.handle_row_inserts_with_create_type(
204            requests,
205            ctx,
206            statement_executor,
207            AutoCreateTableType::Trace,
208            false,
209            false,
210        )
211        .await
212    }
213
214    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
215    pub async fn handle_last_non_null_inserts(
216        &self,
217        requests: RowInsertRequests,
218        ctx: QueryContextRef,
219        statement_executor: &StatementExecutor,
220        accommodate_existing_schema: bool,
221        is_single_value: bool,
222    ) -> Result<Output> {
223        self.handle_row_inserts_with_create_type(
224            requests,
225            ctx,
226            statement_executor,
227            AutoCreateTableType::LastNonNull,
228            accommodate_existing_schema,
229            is_single_value,
230        )
231        .await
232    }
233
234    /// Handles row inserts request with specified [AutoCreateTableType].
235    async fn handle_row_inserts_with_create_type(
236        &self,
237        mut requests: RowInsertRequests,
238        ctx: QueryContextRef,
239        statement_executor: &StatementExecutor,
240        create_type: AutoCreateTableType,
241        accommodate_existing_schema: bool,
242        is_single_value: bool,
243    ) -> Result<Output> {
244        // remove empty requests
245        requests.inserts.retain(|req| {
246            req.rows
247                .as_ref()
248                .map(|r| !r.rows.is_empty())
249                .unwrap_or_default()
250        });
251        validate_column_count_match(&requests)?;
252
253        let CreateAlterTableResult {
254            instant_table_ids,
255            table_infos,
256        } = self
257            .create_or_alter_tables_on_demand(
258                &mut requests,
259                &ctx,
260                create_type,
261                statement_executor,
262                accommodate_existing_schema,
263                is_single_value,
264            )
265            .await?;
266
267        let name_to_info = table_infos
268            .values()
269            .map(|info| (info.name.clone(), info.clone()))
270            .collect::<HashMap<_, _>>();
271        let inserts = RowToRegion::new(
272            name_to_info,
273            instant_table_ids,
274            self.partition_manager.as_ref(),
275        )
276        .convert(requests)
277        .await?;
278
279        self.do_request(inserts, &table_infos, &ctx).await
280    }
281
282    /// Handles row inserts request with metric engine.
283    pub async fn handle_metric_row_inserts(
284        &self,
285        mut requests: RowInsertRequests,
286        ctx: QueryContextRef,
287        statement_executor: &StatementExecutor,
288        physical_table: String,
289    ) -> Result<Output> {
290        // remove empty requests
291        requests.inserts.retain(|req| {
292            req.rows
293                .as_ref()
294                .map(|r| !r.rows.is_empty())
295                .unwrap_or_default()
296        });
297        validate_column_count_match(&requests)?;
298
299        // check and create physical table
300        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
301            .await?;
302
303        // check and create logical tables
304        let CreateAlterTableResult {
305            instant_table_ids,
306            table_infos,
307        } = self
308            .create_or_alter_tables_on_demand(
309                &mut requests,
310                &ctx,
311                AutoCreateTableType::Logical(physical_table.clone()),
312                statement_executor,
313                true,
314                true,
315            )
316            .await?;
317        let name_to_info = table_infos
318            .values()
319            .map(|info| (info.name.clone(), info.clone()))
320            .collect::<HashMap<_, _>>();
321        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
322            .convert(requests)
323            .await?;
324
325        self.do_request(inserts, &table_infos, &ctx).await
326    }
327
328    pub async fn handle_table_insert(
329        &self,
330        request: TableInsertRequest,
331        ctx: QueryContextRef,
332    ) -> Result<Output> {
333        let catalog = request.catalog_name.as_str();
334        let schema = request.schema_name.as_str();
335        let table_name = request.table_name.as_str();
336        let table = self.get_table(catalog, schema, table_name).await?;
337        let table = table.with_context(|| TableNotFoundSnafu {
338            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
339        })?;
340        let table_info = table.table_info();
341
342        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
343            .convert(request)
344            .await?;
345
346        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
347
348        self.do_request(inserts, &table_infos, &ctx).await
349    }
350
351    pub async fn handle_statement_insert(
352        &self,
353        insert: &Insert,
354        ctx: &QueryContextRef,
355        statement_executor: &StatementExecutor,
356    ) -> Result<Output> {
357        let (inserts, table_info) =
358            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
359                .convert(insert, ctx, statement_executor)
360                .await?;
361
362        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
363
364        self.do_request(inserts, &table_infos, ctx).await
365    }
366}
367
368impl Inserter {
    /// Sends prepared region insert requests to flownodes (detached, for flow
    /// mirroring) and datanodes (awaited), and sums the affected row counts
    /// into a single [`Output`].
    ///
    /// Note the ordering: metering happens before the requests are split, and
    /// flow mirroring is kicked off before the datanode writes. Instant
    /// requests are only mirrored to flownodes, never written to datanodes.
    async fn do_request(
        &self,
        requests: InstantAndNormalInsertRequests,
        table_infos: &HashMap<TableId, Arc<TableInfo>>,
        ctx: &QueryContextRef,
    ) -> Result<Output> {
        // Fill impure default values in the request
        let requests = fill_reqs_with_impure_default(table_infos, requests)?;

        let write_cost = write_meter!(
            ctx.current_catalog(),
            ctx.current_schema(),
            requests,
            ctx.channel() as u8
        );
        // All region requests share one header carrying tracing context + db.
        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
            tracing_context: TracingContext::from_current_span().to_w3c(),
            dbname: ctx.get_db_string(),
            ..Default::default()
        });

        let InstantAndNormalInsertRequests {
            normal_requests,
            instant_requests,
        } = requests;

        // Mirror requests for source table to flownode asynchronously
        let flow_mirror_task = FlowMirrorTask::new(
            &self.table_flownode_set_cache,
            normal_requests
                .requests
                .iter()
                .chain(instant_requests.requests.iter()),
        )
        .await?;
        // Detached: we do not wait for flownode acknowledgement.
        flow_mirror_task.detach(self.node_manager.clone())?;

        // Write requests to datanode and wait for response
        let write_tasks = self
            .group_requests_by_peer(normal_requests)
            .await?
            .into_iter()
            .map(|(peer, inserts)| {
                let node_manager = self.node_manager.clone();
                let request = request_factory.build_insert(inserts);
                // One spawned task per peer; they run concurrently.
                common_runtime::spawn_global(async move {
                    node_manager
                        .datanode(&peer)
                        .await
                        .handle(request)
                        .await
                        .context(RequestInsertsSnafu)
                })
            });
        let results = future::try_join_all(write_tasks)
            .await
            .context(JoinTaskSnafu)?;
        // Sum affected rows across peers; the first Err short-circuits via `?`.
        let affected_rows = results
            .into_iter()
            .map(|resp| resp.map(|r| r.affected_rows))
            .sum::<Result<AffectedRows>>()?;
        crate::metrics::DIST_INGEST_ROW_COUNT
            .with_label_values(&[ctx.get_db_string().as_str()])
            .inc_by(affected_rows as u64);
        Ok(Output::new(
            OutputData::AffectedRows(affected_rows),
            OutputMeta::new_with_cost(write_cost as _),
        ))
    }
438
439    async fn group_requests_by_peer(
440        &self,
441        requests: RegionInsertRequests,
442    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
443        // group by region ids first to reduce repeatedly call `find_region_leader`
444        // TODO(discord9): determine if a addition clone is worth it
445        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
446        for req in requests.requests {
447            let region_id = RegionId::from_u64(req.region_id);
448            requests_per_region
449                .entry(region_id)
450                .or_default()
451                .requests
452                .push(req);
453        }
454
455        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
456
457        for (region_id, reqs) in requests_per_region {
458            let peer = self
459                .partition_manager
460                .find_region_leader(region_id)
461                .await
462                .context(FindRegionLeaderSnafu)?;
463            inserts
464                .entry(peer)
465                .or_default()
466                .requests
467                .extend(reqs.requests);
468        }
469
470        Ok(inserts)
471    }
472
    /// Creates or alter tables on demand:
    /// - if table does not exist, create table by inferred CreateExpr
    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
    ///
    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
    /// This mapping is used in the conversion of RowToRegion.
    ///
    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
    /// remote write. This will modify the `RowInsertRequests` in place.
    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
    async fn create_or_alter_tables_on_demand(
        &self,
        requests: &mut RowInsertRequests,
        ctx: &QueryContextRef,
        auto_create_table_type: AutoCreateTableType,
        statement_executor: &StatementExecutor,
        accommodate_existing_schema: bool,
        is_single_value: bool,
    ) -> Result<CreateAlterTableResult> {
        // Timer observes duration labeled by the auto-create table type.
        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
            .with_label_values(&[auto_create_table_type.as_str()])
            .start_timer();

        let catalog = ctx.current_catalog();
        let schema = ctx.current_schema();

        let mut table_infos = HashMap::new();
        // If `auto_create_table` hint is disabled, skip creating/altering tables.
        let auto_create_table_hint = ctx
            .extension(AUTO_CREATE_TABLE_KEY)
            .map(|v| v.parse::<bool>())
            .transpose()
            .map_err(|_| {
                InvalidInsertRequestSnafu {
                    reason: "`auto_create_table` hint must be a boolean",
                }
                .build()
            })?
            .unwrap_or(true);
        if !auto_create_table_hint {
            // Auto-create disabled: every target table must already exist.
            // Collect table infos and instant-ttl table ids, then return early.
            let mut instant_table_ids = HashSet::new();
            for req in &requests.inserts {
                let table = self
                    .get_table(catalog, &schema, &req.table_name)
                    .await?
                    .context(InvalidInsertRequestSnafu {
                        reason: format!(
                            "Table `{}` does not exist, and `auto_create_table` hint is disabled",
                            req.table_name
                        ),
                    })?;
                let table_info = table.table_info();
                if table_info.is_ttl_instant_table() {
                    instant_table_ids.insert(table_info.table_id());
                }
                table_infos.insert(table_info.table_id(), table.table_info());
            }
            let ret = CreateAlterTableResult {
                instant_table_ids,
                table_infos,
            };
            return Ok(ret);
        }

        // First pass: classify each request into "create new table" or
        // "alter existing table" (or neither, when the schema already fits).
        let mut create_tables = vec![];
        let mut alter_tables = vec![];
        // Tables that get altered below need their info re-fetched afterwards.
        let mut need_refresh_table_infos = HashSet::new();
        let mut instant_table_ids = HashSet::new();

        for req in &mut requests.inserts {
            match self.get_table(catalog, &schema, &req.table_name).await? {
                Some(table) => {
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
                        req,
                        &table,
                        ctx,
                        accommodate_existing_schema,
                        is_single_value,
                    )? {
                        alter_tables.push(alter_expr);
                        need_refresh_table_infos.insert((
                            catalog.to_string(),
                            schema.clone(),
                            req.table_name.clone(),
                        ));
                    } else {
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                None => {
                    let create_expr =
                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
                    create_tables.push(create_expr);
                }
            }
        }

        // Second pass: apply the collected creates/alters, strategy depending
        // on the table type.
        match auto_create_table_type {
            AutoCreateTableType::Logical(_) => {
                if !create_tables.is_empty() {
                    // Creates logical tables in batch.
                    let tables = self
                        .create_logical_tables(create_tables, ctx, statement_executor)
                        .await?;

                    for table in tables {
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                if !alter_tables.is_empty() {
                    // Alter logical tables in batch.
                    statement_executor
                        .alter_logical_tables(alter_tables, ctx.clone())
                        .await?;
                }
            }
            AutoCreateTableType::Physical
            | AutoCreateTableType::Log
            | AutoCreateTableType::LastNonNull => {
                // note that auto create table shouldn't be ttl instant table
                // for it's a very unexpected behavior and should be set by user explicitly
                for create_table in create_tables {
                    let table = self
                        .create_physical_table(create_table, None, ctx, statement_executor)
                        .await?;
                    let table_info = table.table_info();
                    if table_info.is_ttl_instant_table() {
                        instant_table_ids.insert(table_info.table_id());
                    }
                    table_infos.insert(table_info.table_id(), table.table_info());
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }

            AutoCreateTableType::Trace => {
                // The trace table name may be overridden per-session.
                let trace_table_name = ctx
                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
                    .unwrap_or(TRACE_TABLE_NAME);

                // Optional partition-count hint; must parse as u32 if present.
                let trace_table_partitions = if let Some(trace_table_partitions) =
                    ctx.extension(TRACE_TABLE_PARTITIONS_HINT_KEY)
                {
                    let p = trace_table_partitions.parse::<u32>().map_err(|_| {
                        InvalidInsertRequestSnafu {
                            reason: format!(
                                "Failed to parse trace_table_partitions: {}",
                                trace_table_partitions
                            ),
                        }
                        .build()
                    })?;
                    Some(p)
                } else {
                    None
                };

                // note that auto create table shouldn't be ttl instant table
                // for it's a very unexpected behavior and should be set by user explicitly
                for mut create_table in create_tables {
                    if create_table.table_name == trace_services_table_name(trace_table_name)
                        || create_table.table_name == trace_operations_table_name(trace_table_name)
                    {
                        // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
                        create_table
                            .table_options
                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
                        // Remove `ttl` key from table options if it exists
                        create_table.table_options.remove(TTL_KEY);

                        let table = self
                            .create_physical_table(create_table, None, ctx, statement_executor)
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    } else {
                        // prebuilt partition rules for uuid data: see the function
                        // for more information
                        let partitions = if matches!(trace_table_partitions, Some(0) | Some(1)) {
                            // disable partitions
                            None
                        } else {
                            let p = partition_rule_for_hexstring(
                                TRACE_ID_COLUMN,
                                trace_table_partitions,
                            )
                            .context(CreatePartitionRulesSnafu)?;
                            Some(p)
                        };

                        // add skip index to
                        // - trace_id: when searching by trace id
                        // - parent_span_id: when searching root span
                        // - span_name: when searching certain types of span
                        let index_columns =
                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
                        for index_column in index_columns {
                            if let Some(col) = create_table
                                .column_defs
                                .iter_mut()
                                .find(|c| c.name == index_column)
                            {
                                col.options =
                                    options_from_skipping(&SkippingIndexOptions::default())
                                        .context(ColumnOptionsSnafu)?;
                            } else {
                                warn!(
                                    "Column {} not found when creating index for trace table: {}.",
                                    index_column, create_table.table_name
                                );
                            }
                        }

                        // use table_options to mark table model version
                        create_table.table_options.insert(
                            TABLE_DATA_MODEL.to_string(),
                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
                        );

                        let table = self
                            .create_physical_table(
                                create_table,
                                partitions,
                                ctx,
                                statement_executor,
                            )
                            .await?;
                        let table_info = table.table_info();
                        if table_info.is_ttl_instant_table() {
                            instant_table_ids.insert(table_info.table_id());
                        }
                        table_infos.insert(table_info.table_id(), table.table_info());
                    }
                }
                for alter_expr in alter_tables.into_iter() {
                    statement_executor
                        .alter_table_inner(alter_expr, ctx.clone())
                        .await?;
                }
            }
        }

        // refresh table infos for altered tables
        for (catalog, schema, table_name) in need_refresh_table_infos {
            let table = self
                .get_table(&catalog, &schema, &table_name)
                .await?
                .context(TableNotFoundSnafu {
                    table_name: common_catalog::format_full_table_name(
                        &catalog,
                        &schema,
                        &table_name,
                    ),
                })?;
            let table_info = table.table_info();
            table_infos.insert(table_info.table_id(), table.table_info());
        }

        Ok(CreateAlterTableResult {
            instant_table_ids,
            table_infos,
        })
    }
752
753    async fn create_physical_table_on_demand(
754        &self,
755        ctx: &QueryContextRef,
756        physical_table: String,
757        statement_executor: &StatementExecutor,
758    ) -> Result<()> {
759        let catalog_name = ctx.current_catalog();
760        let schema_name = ctx.current_schema();
761
762        // check if exist
763        if self
764            .get_table(catalog_name, &schema_name, &physical_table)
765            .await?
766            .is_some()
767        {
768            return Ok(());
769        }
770
771        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
772        info!("Physical metric table `{table_reference}` does not exist, try creating table");
773
774        // schema with timestamp and field column
775        let default_schema = vec![
776            ColumnSchema {
777                column_name: greptime_timestamp().to_string(),
778                datatype: ColumnDataType::TimestampMillisecond as _,
779                semantic_type: SemanticType::Timestamp as _,
780                datatype_extension: None,
781                options: None,
782            },
783            ColumnSchema {
784                column_name: greptime_value().to_string(),
785                datatype: ColumnDataType::Float64 as _,
786                semantic_type: SemanticType::Field as _,
787                datatype_extension: None,
788                options: None,
789            },
790        ];
791        let create_table_expr =
792            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
793
794        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
795        create_table_expr
796            .table_options
797            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
798
799        // create physical table
800        let res = statement_executor
801            .create_table_inner(create_table_expr, None, ctx.clone())
802            .await;
803
804        match res {
805            Ok(_) => {
806                info!("Successfully created table {table_reference}",);
807                Ok(())
808            }
809            Err(err) => {
810                error!(err; "Failed to create table {table_reference}");
811                Err(err)
812            }
813        }
814    }
815
816    async fn get_table(
817        &self,
818        catalog: &str,
819        schema: &str,
820        table: &str,
821    ) -> Result<Option<TableRef>> {
822        self.catalog_manager
823            .table(catalog, schema, table, None)
824            .await
825            .context(CatalogSnafu)
826    }
827
828    fn get_create_table_expr_on_demand(
829        &self,
830        req: &RowInsertRequest,
831        create_type: &AutoCreateTableType,
832        ctx: &QueryContextRef,
833    ) -> Result<CreateTableExpr> {
834        let mut table_options = std::collections::HashMap::with_capacity(4);
835        fill_table_options_for_create(&mut table_options, create_type, ctx);
836
837        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
838            // engine should be metric engine when creating logical tables.
839            METRIC_ENGINE_NAME
840        } else {
841            default_engine()
842        };
843
844        let schema = ctx.current_schema();
845        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
846        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
847        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
848        let mut create_table_expr =
849            build_create_table_expr(&table_ref, request_schema, engine_name)?;
850
851        info!("Table `{table_ref}` does not exist, try creating table");
852        create_table_expr.table_options.extend(table_options);
853        Ok(create_table_expr)
854    }
855
856    /// Returns an alter table expression if it finds new columns in the request.
857    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
858    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
859    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
860    /// for more details.
861    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
862    /// input `req`.
863    fn get_alter_table_expr_on_demand(
864        &self,
865        req: &mut RowInsertRequest,
866        table: &TableRef,
867        ctx: &QueryContextRef,
868        accommodate_existing_schema: bool,
869        is_single_value: bool,
870    ) -> Result<Option<AlterTableExpr>> {
871        let catalog_name = ctx.current_catalog();
872        let schema_name = ctx.current_schema();
873        let table_name = table.table_info().name.clone();
874
875        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
876        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
877        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
878        let Some(mut add_columns) = add_columns else {
879            return Ok(None);
880        };
881
882        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
883        if accommodate_existing_schema {
884            let table_schema = table.schema();
885            // Find timestamp column name
886            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
887            // Find field column name if there is only one and `is_single_value` is true.
888            let mut field_col_name = None;
889            if is_single_value {
890                let mut multiple_field_cols = false;
891                table.field_columns().for_each(|col| {
892                    if field_col_name.is_none() {
893                        field_col_name = Some(col.name.clone());
894                    } else {
895                        multiple_field_cols = true;
896                    }
897                });
898                if multiple_field_cols {
899                    field_col_name = None;
900                }
901            }
902
903            // Update column name in request schema for Timestamp/Field columns
904            if let Some(rows) = req.rows.as_mut() {
905                for col in &mut rows.schema {
906                    match col.semantic_type {
907                        x if x == SemanticType::Timestamp as i32 => {
908                            if let Some(ref ts_name) = ts_col_name
909                                && col.column_name != *ts_name
910                            {
911                                col.column_name = ts_name.clone();
912                            }
913                        }
914                        x if x == SemanticType::Field as i32 => {
915                            if let Some(ref field_name) = field_col_name
916                                && col.column_name != *field_name
917                            {
918                                col.column_name = field_name.clone();
919                            }
920                        }
921                        _ => {}
922                    }
923                }
924            }
925
926            // Only keep columns that are tags or non-single field.
927            add_columns.add_columns.retain(|col| {
928                let def = col.column_def.as_ref().unwrap();
929                def.semantic_type == SemanticType::Tag as i32
930                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
931            });
932
933            if add_columns.add_columns.is_empty() {
934                return Ok(None);
935            }
936        }
937
938        Ok(Some(AlterTableExpr {
939            catalog_name: catalog_name.to_string(),
940            schema_name: schema_name.clone(),
941            table_name: table_name.clone(),
942            kind: Some(Kind::AddColumns(add_columns)),
943        }))
944    }
945
946    /// Creates a table with options.
947    async fn create_physical_table(
948        &self,
949        mut create_table_expr: CreateTableExpr,
950        partitions: Option<Partitions>,
951        ctx: &QueryContextRef,
952        statement_executor: &StatementExecutor,
953    ) -> Result<TableRef> {
954        {
955            let table_ref = TableReference::full(
956                &create_table_expr.catalog_name,
957                &create_table_expr.schema_name,
958                &create_table_expr.table_name,
959            );
960
961            info!("Table `{table_ref}` does not exist, try creating table");
962        }
963        let res = statement_executor
964            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
965            .await;
966
967        let table_ref = TableReference::full(
968            &create_table_expr.catalog_name,
969            &create_table_expr.schema_name,
970            &create_table_expr.table_name,
971        );
972
973        match res {
974            Ok(table) => {
975                info!(
976                    "Successfully created table {} with options: {:?}",
977                    table_ref, create_table_expr.table_options,
978                );
979                Ok(table)
980            }
981            Err(err) => {
982                error!(err; "Failed to create table {}", table_ref);
983                Err(err)
984            }
985        }
986    }
987
988    async fn create_logical_tables(
989        &self,
990        create_table_exprs: Vec<CreateTableExpr>,
991        ctx: &QueryContextRef,
992        statement_executor: &StatementExecutor,
993    ) -> Result<Vec<TableRef>> {
994        let res = statement_executor
995            .create_logical_tables(&create_table_exprs, ctx.clone())
996            .await;
997
998        match res {
999            Ok(res) => {
1000                info!("Successfully created logical tables");
1001                Ok(res)
1002            }
1003            Err(err) => {
1004                let failed_tables = create_table_exprs
1005                    .into_iter()
1006                    .map(|expr| {
1007                        format!(
1008                            "{}.{}.{}",
1009                            expr.catalog_name, expr.schema_name, expr.table_name
1010                        )
1011                    })
1012                    .collect::<Vec<_>>();
1013                error!(
1014                    err;
1015                    "Failed to create logical tables {:?}",
1016                    failed_tables
1017                );
1018                Err(err)
1019            }
1020        }
1021    }
1022
    /// Returns a reference to the node manager used to reach datanodes and flownodes.
    pub fn node_manager(&self) -> &NodeManagerRef {
        &self.node_manager
    }
1026
    /// Returns a reference to the partition rule manager.
    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
        &self.partition_manager
    }
1030}
1031
1032fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
1033    for request in &requests.inserts {
1034        let rows = request.rows.as_ref().unwrap();
1035        let column_count = rows.schema.len();
1036        rows.rows.iter().try_for_each(|r| {
1037            ensure!(
1038                r.values.len() == column_count,
1039                InvalidInsertRequestSnafu {
1040                    reason: format!(
1041                        "column count mismatch, columns: {}, values: {}",
1042                        column_count,
1043                        r.values.len()
1044                    )
1045                }
1046            );
1047            Ok(())
1048        })?;
1049    }
1050    Ok(())
1051}
1052
1053/// Fill table options for a new table by create type.
1054pub fn fill_table_options_for_create(
1055    table_options: &mut std::collections::HashMap<String, String>,
1056    create_type: &AutoCreateTableType,
1057    ctx: &QueryContextRef,
1058) {
1059    for key in VALID_TABLE_OPTION_KEYS {
1060        if let Some(value) = ctx.extension(key) {
1061            table_options.insert(key.to_string(), value.to_string());
1062        }
1063    }
1064
1065    match create_type {
1066        AutoCreateTableType::Logical(physical_table) => {
1067            table_options.insert(
1068                LOGICAL_TABLE_METADATA_KEY.to_string(),
1069                physical_table.clone(),
1070            );
1071        }
1072        AutoCreateTableType::Physical => {
1073            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1074                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1075            }
1076            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1077                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1078            }
1079            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1080                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1081                // We need to set the compaction type explicitly.
1082                table_options.insert(
1083                    COMPACTION_TYPE.to_string(),
1084                    COMPACTION_TYPE_TWCS.to_string(),
1085                );
1086            }
1087        }
1088        // Set append_mode to true for log table.
1089        // because log tables should keep rows with the same ts and tags.
1090        AutoCreateTableType::Log => {
1091            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1092        }
1093        AutoCreateTableType::LastNonNull => {
1094            table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1095        }
1096        AutoCreateTableType::Trace => {
1097            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1098        }
1099    }
1100}
1101
/// Builds a `CreateTableExpr` for `table` from the gRPC column schemas in
/// `request_schema`, targeting the given storage `engine`.
pub fn build_create_table_expr(
    table: &TableReference,
    request_schema: &[ColumnSchema],
    engine: &str,
) -> Result<CreateTableExpr> {
    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
}
1109
/// Result of `create_or_alter_tables_on_demand`.
struct CreateAlterTableResult {
    /// Table ids of `ttl=instant` tables; their data is not persisted.
    instant_table_ids: HashSet<TableId>,
    /// Table info of the created/altered tables, keyed by table id.
    table_infos: HashMap<TableId, Arc<TableInfo>>,
}
1117
/// A task that mirrors region insert requests to the flownodes whose flows
/// read from the inserted-into tables.
struct FlowMirrorTask {
    /// Region insert requests grouped by destination flownode peer.
    requests: HashMap<Peer, RegionInsertRequests>,
    /// Number of mirrored rows, used for the pending-row metric bookkeeping.
    num_rows: usize,
}
1122
1123impl FlowMirrorTask {
1124    async fn new(
1125        cache: &TableFlownodeSetCacheRef,
1126        requests: impl Iterator<Item = &RegionInsertRequest>,
1127    ) -> Result<Self> {
1128        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1129            HashMap::new();
1130        let mut num_rows = 0;
1131
1132        for req in requests {
1133            let table_id = RegionId::from_u64(req.region_id).table_id();
1134            match src_table_reqs.get_mut(&table_id) {
1135                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1136                // already know this is not source table
1137                Some(None) => continue,
1138                _ => {
1139                    // dedup peers
1140                    let peers = cache
1141                        .get(table_id)
1142                        .await
1143                        .context(RequestInsertsSnafu)?
1144                        .unwrap_or_default()
1145                        .values()
1146                        .cloned()
1147                        .collect::<HashSet<_>>()
1148                        .into_iter()
1149                        .collect::<Vec<_>>();
1150
1151                    if !peers.is_empty() {
1152                        let mut reqs = RegionInsertRequests::default();
1153                        reqs.requests.push(req.clone());
1154                        num_rows += reqs
1155                            .requests
1156                            .iter()
1157                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1158                            .sum::<usize>();
1159                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1160                    } else {
1161                        // insert a empty entry to avoid repeat query
1162                        src_table_reqs.insert(table_id, None);
1163                    }
1164                }
1165            }
1166        }
1167
1168        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1169
1170        for (_table_id, (peers, reqs)) in src_table_reqs
1171            .into_iter()
1172            .filter_map(|(k, v)| v.map(|v| (k, v)))
1173        {
1174            if peers.len() == 1 {
1175                // fast path, zero copy
1176                inserts
1177                    .entry(peers[0].clone())
1178                    .or_default()
1179                    .requests
1180                    .extend(reqs.requests);
1181                continue;
1182            } else {
1183                // TODO(discord9): need to split requests to multiple flownodes
1184                for flownode in peers {
1185                    inserts
1186                        .entry(flownode.clone())
1187                        .or_default()
1188                        .requests
1189                        .extend(reqs.requests.clone());
1190                }
1191            }
1192        }
1193
1194        Ok(Self {
1195            requests: inserts,
1196            num_rows,
1197        })
1198    }
1199
1200    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1201        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1202        for (peer, inserts) in self.requests {
1203            let node_manager = node_manager.clone();
1204            common_runtime::spawn_global(async move {
1205                let result = node_manager
1206                    .flownode(&peer)
1207                    .await
1208                    .handle_inserts(inserts)
1209                    .await
1210                    .context(RequestInsertsSnafu);
1211
1212                match result {
1213                    Ok(resp) => {
1214                        let affected_rows = resp.affected_rows;
1215                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1216                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1217                    }
1218                    Err(err) => {
1219                        error!(err; "Failed to insert data into flownode {}", peer);
1220                    }
1221                }
1222            });
1223        }
1224
1225        Ok(())
1226    }
1227}
1228
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds an in-memory `TableRef` named `test_table` whose schema has one
    /// millisecond time-index column (`ts_name`) and one nullable float64
    /// field column (`field_name`), backed by a `DummyDataSource`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ])
        .unwrap()
        .build()
        .unwrap();
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .build()
            .unwrap();
        let info = Arc::new(
            TableInfoBuilder::default()
                .table_id(1)
                .table_version(0)
                .name("test_table")
                .schema_name(DEFAULT_SCHEMA_NAME)
                .catalog_name(DEFAULT_CATALOG_NAME)
                .desc(None)
                .table_type(TableType::Base)
                .meta(meta)
                .build()
                .unwrap(),
        );
        Arc::new(table::Table::new(
            info,
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// Verifies that `get_alter_table_expr_on_demand` with
    /// `accommodate_existing_schema = true` and `is_single_value = true`
    /// renames the request's timestamp/field columns to match the existing
    /// table schema instead of emitting an ALTER TABLE expression.
    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        // The request uses different names for timestamp and field columns
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        // Build an Inserter over fully-mocked backends; none of these
        // components perform real I/O in this test.
        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
        );
        // No new columns beyond the renamable ts/field pair -> no ALTER expr.
        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // The request's schema should have updated names for timestamp and field columns
        let req_schema = req.rows.as_ref().unwrap().schema.clone();
        assert_eq!(req_schema[0].column_name, ts_name);
        assert_eq!(req_schema[1].column_name, field_name);
    }
}