// operator/statement.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod admin;
16mod comment;
17mod copy_database;
18mod copy_query_to;
19mod copy_table_from;
20mod copy_table_to;
21mod cursor;
22pub mod ddl;
23mod describe;
24mod dml;
25mod kill;
26mod set;
27mod show;
28mod tql;
29
30use std::collections::HashMap;
31use std::sync::Arc;
32
33use api::v1::RowInsertRequests;
34use catalog::CatalogManagerRef;
35use catalog::kvbackend::KvBackendCatalogManager;
36use catalog::process_manager::ProcessManagerRef;
37use client::RecordBatches;
38use client::error::{ExternalSnafu as ClientExternalSnafu, Result as ClientResult};
39use client::inserter::{InsertOptions, Inserter};
40use common_error::ext::BoxedError;
41use common_meta::cache_invalidator::CacheInvalidatorRef;
42use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
43use common_meta::key::schema_name::SchemaNameKey;
44use common_meta::key::view_info::{ViewInfoManager, ViewInfoManagerRef};
45use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
46use common_meta::kv_backend::KvBackendRef;
47use common_meta::procedure_executor::ProcedureExecutorRef;
48use common_query::Output;
49use common_telemetry::{debug, tracing, warn};
50use common_time::Timestamp;
51use common_time::range::TimestampRange;
52use datafusion_expr::LogicalPlan;
53use datatypes::prelude::ConcreteDataType;
54use datatypes::schema::ColumnSchema;
55use humantime::format_duration;
56use itertools::Itertools;
57use partition::manager::PartitionRuleManagerRef;
58use query::QueryEngineRef;
59use query::parser::QueryStatement;
60use session::context::{Channel, QueryContextBuilder, QueryContextRef};
61use session::table_name::table_idents_to_full_name;
62use set::{set_query_timeout, set_read_preference};
63use snafu::{OptionExt, ResultExt, ensure};
64use sql::ast::ObjectNamePartExt;
65use sql::statements::OptionMap;
66use sql::statements::copy::{
67    CopyDatabase, CopyDatabaseArgument, CopyQueryToArgument, CopyTable, CopyTableArgument,
68};
69use sql::statements::set_variables::SetVariables;
70use sql::statements::show::ShowCreateTableVariant;
71use sql::statements::statement::Statement;
72use sql::util::format_raw_object_name;
73use sqlparser::ast::ObjectName;
74use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
75use table::TableRef;
76use table::requests::{CopyDatabaseRequest, CopyDirection, CopyQueryToRequest, CopyTableRequest};
77use table::table_name::TableName;
78use table::table_reference::TableReference;
79
80use self::set::{
81    set_bytea_output, set_datestyle, set_intervalstyle, set_search_path, set_timezone,
82    validate_client_encoding,
83};
84use crate::error::{
85    self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, NotSupportedSnafu,
86    PlanStatementSnafu, Result, SchemaNotFoundSnafu, SqlCommonSnafu, TableMetadataManagerSnafu,
87    TableNotFoundSnafu, UnexpectedSnafu, UpgradeCatalogManagerRefSnafu,
88};
89use crate::insert::InserterRef;
90use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
91use crate::statement::set::set_allow_query_fallback;
92
/// A configurator that customizes or enhances a [`StatementExecutor`].
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
    /// Consumes `executor` and returns a (possibly customized) replacement.
    async fn configure(
        &self,
        executor: StatementExecutor,
        ctx: ExecutorConfigureContext,
    ) -> std::result::Result<StatementExecutor, BoxedError>;
}

/// Shared reference to a [`StatementExecutorConfigurator`].
pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;

