Skip to main content

operator/
insert.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
18use api::v1::alter_table_expr::Kind;
19use api::v1::column_def::options_from_skipping;
20use api::v1::region::{
21    InsertRequest as RegionInsertRequest, InsertRequests as RegionInsertRequests,
22    RegionRequestHeader,
23};
24use api::v1::{
25    AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
26    RowInsertRequest, RowInsertRequests, SemanticType,
27};
28use catalog::CatalogManagerRef;
29use client::{OutputData, OutputMeta};
30use common_catalog::consts::{
31    PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN, TRACE_ID_COLUMN, TRACE_TABLE_NAME,
32    TRACE_TABLE_NAME_SESSION_KEY, default_engine, trace_operations_table_name,
33    trace_services_table_name,
34};
35use common_grpc_expr::util::ColumnExpr;
36use common_meta::cache::TableFlownodeSetCacheRef;
37use common_meta::node_manager::{AffectedRows, NodeManagerRef};
38use common_meta::peer::Peer;
39use common_query::Output;
40use common_query::prelude::{greptime_timestamp, greptime_value};
41use common_telemetry::tracing_context::TracingContext;
42use common_telemetry::{error, info, warn};
43use datatypes::schema::SkippingIndexOptions;
44use futures_util::future;
45use meter_macros::write_meter;
46use partition::manager::PartitionRuleManagerRef;
47use session::context::QueryContextRef;
48use snafu::ResultExt;
49use snafu::prelude::*;
50use sql::partition::partition_rule_for_hexstring;
51use sql::statements::create::Partitions;
52use sql::statements::insert::Insert;
53use store_api::metric_engine_consts::{
54    LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
55};
56use store_api::mito_engine_options::{
57    APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, TTL_KEY,
58    TWCS_TIME_WINDOW,
59};
60use store_api::storage::{RegionId, TableId};
61use table::TableRef;
62use table::metadata::TableInfo;
63use table::requests::{
64    AUTO_CREATE_TABLE_KEY, InsertRequest as TableInsertRequest, TABLE_DATA_MODEL,
65    TABLE_DATA_MODEL_TRACE_V1, TRACE_TABLE_PARTITIONS_HINT_KEY, VALID_TABLE_OPTION_KEYS,
66};
67use table::table_reference::TableReference;
68
69use crate::error::{
70    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
71    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
72};
73use crate::expr_helper;
74use crate::region_req_factory::RegionRequestFactory;
75use crate::req_convert::common::preprocess_row_insert_requests;
76use crate::req_convert::insert::{
77    ColumnToRow, RowToRegion, StatementToRegion, TableToRegion, fill_reqs_with_impure_default,
78};
79use crate::statement::StatementExecutor;
80
81pub struct Inserter {
82    catalog_manager: CatalogManagerRef,
83    pub(crate) partition_manager: PartitionRuleManagerRef,
84    pub(crate) node_manager: NodeManagerRef,
85    pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
86    /// Server-side upper bound for auto table creation on write.
87    /// When `false`, missing tables are never auto-created regardless of the
88    /// per-request `auto_create_table` hint. When `true`, the hint still applies.
89    auto_create_table: bool,
90}
91
92pub type InserterRef = Arc<Inserter>;
93
/// Hint for the table type to create automatically.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AutoCreateTableType {
    /// A logical table backed by the physical table with the given name.
    Logical(String),
    /// A physical table.
    Physical,
    /// A log table which is append-only.
    Log,
    /// A table that merges rows with the `last_non_null` strategy.
    LastNonNull,
    /// A trace table. The main trace table is created with skipping indexes on
    /// trace-related columns and default partition rules on `trace_id`.
    Trace,
}