/// Context handed to [`StatementExecutorConfigurator::configure`].
pub struct ExecutorConfigureContext {
    /// Key-value backend available to configurators (the same backend the
    /// executor's metadata managers are built from).
    pub kv_backend: KvBackendRef,
}
108
/// Executes parsed SQL statements on behalf of a session.
///
/// Cheap to clone: every field is a shared handle.
#[derive(Clone)]
pub struct StatementExecutor {
    /// Resolves catalogs/schemas/tables (lookups and existence checks).
    catalog_manager: CatalogManagerRef,
    /// Plans, optimizes, and executes logical plans.
    query_engine: QueryEngineRef,
    /// Executor for procedures; exposed via `procedure_executor()`.
    procedure_executor: ProcedureExecutorRef,
    /// Table metadata access, built over the kv backend in `new`.
    table_metadata_manager: TableMetadataManagerRef,
    /// Flow metadata access, built over the kv backend in `new`.
    flow_metadata_manager: FlowMetadataManagerRef,
    /// View metadata access, built over the kv backend in `new`.
    view_info_manager: ViewInfoManagerRef,
    /// Partition-rule management handle.
    partition_manager: PartitionRuleManagerRef,
    /// Distributed cache invalidation; exposed via `cache_invalidator()`.
    cache_invalidator: CacheInvalidatorRef,
    /// Row insertion entry point; exposed via `inserter()`.
    inserter: InserterRef,
    /// Optional process manager (e.g. for `SHOW PROCESSLIST` / `KILL`);
    /// NOTE(review): usage is outside this view — confirm.
    process_manager: Option<ProcessManagerRef>,
    /// Address of the originating frontend; consumers are outside this
    /// view — presumably used to route/identify sessions, confirm.
    origin_frontend_addr: String,
    /// Enterprise-only hook for trigger introspection queries.
    #[cfg(feature = "enterprise")]
    trigger_querier: Option<TriggerQuerierRef>,
}

/// Shared reference to a [`StatementExecutor`].
pub type StatementExecutorRef = Arc<StatementExecutor>;
127
/// Trait for querying trigger info, such as `SHOW CREATE TRIGGER` etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]
pub trait TriggerQuerier: Send + Sync {
    /// Queries the `SHOW CREATE TRIGGER` statement for the given trigger
    /// in the given catalog.
    async fn show_create_trigger(
        &self,
        catalog: &str,
        trigger: &str,
        query_ctx: &QueryContextRef,
    ) -> std::result::Result<Output, BoxedError>;

    /// Upcast to `Any` so callers can downcast to a concrete implementation.
    fn as_any(&self) -> &dyn std::any::Any;
}