impl AutoCreateTableType {
    /// Returns a static, lowercase label naming this table type.
    ///
    /// The label does not include the physical table name carried by
    /// [`AutoCreateTableType::Logical`].
    pub fn as_str(&self) -> &'static str {
        match self {
            AutoCreateTableType::Logical(_) => "logical",
            AutoCreateTableType::Physical => "physical",
            AutoCreateTableType::Log => "log",
            AutoCreateTableType::LastNonNull => "last_non_null",
            AutoCreateTableType::Trace => "trace",
        }
    }
}
120
121/// Split insert requests into normal and instant requests.
122///
123/// Where instant requests are requests with ttl=instant,
124/// and normal requests are requests with ttl set to other values.
125///
126/// This is used to split requests for different processing.
127#[derive(Clone)]
128pub struct InstantAndNormalInsertRequests {
129    /// Requests with normal ttl.
130    pub normal_requests: RegionInsertRequests,
131    /// Requests with ttl=instant.
132    /// Will be discarded immediately at frontend, wouldn't even insert into memtable, and only sent to flow node if needed.
133    pub instant_requests: RegionInsertRequests,
134}
135
136impl Inserter {
137    pub fn new(
138        catalog_manager: CatalogManagerRef,
139        partition_manager: PartitionRuleManagerRef,
140        node_manager: NodeManagerRef,
141        table_flownode_set_cache: TableFlownodeSetCacheRef,
142        auto_create_table: bool,
143    ) -> Self {
144        Self {
145            catalog_manager,
146            partition_manager,
147            node_manager,
148            table_flownode_set_cache,
149            auto_create_table,
150        }
151    }
152
153    pub async fn handle_column_inserts(
154        &self,
155        requests: InsertRequests,
156        ctx: QueryContextRef,
157        statement_executor: &StatementExecutor,
158    ) -> Result<Output> {
159        let row_inserts = ColumnToRow::convert(requests)?;
160        self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
161            .await
162    }
163
164    /// Handles row inserts request and creates a physical table on demand.
165    pub async fn handle_row_inserts(
166        &self,
167        mut requests: RowInsertRequests,
168        ctx: QueryContextRef,
169        statement_executor: &StatementExecutor,
170        accommodate_existing_schema: bool,
171        is_single_value: bool,
172    ) -> Result<Output> {
173        preprocess_row_insert_requests(&mut requests.inserts)?;
174        self.handle_row_inserts_with_create_type(
175            requests,
176            ctx,
177            statement_executor,
178            AutoCreateTableType::Physical,
179            accommodate_existing_schema,
180            is_single_value,
181        )
182        .await
183    }
184
185    /// Handles row inserts request and creates a log table on demand.
186    pub async fn handle_log_inserts(
187        &self,
188        requests: RowInsertRequests,
189        ctx: QueryContextRef,
190        statement_executor: &StatementExecutor,
191    ) -> Result<Output> {
192        self.handle_row_inserts_with_create_type(
193            requests,
194            ctx,
195            statement_executor,
196            AutoCreateTableType::Log,
197            false,
198            false,
199        )
200        .await
201    }
202
203    pub async fn handle_trace_inserts(
204        &self,
205        requests: RowInsertRequests,
206        ctx: QueryContextRef,
207        statement_executor: &StatementExecutor,
208    ) -> Result<Output> {
209        self.handle_row_inserts_with_create_type(
210            requests,
211            ctx,
212            statement_executor,
213            AutoCreateTableType::Trace,
214            false,
215            false,
216        )
217        .await
218    }
219
220    /// Handles row inserts request and creates a table with `last_non_null` merge mode on demand.
221    pub async fn handle_last_non_null_inserts(
222        &self,
223        requests: RowInsertRequests,
224        ctx: QueryContextRef,
225        statement_executor: &StatementExecutor,
226        accommodate_existing_schema: bool,
227        is_single_value: bool,
228    ) -> Result<Output> {
229        self.handle_row_inserts_with_create_type(
230            requests,
231            ctx,
232            statement_executor,
233            AutoCreateTableType::LastNonNull,
234            accommodate_existing_schema,
235            is_single_value,
236        )
237        .await
238    }
239
240    /// Handles row inserts request with specified [AutoCreateTableType].
241    async fn handle_row_inserts_with_create_type(
242        &self,
243        mut requests: RowInsertRequests,
244        ctx: QueryContextRef,
245        statement_executor: &StatementExecutor,
246        create_type: AutoCreateTableType,
247        accommodate_existing_schema: bool,
248        is_single_value: bool,
249    ) -> Result<Output> {
250        // remove empty requests
251        requests.inserts.retain(|req| {
252            req.rows
253                .as_ref()
254                .map(|r| !r.rows.is_empty())
255                .unwrap_or_default()
256        });
257        validate_column_count_match(&requests)?;
258
259        let CreateAlterTableResult {
260            instant_table_ids,
261            table_infos,
262        } = self
263            .create_or_alter_tables_on_demand(
264                &mut requests,
265                &ctx,
266                create_type,
267                statement_executor,
268                accommodate_existing_schema,
269                is_single_value,
270            )
271            .await?;
272
273        let name_to_info = table_infos
274            .values()
275            .map(|info| (info.name.clone(), info.clone()))
276            .collect::<HashMap<_, _>>();
277        let inserts = RowToRegion::new(
278            name_to_info,
279            instant_table_ids,
280            self.partition_manager.as_ref(),
281        )
282        .convert(requests)
283        .await?;
284
285        self.do_request(inserts, &table_infos, &ctx).await
286    }
287
288    /// Handles row inserts request with metric engine.
289    pub async fn handle_metric_row_inserts(
290        &self,
291        mut requests: RowInsertRequests,
292        ctx: QueryContextRef,
293        statement_executor: &StatementExecutor,
294        physical_table: String,
295    ) -> Result<Output> {
296        // remove empty requests
297        requests.inserts.retain(|req| {
298            req.rows
299                .as_ref()
300                .map(|r| !r.rows.is_empty())
301                .unwrap_or_default()
302        });
303        validate_column_count_match(&requests)?;
304
305        // check and create physical table
306        self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
307            .await?;
308
309        // check and create logical tables
310        let CreateAlterTableResult {
311            instant_table_ids,
312            table_infos,
313        } = self
314            .create_or_alter_tables_on_demand(
315                &mut requests,
316                &ctx,
317                AutoCreateTableType::Logical(physical_table.clone()),
318                statement_executor,
319                true,
320                true,
321            )
322            .await?;
323        let name_to_info = table_infos
324            .values()
325            .map(|info| (info.name.clone(), info.clone()))
326            .collect::<HashMap<_, _>>();
327        let inserts = RowToRegion::new(name_to_info, instant_table_ids, &self.partition_manager)
328            .convert(requests)
329            .await?;
330
331        self.do_request(inserts, &table_infos, &ctx).await
332    }
333
334    pub async fn handle_table_insert(
335        &self,
336        request: TableInsertRequest,
337        ctx: QueryContextRef,
338    ) -> Result<Output> {
339        let catalog = request.catalog_name.as_str();
340        let schema = request.schema_name.as_str();
341        let table_name = request.table_name.as_str();
342        let table = self.get_table(catalog, schema, table_name).await?;
343        let table = table.with_context(|| TableNotFoundSnafu {
344            table_name: common_catalog::format_full_table_name(catalog, schema, table_name),
345        })?;
346        let table_info = table.table_info();
347
348        let inserts = TableToRegion::new(&table_info, &self.partition_manager)
349            .convert(request)
350            .await?;
351
352        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
353
354        self.do_request(inserts, &table_infos, &ctx).await
355    }
356
357    pub async fn handle_statement_insert(
358        &self,
359        insert: &Insert,
360        ctx: &QueryContextRef,
361    ) -> Result<Output> {
362        let (inserts, table_info) =
363            StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
364                .convert(insert, ctx)
365                .await?;
366
367        let table_infos = HashMap::from_iter([(table_info.table_id(), table_info.clone())]);
368
369        self.do_request(inserts, &table_infos, ctx).await
370    }
371}
372
373impl Inserter {
374    async fn do_request(
375        &self,
376        requests: InstantAndNormalInsertRequests,
377        table_infos: &HashMap<TableId, Arc<TableInfo>>,
378        ctx: &QueryContextRef,
379    ) -> Result<Output> {
380        // Fill impure default values in the request
381        let requests = fill_reqs_with_impure_default(table_infos, requests)?;
382
383        let write_cost = write_meter!(
384            ctx.current_catalog(),
385            ctx.current_schema(),
386            requests,
387            ctx.channel() as u8
388        );
389        let request_factory = RegionRequestFactory::new(RegionRequestHeader {
390            tracing_context: TracingContext::from_current_span().to_w3c(),
391            dbname: ctx.get_db_string(),
392            ..Default::default()
393        });
394
395        let InstantAndNormalInsertRequests {
396            normal_requests,
397            instant_requests,
398        } = requests;
399
400        // Mirror requests for source table to flownode asynchronously
401        let flow_mirror_task = FlowMirrorTask::new(
402            &self.table_flownode_set_cache,
403            normal_requests
404                .requests
405                .iter()
406                .chain(instant_requests.requests.iter()),
407        )
408        .await?;
409        flow_mirror_task.detach(self.node_manager.clone())?;
410
411        // Write requests to datanode and wait for response
412        let write_tasks = self
413            .group_requests_by_peer(normal_requests)
414            .await?
415            .into_iter()
416            .map(|(peer, inserts)| {
417                let node_manager = self.node_manager.clone();
418                let request = request_factory.build_insert(inserts);
419                common_runtime::spawn_global(async move {
420                    node_manager
421                        .datanode(&peer)
422                        .await
423                        .handle(request)
424                        .await
425                        .context(RequestInsertsSnafu)
426                })
427            });
428        let results = future::try_join_all(write_tasks)
429            .await
430            .context(JoinTaskSnafu)?;
431        let affected_rows = results
432            .into_iter()
433            .map(|resp| resp.map(|r| r.affected_rows))
434            .sum::<Result<AffectedRows>>()?;
435        crate::metrics::DIST_INGEST_ROW_COUNT
436            .with_label_values(&[ctx.get_db_string().as_str()])
437            .inc_by(affected_rows as u64);
438        Ok(Output::new(
439            OutputData::AffectedRows(affected_rows),
440            OutputMeta::new_with_cost(write_cost as _),
441        ))
442    }
443
444    async fn group_requests_by_peer(
445        &self,
446        requests: RegionInsertRequests,
447    ) -> Result<HashMap<Peer, RegionInsertRequests>> {
448        // group by region ids first to reduce repeatedly call `find_region_leader`
449        // TODO(discord9): determine if a addition clone is worth it
450        let mut requests_per_region: HashMap<RegionId, RegionInsertRequests> = HashMap::new();
451        for req in requests.requests {
452            let region_id = RegionId::from_u64(req.region_id);
453            requests_per_region
454                .entry(region_id)
455                .or_default()
456                .requests
457                .push(req);
458        }
459
460        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
461
462        for (region_id, reqs) in requests_per_region {
463            let peer = self
464                .partition_manager
465                .find_region_leader(region_id)
466                .await
467                .context(FindRegionLeaderSnafu)?;
468            inserts
469                .entry(peer)
470                .or_default()
471                .requests
472                .extend(reqs.requests);
473        }
474
475        Ok(inserts)
476    }
477
478    /// Returns `None` if auto table creation is allowed, or `Some(reason)` if
479    /// disabled by either the global config or the request hint. The reason tells
480    /// which one, for a clearer error.
481    fn auto_create_disabled_reason(&self, ctx: &QueryContextRef) -> Result<Option<&'static str>> {
482        let auto_create_table_hint = ctx
483            .extension(AUTO_CREATE_TABLE_KEY)
484            .map(|v| v.parse::<bool>())
485            .transpose()
486            .map_err(|_| {
487                InvalidInsertRequestSnafu {
488                    reason: "`auto_create_table` hint must be a boolean",
489                }
490                .build()
491            })?
492            .unwrap_or(true);
493        Ok(if !self.auto_create_table {
494            Some("auto-create table is disabled by frontend config")
495        } else if !auto_create_table_hint {
496            Some("`auto_create_table` hint is disabled")
497        } else {
498            None
499        })
500    }
501
502    /// Creates or alter tables on demand:
503    /// - if table does not exist, create table by inferred CreateExpr
504    /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
505    ///
506    /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
507    /// This mapping is used in the conversion of RowToRegion.
508    ///
509    /// `accommodate_existing_schema` is used to determine if the existing schema should override the new schema.
510    /// It only works for TIME_INDEX and single VALUE columns. This is for the case where the user creates a table with
511    /// custom schema, and then inserts data with endpoints that have default schema setting, like prometheus
512    /// remote write. This will modify the `RowInsertRequests` in place.
513    /// `is_single_value` indicates whether the default schema only contains single value column so we can accommodate it.
514    async fn create_or_alter_tables_on_demand(
515        &self,
516        requests: &mut RowInsertRequests,
517        ctx: &QueryContextRef,
518        auto_create_table_type: AutoCreateTableType,
519        statement_executor: &StatementExecutor,
520        accommodate_existing_schema: bool,
521        is_single_value: bool,
522    ) -> Result<CreateAlterTableResult> {
523        let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
524            .with_label_values(&[auto_create_table_type.as_str()])
525            .start_timer();
526
527        let catalog = ctx.current_catalog();
528        let schema = ctx.current_schema();
529
530        let mut table_infos = HashMap::new();
531        if let Some(disabled_reason) = self.auto_create_disabled_reason(ctx)? {
532            let mut instant_table_ids = HashSet::new();
533            for req in &requests.inserts {
534                let table = self
535                    .get_table(catalog, &schema, &req.table_name)
536                    .await?
537                    .context(InvalidInsertRequestSnafu {
538                        reason: format!(
539                            "Table `{}` does not exist, and {}",
540                            req.table_name, disabled_reason
541                        ),
542                    })?;
543                let table_info = table.table_info();
544                if table_info.is_ttl_instant_table() {
545                    instant_table_ids.insert(table_info.table_id());
546                }
547                table_infos.insert(table_info.table_id(), table.table_info());
548            }
549            let ret = CreateAlterTableResult {
550                instant_table_ids,
551                table_infos,
552            };
553            return Ok(ret);
554        }
555
556        let mut create_tables = vec![];
557        let mut alter_tables = vec![];
558        let mut need_refresh_table_infos = HashSet::new();
559        let mut instant_table_ids = HashSet::new();
560
561        for req in &mut requests.inserts {
562            match self.get_table(catalog, &schema, &req.table_name).await? {
563                Some(table) => {
564                    let table_info = table.table_info();
565                    if table_info.is_ttl_instant_table() {
566                        instant_table_ids.insert(table_info.table_id());
567                    }
568                    if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
569                        req,
570                        &table,
571                        ctx,
572                        accommodate_existing_schema,
573                        is_single_value,
574                    )? {
575                        alter_tables.push(alter_expr);
576                        need_refresh_table_infos.insert((
577                            catalog.to_string(),
578                            schema.clone(),
579                            req.table_name.clone(),
580                        ));
581                    } else {
582                        table_infos.insert(table_info.table_id(), table.table_info());
583                    }
584                }
585                None => {
586                    let create_expr =
587                        self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
588                    create_tables.push(create_expr);
589                }
590            }
591        }
592
593        match auto_create_table_type {
594            AutoCreateTableType::Logical(_) => {
595                if !create_tables.is_empty() {
596                    // Creates logical tables in batch.
597                    let tables = self
598                        .create_logical_tables(create_tables, ctx, statement_executor)
599                        .await?;
600
601                    for table in tables {
602                        let table_info = table.table_info();
603                        if table_info.is_ttl_instant_table() {
604                            instant_table_ids.insert(table_info.table_id());
605                        }
606                        table_infos.insert(table_info.table_id(), table.table_info());
607                    }
608                }
609                if !alter_tables.is_empty() {
610                    // Alter logical tables in batch.
611                    statement_executor
612                        .alter_logical_tables(alter_tables, ctx.clone())
613                        .await?;
614                }
615            }
616            AutoCreateTableType::Physical
617            | AutoCreateTableType::Log
618            | AutoCreateTableType::LastNonNull => {
619                // note that auto create table shouldn't be ttl instant table
620                // for it's a very unexpected behavior and should be set by user explicitly
621                for create_table in create_tables {
622                    let table = self
623                        .create_physical_table(create_table, None, ctx, statement_executor)
624                        .await?;
625                    let table_info = table.table_info();
626                    if table_info.is_ttl_instant_table() {
627                        instant_table_ids.insert(table_info.table_id());
628                    }
629                    table_infos.insert(table_info.table_id(), table.table_info());
630                }
631                for alter_expr in alter_tables.into_iter() {
632                    statement_executor
633                        .alter_table_inner(alter_expr, ctx.clone())
634                        .await?;
635                }
636            }
637
638            AutoCreateTableType::Trace => {
639                let trace_table_name = ctx
640                    .extension(TRACE_TABLE_NAME_SESSION_KEY)
641                    .unwrap_or(TRACE_TABLE_NAME);
642
643                let trace_table_partitions = if let Some(trace_table_partitions) =
644                    ctx.extension(TRACE_TABLE_PARTITIONS_HINT_KEY)
645                {
646                    let p = trace_table_partitions.parse::<u32>().map_err(|_| {
647                        InvalidInsertRequestSnafu {
648                            reason: format!(
649                                "Failed to parse trace_table_partitions: {}",
650                                trace_table_partitions
651                            ),
652                        }
653                        .build()
654                    })?;
655                    Some(p)
656                } else {
657                    None
658                };
659
660                // note that auto create table shouldn't be ttl instant table
661                // for it's a very unexpected behavior and should be set by user explicitly
662                for mut create_table in create_tables {
663                    if create_table.table_name == trace_services_table_name(trace_table_name)
664                        || create_table.table_name == trace_operations_table_name(trace_table_name)
665                    {
666                        // Disable append mode for auxiliary tables (services/operations) since they require upsert behavior.
667                        create_table
668                            .table_options
669                            .insert(APPEND_MODE_KEY.to_string(), "false".to_string());
670                        // Remove `ttl` key from table options if it exists
671                        create_table.table_options.remove(TTL_KEY);
672
673                        let table = self
674                            .create_physical_table(create_table, None, ctx, statement_executor)
675                            .await?;
676                        let table_info = table.table_info();
677                        if table_info.is_ttl_instant_table() {
678                            instant_table_ids.insert(table_info.table_id());
679                        }
680                        table_infos.insert(table_info.table_id(), table.table_info());
681                    } else {
682                        // prebuilt partition rules for uuid data: see the function
683                        // for more information
684                        let partitions = if matches!(trace_table_partitions, Some(0) | Some(1)) {
685                            // disable partitions
686                            None
687                        } else {
688                            let p = partition_rule_for_hexstring(
689                                TRACE_ID_COLUMN,
690                                trace_table_partitions,
691                            )
692                            .context(CreatePartitionRulesSnafu)?;
693                            Some(p)
694                        };
695
696                        // add skip index to
697                        // - trace_id: when searching by trace id
698                        // - parent_span_id: when searching root span
699                        // - span_name: when searching certain types of span
700                        let index_columns =
701                            [TRACE_ID_COLUMN, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN];
702                        for index_column in index_columns {
703                            if let Some(col) = create_table
704                                .column_defs
705                                .iter_mut()
706                                .find(|c| c.name == index_column)
707                            {
708                                col.options =
709                                    options_from_skipping(&SkippingIndexOptions::default())
710                                        .context(ColumnOptionsSnafu)?;
711                            } else {
712                                warn!(
713                                    "Column {} not found when creating index for trace table: {}.",
714                                    index_column, create_table.table_name
715                                );
716                            }
717                        }
718
719                        // use table_options to mark table model version
720                        create_table.table_options.insert(
721                            TABLE_DATA_MODEL.to_string(),
722                            TABLE_DATA_MODEL_TRACE_V1.to_string(),
723                        );
724
725                        let table = self
726                            .create_physical_table(
727                                create_table,
728                                partitions,
729                                ctx,
730                                statement_executor,
731                            )
732                            .await?;
733                        let table_info = table.table_info();
734                        if table_info.is_ttl_instant_table() {
735                            instant_table_ids.insert(table_info.table_id());
736                        }
737                        table_infos.insert(table_info.table_id(), table.table_info());
738                    }
739                }
740                for alter_expr in alter_tables.into_iter() {
741                    statement_executor
742                        .alter_table_inner(alter_expr, ctx.clone())
743                        .await?;
744                }
745            }
746        }
747
748        // refresh table infos for altered tables
749        for (catalog, schema, table_name) in need_refresh_table_infos {
750            let table = self
751                .get_table(&catalog, &schema, &table_name)
752                .await?
753                .context(TableNotFoundSnafu {
754                    table_name: common_catalog::format_full_table_name(
755                        &catalog,
756                        &schema,
757                        &table_name,
758                    ),
759                })?;
760            let table_info = table.table_info();
761            table_infos.insert(table_info.table_id(), table.table_info());
762        }
763
764        Ok(CreateAlterTableResult {
765            instant_table_ids,
766            table_infos,
767        })
768    }
769
770    async fn create_physical_table_on_demand(
771        &self,
772        ctx: &QueryContextRef,
773        physical_table: String,
774        statement_executor: &StatementExecutor,
775    ) -> Result<()> {
776        let catalog_name = ctx.current_catalog();
777        let schema_name = ctx.current_schema();
778
779        // check if exist
780        if self
781            .get_table(catalog_name, &schema_name, &physical_table)
782            .await?
783            .is_some()
784        {
785            return Ok(());
786        }
787
788        // Gate here too, otherwise a disabled switch would still leak the physical table.
789        if let Some(disabled_reason) = self.auto_create_disabled_reason(ctx)? {
790            return InvalidInsertRequestSnafu {
791                reason: format!(
792                    "Physical table `{physical_table}` does not exist, and {disabled_reason}"
793                ),
794            }
795            .fail();
796        }
797
798        let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
799        info!("Physical metric table `{table_reference}` does not exist, try creating table");
800
801        // schema with timestamp and field column
802        let default_schema = vec![
803            ColumnSchema {
804                column_name: greptime_timestamp().to_string(),
805                datatype: ColumnDataType::TimestampMillisecond as _,
806                semantic_type: SemanticType::Timestamp as _,
807                datatype_extension: None,
808                options: None,
809            },
810            ColumnSchema {
811                column_name: greptime_value().to_string(),
812                datatype: ColumnDataType::Float64 as _,
813                semantic_type: SemanticType::Field as _,
814                datatype_extension: None,
815                options: None,
816            },
817        ];
818        let create_table_expr =
819            &mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
820
821        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
822        create_table_expr
823            .table_options
824            .insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
825
826        // create physical table
827        let res = statement_executor
828            .create_table_inner(create_table_expr, None, ctx.clone())
829            .await;
830
831        match res {
832            Ok(_) => {
833                info!("Successfully created table {table_reference}",);
834                Ok(())
835            }
836            Err(err) => {
837                error!(err; "Failed to create table {table_reference}");
838                Err(err)
839            }
840        }
841    }
842
843    async fn get_table(
844        &self,
845        catalog: &str,
846        schema: &str,
847        table: &str,
848    ) -> Result<Option<TableRef>> {
849        self.catalog_manager
850            .table(catalog, schema, table, None)
851            .await
852            .context(CatalogSnafu)
853    }
854
855    fn get_create_table_expr_on_demand(
856        &self,
857        req: &RowInsertRequest,
858        create_type: &AutoCreateTableType,
859        ctx: &QueryContextRef,
860    ) -> Result<CreateTableExpr> {
861        let mut table_options = std::collections::HashMap::with_capacity(4);
862        fill_table_options_for_create(&mut table_options, create_type, ctx);
863
864        let engine_name = if let AutoCreateTableType::Logical(_) = create_type {
865            // engine should be metric engine when creating logical tables.
866            METRIC_ENGINE_NAME
867        } else {
868            default_engine()
869        };
870
871        let schema = ctx.current_schema();
872        let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
873        // SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
874        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
875        let mut create_table_expr =
876            build_create_table_expr(&table_ref, request_schema, engine_name)?;
877
878        info!("Table `{table_ref}` does not exist, try creating table");
879        create_table_expr.table_options.extend(table_options);
880        Ok(create_table_expr)
881    }
882
883    /// Returns an alter table expression if it finds new columns in the request.
884    /// When `accommodate_existing_schema` is false, it always adds columns if not exist.
885    /// When `accommodate_existing_schema` is true, it may modify the input `req` to
886    /// accommodate it with existing schema. See [`create_or_alter_tables_on_demand`](Self::create_or_alter_tables_on_demand)
887    /// for more details.
888    /// When `accommodate_existing_schema` is true and `is_single_value` is true, it also consider fields when modifying the
889    /// input `req`.
890    fn get_alter_table_expr_on_demand(
891        &self,
892        req: &mut RowInsertRequest,
893        table: &TableRef,
894        ctx: &QueryContextRef,
895        accommodate_existing_schema: bool,
896        is_single_value: bool,
897    ) -> Result<Option<AlterTableExpr>> {
898        let catalog_name = ctx.current_catalog();
899        let schema_name = ctx.current_schema();
900        let table_name = table.table_info().name.clone();
901
902        let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
903        let column_exprs = ColumnExpr::from_column_schemas(request_schema);
904        let add_columns = expr_helper::extract_add_columns_expr(&table.schema(), column_exprs)?;
905        let Some(mut add_columns) = add_columns else {
906            return Ok(None);
907        };
908
909        // If accommodate_existing_schema is true, update request schema for Timestamp/Field columns
910        if accommodate_existing_schema {
911            let table_schema = table.schema();
912            // Find timestamp column name
913            let ts_col_name = table_schema.timestamp_column().map(|c| c.name.clone());
914            // Find field column name if there is only one and `is_single_value` is true.
915            let mut field_col_name = None;
916            if is_single_value {
917                let mut multiple_field_cols = false;
918                table.field_columns().for_each(|col| {
919                    if field_col_name.is_none() {
920                        field_col_name = Some(col.name.clone());
921                    } else {
922                        multiple_field_cols = true;
923                    }
924                });
925                if multiple_field_cols {
926                    field_col_name = None;
927                }
928            }
929
930            // Update column name in request schema for Timestamp/Field columns
931            if let Some(rows) = req.rows.as_mut() {
932                for col in &mut rows.schema {
933                    match col.semantic_type {
934                        x if x == SemanticType::Timestamp as i32 => {
935                            if let Some(ref ts_name) = ts_col_name
936                                && col.column_name != *ts_name
937                            {
938                                col.column_name = ts_name.clone();
939                            }
940                        }
941                        x if x == SemanticType::Field as i32 => {
942                            if let Some(ref field_name) = field_col_name
943                                && col.column_name != *field_name
944                            {
945                                col.column_name = field_name.clone();
946                            }
947                        }
948                        _ => {}
949                    }
950                }
951            }
952
953            // Only keep columns that are tags or non-single field.
954            add_columns.add_columns.retain(|col| {
955                let def = col.column_def.as_ref().unwrap();
956                def.semantic_type == SemanticType::Tag as i32
957                    || (def.semantic_type == SemanticType::Field as i32 && field_col_name.is_none())
958            });
959
960            if add_columns.add_columns.is_empty() {
961                return Ok(None);
962            }
963        }
964
965        Ok(Some(AlterTableExpr {
966            catalog_name: catalog_name.to_string(),
967            schema_name: schema_name.clone(),
968            table_name: table_name.clone(),
969            kind: Some(Kind::AddColumns(add_columns)),
970        }))
971    }
972
973    /// Creates a table with options.
974    async fn create_physical_table(
975        &self,
976        mut create_table_expr: CreateTableExpr,
977        partitions: Option<Partitions>,
978        ctx: &QueryContextRef,
979        statement_executor: &StatementExecutor,
980    ) -> Result<TableRef> {
981        {
982            let table_ref = TableReference::full(
983                &create_table_expr.catalog_name,
984                &create_table_expr.schema_name,
985                &create_table_expr.table_name,
986            );
987
988            info!("Table `{table_ref}` does not exist, try creating table");
989        }
990        let res = statement_executor
991            .create_table_inner(&mut create_table_expr, partitions, ctx.clone())
992            .await;
993
994        let table_ref = TableReference::full(
995            &create_table_expr.catalog_name,
996            &create_table_expr.schema_name,
997            &create_table_expr.table_name,
998        );
999
1000        match res {
1001            Ok(table) => {
1002                info!(
1003                    "Successfully created table {} with options: {:?}",
1004                    table_ref, create_table_expr.table_options,
1005                );
1006                Ok(table)
1007            }
1008            Err(err) => {
1009                error!(err; "Failed to create table {}", table_ref);
1010                Err(err)
1011            }
1012        }
1013    }
1014
1015    async fn create_logical_tables(
1016        &self,
1017        create_table_exprs: Vec<CreateTableExpr>,
1018        ctx: &QueryContextRef,
1019        statement_executor: &StatementExecutor,
1020    ) -> Result<Vec<TableRef>> {
1021        let res = statement_executor
1022            .create_logical_tables(&create_table_exprs, ctx.clone())
1023            .await;
1024
1025        match res {
1026            Ok(res) => {
1027                info!("Successfully created logical tables");
1028                Ok(res)
1029            }
1030            Err(err) => {
1031                let failed_tables = create_table_exprs
1032                    .into_iter()
1033                    .map(|expr| {
1034                        format!(
1035                            "{}.{}.{}",
1036                            expr.catalog_name, expr.schema_name, expr.table_name
1037                        )
1038                    })
1039                    .collect::<Vec<_>>();
1040                error!(
1041                    err;
1042                    "Failed to create logical tables {:?}",
1043                    failed_tables
1044                );
1045                Err(err)
1046            }
1047        }
1048    }
1049
1050    pub fn node_manager(&self) -> &NodeManagerRef {
1051        &self.node_manager
1052    }
1053
1054    pub fn partition_manager(&self) -> &PartitionRuleManagerRef {
1055        &self.partition_manager
1056    }
1057}
1058
1059fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
1060    for request in &requests.inserts {
1061        let rows = request.rows.as_ref().unwrap();
1062        let column_count = rows.schema.len();
1063        rows.rows.iter().try_for_each(|r| {
1064            ensure!(
1065                r.values.len() == column_count,
1066                InvalidInsertRequestSnafu {
1067                    reason: format!(
1068                        "column count mismatch, columns: {}, values: {}",
1069                        column_count,
1070                        r.values.len()
1071                    )
1072                }
1073            );
1074            Ok(())
1075        })?;
1076    }
1077    Ok(())
1078}
1079
1080/// Fill table options for a new table by create type.
1081pub fn fill_table_options_for_create(
1082    table_options: &mut std::collections::HashMap<String, String>,
1083    create_type: &AutoCreateTableType,
1084    ctx: &QueryContextRef,
1085) {
1086    for key in VALID_TABLE_OPTION_KEYS {
1087        if let Some(value) = ctx.extension(key) {
1088            table_options.insert(key.to_string(), value.to_string());
1089        }
1090    }
1091
1092    match create_type {
1093        AutoCreateTableType::Logical(physical_table) => {
1094            table_options.insert(
1095                LOGICAL_TABLE_METADATA_KEY.to_string(),
1096                physical_table.clone(),
1097            );
1098        }
1099        AutoCreateTableType::Physical => {
1100            if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
1101                table_options.insert(APPEND_MODE_KEY.to_string(), append_mode.to_string());
1102            }
1103            if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1104                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1105            }
1106            if let Some(time_window) = ctx.extension(TWCS_TIME_WINDOW) {
1107                table_options.insert(TWCS_TIME_WINDOW.to_string(), time_window.to_string());
1108                // We need to set the compaction type explicitly.
1109                table_options.insert(
1110                    COMPACTION_TYPE.to_string(),
1111                    COMPACTION_TYPE_TWCS.to_string(),
1112                );
1113            }
1114        }
1115        // Set append_mode to true for log table.
1116        // because log tables should keep rows with the same ts and tags.
1117        AutoCreateTableType::Log => {
1118            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1119        }
1120        AutoCreateTableType::LastNonNull => {
1121            if ctx
1122                .extension(APPEND_MODE_KEY)
1123                .is_some_and(|value| value.eq_ignore_ascii_case("true"))
1124            {
1125                table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1126                table_options.insert(MERGE_MODE_KEY.to_string(), "last_row".to_string());
1127            } else if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
1128                table_options.insert(MERGE_MODE_KEY.to_string(), merge_mode.to_string());
1129            } else {
1130                table_options.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
1131            }
1132        }
1133        AutoCreateTableType::Trace => {
1134            table_options.insert(APPEND_MODE_KEY.to_string(), "true".to_string());
1135        }
1136    }
1137}
1138
1139pub fn build_create_table_expr(
1140    table: &TableReference,
1141    request_schema: &[ColumnSchema],
1142    engine: &str,
1143) -> Result<CreateTableExpr> {
1144    expr_helper::create_table_expr_by_column_schemas(table, request_schema, engine, None)
1145}
1146
1147/// Result of `create_or_alter_tables_on_demand`.
1148struct CreateAlterTableResult {
1149    /// table ids of ttl=instant tables.
1150    instant_table_ids: HashSet<TableId>,
1151    /// Table Info of the created tables.
1152    table_infos: HashMap<TableId, Arc<TableInfo>>,
1153}
1154
1155struct FlowMirrorTask {
1156    requests: HashMap<Peer, RegionInsertRequests>,
1157    num_rows: usize,
1158}
1159
1160impl FlowMirrorTask {
1161    async fn new(
1162        cache: &TableFlownodeSetCacheRef,
1163        requests: impl Iterator<Item = &RegionInsertRequest>,
1164    ) -> Result<Self> {
1165        let mut src_table_reqs: HashMap<TableId, Option<(Vec<Peer>, RegionInsertRequests)>> =
1166            HashMap::new();
1167        let mut num_rows = 0;
1168
1169        for req in requests {
1170            let table_id = RegionId::from_u64(req.region_id).table_id();
1171            match src_table_reqs.get_mut(&table_id) {
1172                Some(Some((_peers, reqs))) => reqs.requests.push(req.clone()),
1173                // already know this is not source table
1174                Some(None) => continue,
1175                _ => {
1176                    // dedup peers
1177                    let peers = cache
1178                        .get(table_id)
1179                        .await
1180                        .context(RequestInsertsSnafu)?
1181                        .unwrap_or_default()
1182                        .values()
1183                        .cloned()
1184                        .collect::<HashSet<_>>()
1185                        .into_iter()
1186                        .collect::<Vec<_>>();
1187
1188                    if !peers.is_empty() {
1189                        let mut reqs = RegionInsertRequests::default();
1190                        reqs.requests.push(req.clone());
1191                        num_rows += reqs
1192                            .requests
1193                            .iter()
1194                            .map(|r| r.rows.as_ref().unwrap().rows.len())
1195                            .sum::<usize>();
1196                        src_table_reqs.insert(table_id, Some((peers, reqs)));
1197                    } else {
1198                        // insert a empty entry to avoid repeat query
1199                        src_table_reqs.insert(table_id, None);
1200                    }
1201                }
1202            }
1203        }
1204
1205        let mut inserts: HashMap<Peer, RegionInsertRequests> = HashMap::new();
1206
1207        for (_table_id, (peers, reqs)) in src_table_reqs
1208            .into_iter()
1209            .filter_map(|(k, v)| v.map(|v| (k, v)))
1210        {
1211            if peers.len() == 1 {
1212                // fast path, zero copy
1213                inserts
1214                    .entry(peers[0].clone())
1215                    .or_default()
1216                    .requests
1217                    .extend(reqs.requests);
1218                continue;
1219            } else {
1220                // TODO(discord9): need to split requests to multiple flownodes
1221                for flownode in peers {
1222                    inserts
1223                        .entry(flownode.clone())
1224                        .or_default()
1225                        .requests
1226                        .extend(reqs.requests.clone());
1227                }
1228            }
1229        }
1230
1231        Ok(Self {
1232            requests: inserts,
1233            num_rows,
1234        })
1235    }
1236
1237    fn detach(self, node_manager: NodeManagerRef) -> Result<()> {
1238        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.add(self.num_rows as i64);
1239        for (peer, inserts) in self.requests {
1240            let node_manager = node_manager.clone();
1241            common_runtime::spawn_global(async move {
1242                let result = node_manager
1243                    .flownode(&peer)
1244                    .await
1245                    .handle_inserts(inserts)
1246                    .await
1247                    .context(RequestInsertsSnafu);
1248
1249                match result {
1250                    Ok(resp) => {
1251                        let affected_rows = resp.affected_rows;
1252                        crate::metrics::DIST_MIRROR_ROW_COUNT.inc_by(affected_rows);
1253                        crate::metrics::DIST_MIRROR_PENDING_ROW_COUNT.sub(affected_rows as _);
1254                    }
1255                    Err(err) => {
1256                        error!(err; "Failed to insert data into flownode {}", peer);
1257                    }
1258                }
1259            });
1260        }
1261
1262        Ok(())
1263    }
1264}
1265
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::helper::{field_column_schema, time_index_column_schema};
    use api::v1::{RowInsertRequest, Rows, Value};
    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use common_meta::cache::new_table_flownode_set_cache;
    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use moka::future::Cache;
    use session::context::QueryContext;
    use table::TableRef;
    use table::dist_table::DummyDataSource;
    use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};

    use super::*;
    use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};

    /// Builds `test_table` (id 1, mito engine) with a millisecond time index named
    /// `ts_name` and a single nullable f64 field named `field_name`.
    fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
        let columns = vec![
            ColumnSchema::new(
                ts_name,
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            )
            .with_time_index(true),
            ColumnSchema::new(field_name, ConcreteDataType::float64_datatype(), true),
        ];
        let schema = datatypes::schema::SchemaBuilder::try_from_columns(columns)
            .unwrap()
            .build()
            .unwrap();
        let meta = TableMetaBuilder::empty()
            .schema(Arc::new(schema))
            .primary_key_indices(vec![])
            .value_indices(vec![1])
            .engine("mito")
            .next_column_id(0)
            .options(Default::default())
            .created_on(Default::default())
            .build()
            .unwrap();
        let info = TableInfoBuilder::default()
            .table_id(1)
            .table_version(0)
            .name("test_table")
            .schema_name(DEFAULT_SCHEMA_NAME)
            .catalog_name(DEFAULT_CATALOG_NAME)
            .desc(None)
            .table_type(TableType::Base)
            .meta(meta)
            .build()
            .unwrap();
        Arc::new(table::Table::new(
            Arc::new(info),
            table::metadata::FilterPushDownType::Unsupported,
            Arc::new(DummyDataSource),
        ))
    }

    /// Runs `fill_table_options_for_create` for `LastNonNull` with the given session
    /// extensions and returns the produced table options.
    fn last_non_null_options(
        extensions: &[(&str, &str)],
    ) -> std::collections::HashMap<String, String> {
        let mut ctx = QueryContext::with(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
        for (key, value) in extensions {
            ctx.set_extension(*key, *value);
        }
        let ctx = Arc::new(ctx);
        let mut table_options = std::collections::HashMap::new();
        fill_table_options_for_create(&mut table_options, &AutoCreateTableType::LastNonNull, &ctx);
        table_options
    }

    #[tokio::test]
    async fn test_accommodate_existing_schema_logic() {
        let ts_name = "my_ts";
        let field_name = "my_field";
        let table = make_table_ref_with_schema(ts_name, field_name);

        let kv_backend = prepare_mocked_backend().await;
        let inserter = Inserter::new(
            catalog::memory::MemoryCatalogManager::new(),
            create_partition_rule_manager(kv_backend.clone()).await,
            Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
            Arc::new(new_table_flownode_set_cache(
                String::new(),
                Cache::new(100),
                kv_backend.clone(),
            )),
            true,
        );
        let ctx = Arc::new(QueryContext::with(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
        ));

        // The request names its timestamp and field columns differently from the table.
        let mut req = RowInsertRequest {
            table_name: "test_table".to_string(),
            rows: Some(Rows {
                schema: vec![
                    time_index_column_schema("ts_wrong", ColumnDataType::TimestampMillisecond),
                    field_column_schema("field_wrong", ColumnDataType::Float64),
                ],
                rows: vec![api::v1::Row {
                    values: vec![Value::default(), Value::default()],
                }],
            }),
        };

        let alter_expr = inserter
            .get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
            .unwrap();
        assert!(alter_expr.is_none());

        // Both columns should now carry the table's own names.
        let renamed = &req.rows.as_ref().unwrap().schema;
        assert_eq!(renamed[0].column_name, ts_name);
        assert_eq!(renamed[1].column_name, field_name);
    }

    #[test]
    fn test_last_non_null_create_options_preserve_default_without_append_mode() {
        let table_options = last_non_null_options(&[]);

        assert_eq!(
            Some("last_non_null"),
            table_options.get(MERGE_MODE_KEY).map(String::as_str)
        );
        assert!(!table_options.contains_key(APPEND_MODE_KEY));
    }

    #[test]
    fn test_last_non_null_create_options_preserve_default_with_append_mode_false() {
        let table_options = last_non_null_options(&[(APPEND_MODE_KEY, "false")]);

        assert!(!table_options.contains_key(APPEND_MODE_KEY));
        assert_eq!(
            Some("last_non_null"),
            table_options.get(MERGE_MODE_KEY).map(String::as_str)
        );
    }

    #[test]
    fn test_last_non_null_create_options_use_configured_merge_mode() {
        let table_options = last_non_null_options(&[(MERGE_MODE_KEY, "last_row")]);

        assert_eq!(
            Some("last_row"),
            table_options.get(MERGE_MODE_KEY).map(String::as_str)
        );
        assert!(!table_options.contains_key(APPEND_MODE_KEY));
    }

    #[test]
    fn test_last_non_null_create_options_use_last_row_with_append_mode_true() {
        let table_options = last_non_null_options(&[(APPEND_MODE_KEY, "true")]);

        assert_eq!(
            Some("true"),
            table_options.get(APPEND_MODE_KEY).map(String::as_str)
        );
        assert_eq!(
            Some("last_row"),
            table_options.get(MERGE_MODE_KEY).map(String::as_str)
        );
    }
}