/// Shared reference to a [`TriggerQuerier`].
#[cfg(feature = "enterprise")]
pub type TriggerQuerierRef = Arc<dyn TriggerQuerier>;
145
146impl StatementExecutor {
147    #[allow(clippy::too_many_arguments)]
148    pub fn new(
149        catalog_manager: CatalogManagerRef,
150        query_engine: QueryEngineRef,
151        procedure_executor: ProcedureExecutorRef,
152        kv_backend: KvBackendRef,
153        cache_invalidator: CacheInvalidatorRef,
154        inserter: InserterRef,
155        partition_manager: PartitionRuleManagerRef,
156        process_manager: Option<ProcessManagerRef>,
157        origin_frontend_addr: String,
158    ) -> Self {
159        Self {
160            catalog_manager,
161            query_engine,
162            procedure_executor,
163            table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
164            flow_metadata_manager: Arc::new(FlowMetadataManager::new(kv_backend.clone())),
165            view_info_manager: Arc::new(ViewInfoManager::new(kv_backend.clone())),
166            partition_manager,
167            cache_invalidator,
168            inserter,
169            process_manager,
170            origin_frontend_addr,
171            #[cfg(feature = "enterprise")]
172            trigger_querier: None,
173        }
174    }
175
176    #[cfg(feature = "enterprise")]
177    pub fn with_trigger_querier(mut self, querier: TriggerQuerierRef) -> Self {
178        self.trigger_querier = Some(querier);
179        self
180    }
181
182    #[cfg(feature = "testing")]
183    pub async fn execute_stmt(
184        &self,
185        stmt: QueryStatement,
186        query_ctx: QueryContextRef,
187    ) -> Result<Output> {
188        match stmt {
189            QueryStatement::Sql(stmt) => self.execute_sql(stmt, query_ctx).await,
190            QueryStatement::Promql(_, _) => self.plan_exec(stmt, query_ctx).await,
191        }
192    }
193
    /// Dispatches a parsed SQL [`Statement`] to its handler and returns the
    /// handler's [`Output`].
    ///
    /// Query-like statements (`SELECT`/`EXPLAIN`/`DELETE`) take the generic
    /// plan-and-execute path; every other statement kind is routed to a
    /// dedicated method on this executor.
    #[tracing::instrument(skip_all)]
    pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
        match stmt {
            // Plain queries: plan with the query engine, then execute.
            Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
                self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
            }

            // Cursor lifecycle: DECLARE / FETCH / CLOSE.
            Statement::DeclareCursor(declare_cursor) => {
                self.declare_cursor(declare_cursor, query_ctx).await
            }
            Statement::FetchCursor(fetch_cursor) => {
                self.fetch_cursor(fetch_cursor, query_ctx).await
            }
            Statement::CloseCursor(close_cursor) => {
                self.close_cursor(close_cursor, query_ctx).await
            }

            Statement::Insert(insert) => self.insert(insert, query_ctx).await,

            Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,

            Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,

            // `SHOW …` introspection statements.
            Statement::ShowDatabases(stmt) => self.show_databases(stmt, query_ctx).await,

            Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,

            Statement::ShowTableStatus(stmt) => self.show_table_status(stmt, query_ctx).await,

            Statement::ShowCollation(kind) => self.show_collation(kind, query_ctx).await,

            Statement::ShowCharset(kind) => self.show_charset(kind, query_ctx).await,

            Statement::ShowViews(stmt) => self.show_views(stmt, query_ctx).await,

            Statement::ShowFlows(stmt) => self.show_flows(stmt, query_ctx).await,

            #[cfg(feature = "enterprise")]
            Statement::ShowTriggers(stmt) => self.show_triggers(stmt, query_ctx).await,

            // `COPY (query) TO …`: execute the inner query first, then copy
            // its output to the destination.
            Statement::Copy(sql::statements::copy::Copy::CopyQueryTo(stmt)) => {
                let query_output = self
                    .plan_exec(QueryStatement::Sql(*stmt.query), query_ctx)
                    .await?;
                let req = to_copy_query_request(stmt.arg)?;

                self.copy_query_to(req, query_output)
                    .await
                    .map(Output::new_with_affected_rows)
            }

            // `COPY TABLE TO/FROM …`: direction decides export vs import.
            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
                let req = to_copy_table_request(stmt, query_ctx.clone())?;
                match req.direction {
                    CopyDirection::Export => self
                        .copy_table_to(req, query_ctx)
                        .await
                        .map(Output::new_with_affected_rows),
                    CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
                }
            }

            // `COPY DATABASE TO/FROM …`.
            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
                match copy_database {
                    CopyDatabase::To(arg) => {
                        self.copy_database_to(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx.clone(),
                        )
                        .await
                    }
                    CopyDatabase::From(arg) => {
                        self.copy_database_from(
                            to_copy_database_request(arg, &query_ctx)?,
                            query_ctx,
                        )
                        .await
                    }
                }
            }

            // CREATE statements: the created object handle is discarded;
            // callers only observe an affected-rows output of 0.
            Statement::CreateTable(stmt) => {
                let _ = self.create_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateTableLike(stmt) => {
                let _ = self.create_table_like(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateExternalTable(stmt) => {
                let _ = self.create_external_table(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            Statement::CreateFlow(stmt) => self.create_flow(stmt, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::CreateTrigger(stmt) => self.create_trigger(stmt, query_ctx).await,
            // DROP FLOW resolves the flow name against the current catalog.
            Statement::DropFlow(stmt) => {
                self.drop_flow(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.flow_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            #[cfg(feature = "enterprise")]
            Statement::DropTrigger(stmt) => {
                self.drop_trigger(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.trigger_name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            Statement::CreateView(stmt) => {
                let _ = self.create_view(stmt, query_ctx).await?;
                Ok(Output::new_with_affected_rows(0))
            }
            // DROP VIEW needs the fully-qualified (catalog, schema, view) name.
            Statement::DropView(stmt) => {
                let (catalog_name, schema_name, view_name) =
                    table_idents_to_full_name(&stmt.view_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                self.drop_view(
                    catalog_name,
                    schema_name,
                    view_name,
                    stmt.drop_if_exists,
                    query_ctx,
                )
                .await
            }
            Statement::AlterTable(alter_table) => self.alter_table(alter_table, query_ctx).await,

            Statement::AlterDatabase(alter_database) => {
                self.alter_database(alter_database, query_ctx).await
            }

            #[cfg(feature = "enterprise")]
            Statement::AlterTrigger(alter_trigger) => {
                self.alter_trigger(alter_trigger, query_ctx).await
            }

            // DROP TABLE may name several tables; resolve each fully before
            // issuing a single multi-table drop.
            Statement::DropTable(stmt) => {
                let mut table_names = Vec::with_capacity(stmt.table_names().len());
                for table_name_stmt in stmt.table_names() {
                    let (catalog, schema, table) =
                        table_idents_to_full_name(table_name_stmt, &query_ctx)
                            .map_err(BoxedError::new)
                            .context(ExternalSnafu)?;
                    table_names.push(TableName::new(catalog, schema, table));
                }
                self.drop_tables(&table_names[..], stmt.drop_if_exists(), query_ctx.clone())
                    .await
            }
            Statement::DropDatabase(stmt) => {
                self.drop_database(
                    query_ctx.current_catalog().to_string(),
                    format_raw_object_name(stmt.name()),
                    stmt.drop_if_exists(),
                    query_ctx,
                )
                .await
            }
            // TRUNCATE may carry optional time ranges, converted to concrete
            // timestamps in the table's time-index unit first.
            Statement::TruncateTable(stmt) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(stmt.table_name(), &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_name = TableName::new(catalog, schema, table);
                let time_ranges = self
                    .convert_truncate_time_ranges(&table_name, stmt.time_ranges(), &query_ctx)
                    .await?;
                self.truncate_table(table_name, time_ranges, query_ctx)
                    .await
            }
            Statement::CreateDatabase(stmt) => {
                self.create_database(
                    &format_raw_object_name(&stmt.name),
                    stmt.if_not_exists,
                    stmt.options.into_map(),
                    query_ctx,
                )
                .await
            }
            // SHOW CREATE DATABASE reads the schema's options from metadata;
            // requires the kv-backed catalog manager implementation.
            Statement::ShowCreateDatabase(show) => {
                let (catalog, database) =
                    idents_to_full_database_name(&show.database_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
                let table_metadata_manager = self
                    .catalog_manager
                    .as_any()
                    .downcast_ref::<KvBackendCatalogManager>()
                    .map(|manager| manager.table_metadata_manager_ref().clone())
                    .context(UpgradeCatalogManagerRefSnafu)?;
                let opts: HashMap<String, String> = table_metadata_manager
                    .schema_manager()
                    .get(SchemaNameKey::new(&catalog, &database))
                    .await
                    .context(TableMetadataManagerSnafu)?
                    .context(SchemaNotFoundSnafu {
                        schema_info: &database,
                    })?
                    .into_inner()
                    .into();

                self.show_create_database(&database, opts.into()).await
            }
            // SHOW CREATE TABLE: resolve the table, then render either the
            // original DDL or the Postgres foreign-table variant.
            Statement::ShowCreateTable(show) => {
                let (catalog, schema, table) =
                    table_idents_to_full_name(&show.table_name, &query_ctx)
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;

                let table_ref = self
                    .catalog_manager
                    .table(&catalog, &schema, &table, Some(&query_ctx))
                    .await
                    .context(CatalogSnafu)?
                    .context(TableNotFoundSnafu { table_name: &table })?;
                let table_name = TableName::new(catalog, schema, table);

                match show.variant {
                    ShowCreateTableVariant::Original => {
                        self.show_create_table(table_name, table_ref, query_ctx)
                            .await
                    }
                    ShowCreateTableVariant::PostgresForeignTable => {
                        self.show_create_table_for_pg(table_name, table_ref, query_ctx)
                            .await
                    }
                }
            }
            Statement::ShowCreateFlow(show) => self.show_create_flow(show, query_ctx).await,
            Statement::ShowCreateView(show) => self.show_create_view(show, query_ctx).await,
            #[cfg(feature = "enterprise")]
            Statement::ShowCreateTrigger(show) => self.show_create_trigger(show, query_ctx).await,
            // Session-variable and miscellaneous session statements.
            Statement::SetVariables(set_var) => self.set_variables(set_var, query_ctx),
            Statement::ShowVariables(show_variable) => self.show_variable(show_variable, query_ctx),
            Statement::Comment(stmt) => self.comment(stmt, query_ctx).await,
            Statement::ShowColumns(show_columns) => {
                self.show_columns(show_columns, query_ctx).await
            }
            Statement::ShowIndex(show_index) => self.show_index(show_index, query_ctx).await,
            Statement::ShowRegion(show_region) => self.show_region(show_region, query_ctx).await,
            Statement::ShowStatus(_) => self.show_status(query_ctx).await,
            Statement::ShowSearchPath(_) => self.show_search_path(query_ctx).await,
            Statement::Use(db) => self.use_database(db, query_ctx).await,
            Statement::Admin(admin) => self.execute_admin_command(admin, query_ctx).await,
            Statement::Kill(kill) => self.execute_kill(query_ctx, kill).await,
            Statement::ShowProcesslist(show) => self.show_processlist(show, query_ctx).await,
        }
    }
450
451    pub async fn use_database(&self, db: String, query_ctx: QueryContextRef) -> Result<Output> {
452        let catalog = query_ctx.current_catalog();
453        ensure!(
454            self.catalog_manager
455                .schema_exists(catalog, db.as_ref(), Some(&query_ctx))
456                .await
457                .context(CatalogSnafu)?,
458            SchemaNotFoundSnafu { schema_info: &db }
459        );
460
461        query_ctx.set_current_schema(&db);
462
463        Ok(Output::new_with_record_batches(RecordBatches::empty()))
464    }
465
    /// Applies a `SET <variable> = <value>` statement to the session context.
    ///
    /// Variable names are matched case-insensitively (upper-cased first).
    /// Unknown variables on MySQL/Postgres channels are tolerated with a
    /// warning so a client library's connection handshake cannot fail on
    /// them; on other channels they are rejected as unsupported.
    fn set_variables(&self, set_var: SetVariables, query_ctx: QueryContextRef) -> Result<Output> {
        let var_name = set_var.variable.to_string().to_uppercase();

        debug!(
            "Trying to set {}={} for session: {} ",
            var_name,
            set_var.value.iter().map(|e| e.to_string()).join(", "),
            query_ctx.conn_info()
        );

        match var_name.as_str() {
            "READ_PREFERENCE" => set_read_preference(set_var.value, query_ctx)?,

            // Accept both the MySQL `@@…` spellings and plain spellings.
            "@@TIME_ZONE" | "@@SESSION.TIME_ZONE" | "TIMEZONE" | "TIME_ZONE" => {
                set_timezone(set_var.value, query_ctx)?
            }

            "BYTEA_OUTPUT" => set_bytea_output(set_var.value, query_ctx)?,

            // Same as "bytea_output", we just ignore it here.
            // Not harmful since it only relates to how date is viewed in client app's output.
            // The tracked issue is https://github.com/GreptimeTeam/greptimedb/issues/3442.
            "DATESTYLE" => set_datestyle(set_var.value, query_ctx)?,
            "INTERVALSTYLE" => set_intervalstyle(set_var.value, query_ctx)?,

            // Allow query to fallback when failed to push down.
            "ALLOW_QUERY_FALLBACK" => set_allow_query_fallback(set_var.value, query_ctx)?,

            "CLIENT_ENCODING" => validate_client_encoding(set_var)?,
            // MySQL's statement-timeout spelling: honored on MySQL, warned on
            // Postgres, rejected on any other channel.
            "@@SESSION.MAX_EXECUTION_TIME" | "MAX_EXECUTION_TIME" => match query_ctx.channel() {
                Channel::Mysql => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Postgres => {
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name))
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            },
            // Postgres' statement-timeout spelling: mirror image of the arm
            // above (honored on Postgres, warned on MySQL).
            "STATEMENT_TIMEOUT" => match query_ctx.channel() {
                Channel::Postgres => set_query_timeout(set_var.value, query_ctx)?,
                Channel::Mysql => {
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                }
                _ => {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            },
            // `SEARCH_PATH` is a Postgres-only concept.
            "SEARCH_PATH" => {
                if query_ctx.channel() == Channel::Postgres {
                    set_search_path(set_var.value, query_ctx)?
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
            _ => {
                if query_ctx.channel() == Channel::Postgres || query_ctx.channel() == Channel::Mysql
                {
                    // For unknown SET statements, we give a warning with success.
                    // This prevents the SET call from becoming a blocker of MySQL/Postgres clients'
                    // connection establishment.
                    warn!(
                        "Unsupported set variable {} for channel {:?}",
                        var_name,
                        query_ctx.channel()
                    );
                    query_ctx.set_warning(format!("Unsupported set variable {}", var_name));
                } else {
                    return NotSupportedSnafu {
                        feat: format!("Unsupported set variable {}", var_name),
                    }
                    .fail();
                }
            }
        }
        Ok(Output::new_with_affected_rows(0))
    }
561
562    #[tracing::instrument(skip_all)]
563    pub async fn plan(
564        &self,
565        stmt: &QueryStatement,
566        query_ctx: QueryContextRef,
567    ) -> Result<LogicalPlan> {
568        self.query_engine
569            .planner()
570            .plan(stmt, query_ctx)
571            .await
572            .context(PlanStatementSnafu)
573    }
574
575    /// Execute [`LogicalPlan`] directly.
576    #[tracing::instrument(skip_all)]
577    pub async fn exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
578        self.query_engine
579            .execute(plan, query_ctx)
580            .await
581            .context(ExecLogicalPlanSnafu)
582    }
583
584    pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
585        self.query_engine
586            .planner()
587            .optimize(plan)
588            .context(PlanStatementSnafu)
589    }
590
591    #[tracing::instrument(skip_all)]
592    async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result<Output> {
593        let plan = self.plan(&stmt, query_ctx.clone()).await?;
594        self.exec_plan(plan, query_ctx).await
595    }
596
597    async fn get_table(&self, table_ref: &TableReference<'_>) -> Result<TableRef> {
598        let TableReference {
599            catalog,
600            schema,
601            table,
602        } = table_ref;
603        self.catalog_manager
604            .table(catalog, schema, table, None)
605            .await
606            .context(CatalogSnafu)?
607            .with_context(|| TableNotFoundSnafu {
608                table_name: table_ref.to_string(),
609            })
610    }
611
    /// Returns the procedure executor held by this statement executor.
    pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
        &self.procedure_executor
    }

    /// Returns the cache invalidator held by this statement executor.
    pub fn cache_invalidator(&self) -> &CacheInvalidatorRef {
        &self.cache_invalidator
    }
619
620    /// Convert truncate time ranges for the given table from sql values to timestamps
621    ///
622    pub async fn convert_truncate_time_ranges(
623        &self,
624        table_name: &TableName,
625        sql_values_time_range: &[(sqlparser::ast::Value, sqlparser::ast::Value)],
626        query_ctx: &QueryContextRef,
627    ) -> Result<Vec<(Timestamp, Timestamp)>> {
628        if sql_values_time_range.is_empty() {
629            return Ok(vec![]);
630        }
631        let table = self.get_table(&table_name.table_ref()).await?;
632        let info = table.table_info();
633        let time_index_dt = info
634            .meta
635            .schema
636            .timestamp_column()
637            .context(UnexpectedSnafu {
638                violated: "Table must have a timestamp column",
639            })?;
640
641        let time_unit = time_index_dt
642            .data_type
643            .as_timestamp()
644            .with_context(|| UnexpectedSnafu {
645                violated: format!(
646                    "Table {}'s time index column must be a timestamp type, found: {:?}",
647                    table_name, time_index_dt
648                ),
649            })?
650            .unit();
651
652        let start_column = ColumnSchema::new(
653            "range_start",
654            ConcreteDataType::timestamp_datatype(time_unit),
655            false,
656        );
657        let end_column = ColumnSchema::new(
658            "range_end",
659            ConcreteDataType::timestamp_datatype(time_unit),
660            false,
661        );
662        let mut time_ranges = Vec::with_capacity(sql_values_time_range.len());
663        for (start, end) in sql_values_time_range {
664            let start = common_sql::convert::sql_value_to_value(
665                &start_column,
666                start,
667                Some(&query_ctx.timezone()),
668                None,
669                false,
670            )
671            .context(SqlCommonSnafu)
672            .and_then(|v| {
673                if let datatypes::value::Value::Timestamp(t) = v {
674                    Ok(t)
675                } else {
676                    error::InvalidSqlSnafu {
677                        err_msg: format!("Expected a timestamp value, found {v:?}"),
678                    }
679                    .fail()
680                }
681            })?;
682
683            let end = common_sql::convert::sql_value_to_value(
684                &end_column,
685                end,
686                Some(&query_ctx.timezone()),
687                None,
688                false,
689            )
690            .context(SqlCommonSnafu)
691            .and_then(|v| {
692                if let datatypes::value::Value::Timestamp(t) = v {
693                    Ok(t)
694                } else {
695                    error::InvalidSqlSnafu {
696                        err_msg: format!("Expected a timestamp value, found {v:?}"),
697                    }
698                    .fail()
699                }
700            })?;
701            time_ranges.push((start, end));
702        }
703        Ok(time_ranges)
704    }
705
    /// Returns the inserter for the statement executor (crate-internal
    /// access to the row-insertion entry point).
    pub(crate) fn inserter(&self) -> &InserterRef {
        &self.inserter
    }
710}
711
712fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
713    let CopyQueryToArgument {
714        with,
715        connection,
716        location,
717    } = stmt;
718
719    Ok(CopyQueryToRequest {
720        location,
721        with: with.into_map(),
722        connection: connection.into_map(),
723    })
724}
725
726// Verifies time related format is valid
727fn verify_time_related_format(with: &OptionMap) -> Result<()> {
728    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
729    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
730    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
731    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
732
733    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
734        ensure!(
735            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
736            error::TimestampFormatNotSupportedSnafu {
737                format: "<unknown>".to_string(),
738                file_format: file_format.unwrap_or_default(),
739            }
740        );
741    }
742
743    for (key, format_opt) in [
744        (common_datasource::file_format::TIME_FORMAT, time_format),
745        (common_datasource::file_format::DATE_FORMAT, date_format),
746        (
747            common_datasource::file_format::TIMESTAMP_FORMAT,
748            timestamp_format,
749        ),
750    ] {
751        if let Some(format) = format_opt {
752            chrono::format::strftime::StrftimeItems::new(format)
753                .parse()
754                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
755        }
756    }
757
758    Ok(())
759}
760
761fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
762    let direction = match stmt {
763        CopyTable::To(_) => CopyDirection::Export,
764        CopyTable::From(_) => CopyDirection::Import,
765    };
766
767    let CopyTableArgument {
768        location,
769        connection,
770        with,
771        table_name,
772        limit,
773        ..
774    } = match stmt {
775        CopyTable::To(arg) => arg,
776        CopyTable::From(arg) => arg,
777    };
778    let (catalog_name, schema_name, table_name) =
779        table_idents_to_full_name(&table_name, &query_ctx)
780            .map_err(BoxedError::new)
781            .context(ExternalSnafu)?;
782
783    let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
784
785    verify_time_related_format(&with)?;
786
787    let pattern = with
788        .get(common_datasource::file_format::FILE_PATTERN)
789        .map(|x| x.to_string());
790
791    Ok(CopyTableRequest {
792        catalog_name,
793        schema_name,
794        table_name,
795        location,
796        with: with.into_map(),
797        connection: connection.into_map(),
798        pattern,
799        direction,
800        timestamp_range,
801        limit,
802    })
803}
804
805/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
806/// This function extracts the necessary info including catalog/database name, time range, etc.
807fn to_copy_database_request(
808    arg: CopyDatabaseArgument,
809    query_ctx: &QueryContextRef,
810) -> Result<CopyDatabaseRequest> {
811    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
812        .map_err(BoxedError::new)
813        .context(ExternalSnafu)?;
814    let time_range = timestamp_range_from_option_map(&arg.with, query_ctx)?;
815
816    Ok(CopyDatabaseRequest {
817        catalog_name,
818        schema_name: database_name,
819        location: arg.location,
820        with: arg.with.into_map(),
821        connection: arg.connection.into_map(),
822        time_range,
823    })
824}
825
826/// Extracts timestamp range from OptionMap with keys `start_time` and `end_time`.
827/// The timestamp ranges should be a valid timestamp string as defined in [Timestamp::from_str].
828/// The timezone used for conversion will respect that inside `query_ctx`.
829fn timestamp_range_from_option_map(
830    options: &OptionMap,
831    query_ctx: &QueryContextRef,
832) -> Result<Option<TimestampRange>> {
833    let start_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_START_KEY, query_ctx)?;
834    let end_timestamp = extract_timestamp(options, COPY_DATABASE_TIME_END_KEY, query_ctx)?;
835    let time_range = match (start_timestamp, end_timestamp) {
836        (Some(start), Some(end)) => Some(TimestampRange::new(start, end).with_context(|| {
837            error::InvalidTimestampRangeSnafu {
838                start: start.to_iso8601_string(),
839                end: end.to_iso8601_string(),
840            }
841        })?),
842        (Some(start), None) => Some(TimestampRange::from_start(start)),
843        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
844        (None, None) => None,
845    };
846    Ok(time_range)
847}
848
849/// Extracts timestamp from a [HashMap<String, String>] with given key.
850fn extract_timestamp(
851    map: &OptionMap,
852    key: &str,
853    query_ctx: &QueryContextRef,
854) -> Result<Option<Timestamp>> {
855    map.get(key)
856        .map(|v| {
857            Timestamp::from_str(v, Some(&query_ctx.timezone()))
858                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
859        })
860        .transpose()
861}
862
863fn idents_to_full_database_name(
864    obj_name: &ObjectName,
865    query_ctx: &QueryContextRef,
866) -> Result<(String, String)> {
867    match &obj_name.0[..] {
868        [database] => Ok((
869            query_ctx.current_catalog().to_owned(),
870            database.to_string_unquoted(),
871        )),
872        [catalog, database] => Ok((catalog.to_string_unquoted(), database.to_string_unquoted())),
873        _ => InvalidSqlSnafu {
874            err_msg: format!(
875                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
876            ),
877        }
878        .fail(),
879    }
880}
881
/// The [`Inserter`] implementation for the statement executor.
pub struct InserterImpl {
    // Executor that performs the actual row inserts.
    statement_executor: StatementExecutorRef,
    // Optional per-insert settings (TTL, append mode) attached to the query
    // context as extensions; `None` means no extensions are set.
    options: Option<InsertOptions>,
}
887
888impl InserterImpl {
889    pub fn new(statement_executor: StatementExecutorRef, options: Option<InsertOptions>) -> Self {
890        Self {
891            statement_executor,
892            options,
893        }
894    }
895}
896
897#[async_trait::async_trait]
898impl Inserter for InserterImpl {
899    async fn insert_rows(
900        &self,
901        context: &client::inserter::Context<'_>,
902        requests: RowInsertRequests,
903    ) -> ClientResult<()> {
904        let mut ctx_builder = QueryContextBuilder::default()
905            .current_catalog(context.catalog.to_string())
906            .current_schema(context.schema.to_string());
907        if let Some(options) = self.options.as_ref() {
908            ctx_builder = ctx_builder
909                .set_extension(
910                    TTL_KEY.to_string(),
911                    format_duration(options.ttl).to_string(),
912                )
913                .set_extension(APPEND_MODE_KEY.to_string(), options.append_mode.to_string());
914        }
915        let query_ctx = ctx_builder.build().into();
916
917        self.statement_executor
918            .inserter()
919            .handle_row_inserts(
920                requests,
921                query_ctx,
922                self.statement_executor.as_ref(),
923                false,
924                false,
925            )
926            .await
927            .map_err(BoxedError::new)
928            .context(ClientExternalSnafu)
929            .map(|_| ())
930    }
931
932    fn set_options(&mut self, options: &InsertOptions) {
933        self.options = Some(*options);
934    }
935}
936
#[cfg(test)]
mod tests {
    // Import the macro itself, not the `std::assert_matches` module: a bare
    // module import leaves the unqualified `assert_matches!` calls below
    // unresolved.
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_time::range::TimestampRange;
    use common_time::{Timestamp, Timezone};
    use session::context::QueryContextBuilder;
    use sql::statements::OptionMap;

    use crate::error;
    use crate::statement::copy_database::{
        COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
    };
    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};

    /// Parses the given start/end strings through an `OptionMap` under the
    /// `Asia/Shanghai` timezone and returns the resulting range.
    fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
        let query_ctx = QueryContextBuilder::default()
            .timezone(Timezone::from_tz_string("Asia/Shanghai").unwrap())
            .build()
            .into();
        let map = OptionMap::from(
            [
                (COPY_DATABASE_TIME_START_KEY.to_string(), start.to_string()),
                (COPY_DATABASE_TIME_END_KEY.to_string(), end.to_string()),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        timestamp_range_from_option_map(&map, &query_ctx)
    }

    #[test]
    fn test_timestamp_range_from_option_map() {
        // 2022-04-11 08:00:00 +08:00 == 1649635200s; 16:00:00 == 1649664000s.
        assert_eq!(
            Some(
                TimestampRange::new(
                    Timestamp::new_second(1649635200),
                    Timestamp::new_second(1649664000),
                )
                .unwrap(),
            ),
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 16:00:00")).unwrap()
        );

        // An end bound earlier than the start bound is rejected.
        assert_matches!(
            check_timestamp_range(("2022-04-11 08:00:00", "2022-04-11 07:00:00")).unwrap_err(),
            error::Error::InvalidTimestampRange { .. }
        );
    }

    #[test]
    fn test_verify_timestamp_format() {
        // A timestamp format with the CSV file format is accepted.
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%Y-%m-%d %H:%M:%S".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "csv".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );
        assert!(verify_time_related_format(&map).is_ok());

        // The same option with a non-CSV format is not supported.
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%Y-%m-%d %H:%M:%S".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "json".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );

        assert_matches!(
            verify_time_related_format(&map).unwrap_err(),
            error::Error::TimestampFormatNotSupported { .. }
        );

        // An unparseable strftime pattern is an invalid COPY parameter.
        let map = OptionMap::from(
            [
                (
                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
                    "%111112".to_string(),
                ),
                (
                    common_datasource::file_format::FORMAT_TYPE.to_string(),
                    "csv".to_string(),
                ),
            ]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        );

        assert_matches!(
            verify_time_related_format(&map).unwrap_err(),
            error::Error::InvalidCopyParameter { .. }
        );
    }
}