diff --git a/Cargo.lock b/Cargo.lock
index 67ec748de3..24f9769e5d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3222,6 +3222,7 @@ dependencies = [
  "common-runtime",
  "common-telemetry",
  "common-test-util",
+ "common-time",
  "datafusion",
  "datafusion-common",
  "datafusion-expr",
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index bcb777a9dd..81c7443d8d 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -58,6 +58,16 @@ pub enum Format {
     Parquet(ParquetFormat),
 }
 
+impl Format {
+    pub fn suffix(&self) -> &'static str {
+        match self {
+            Format::Csv(_) => ".csv",
+            Format::Json(_) => ".json",
+            Format::Parquet(_) => ".parquet",
+        }
+    }
+}
+
 impl TryFrom<&HashMap<String, String>> for Format {
     type Error = error::Error;
 
diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs
index 809d033baf..0b8d67ae6a 100644
--- a/src/common/query/src/logical_plan.rs
+++ b/src/common/query/src/logical_plan.rs
@@ -20,6 +20,7 @@ mod udf;
 use std::sync::Arc;
 
 use datatypes::prelude::ConcreteDataType;
+pub use expr::build_filter_from_timestamp;
 
 pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
 pub use self::expr::{DfExpr, Expr};
@@ -28,7 +29,6 @@ pub use self::udf::ScalarUdf;
 use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation};
 use crate::logical_plan::accumulator::*;
 use crate::signature::{Signature, Volatility};
-
 /// Creates a new UDF with a specific signature and specific return type.
 /// This is a helper function to create a new UDF.
 /// The function `create_udf` returns a subset of all possible `ScalarFunction`:
diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs
index 0af6aa3922..5744b4f486 100644
--- a/src/common/query/src/logical_plan/expr.rs
+++ b/src/common/query/src/logical_plan/expr.rs
@@ -12,7 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use common_time::range::TimestampRange;
+use common_time::timestamp::TimeUnit;
+use common_time::Timestamp;
+use datafusion_common::{Column, ScalarValue};
 pub use datafusion_expr::expr::Expr as DfExpr;
+use datafusion_expr::{and, binary_expr, Operator};
 
 /// Central struct of query API.
 /// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
@@ -33,6 +38,54 @@ impl From<DfExpr> for Expr {
     }
 }
 
+/// Builds an `Expr` that filters timestamp column from given timestamp range.
+/// Returns [None] if time range is [None] or full time range.
+pub fn build_filter_from_timestamp(
+    ts_col_name: &str,
+    time_range: Option<&TimestampRange>,
+) -> Option<Expr> {
+    let Some(time_range) = time_range else { return None; };
+    let ts_col_expr = DfExpr::Column(Column {
+        relation: None,
+        name: ts_col_name.to_string(),
+    });
+
+    let df_expr = match (time_range.start(), time_range.end()) {
+        (None, None) => None,
+        (Some(start), None) => Some(binary_expr(
+            ts_col_expr,
+            Operator::GtEq,
+            timestamp_to_literal(start),
+        )),
+        (None, Some(end)) => Some(binary_expr(
+            ts_col_expr,
+            Operator::Lt,
+            timestamp_to_literal(end),
+        )),
+        (Some(start), Some(end)) => Some(and(
+            binary_expr(
+                ts_col_expr.clone(),
+                Operator::GtEq,
+                timestamp_to_literal(start),
+            ),
+            binary_expr(ts_col_expr, Operator::Lt, timestamp_to_literal(end)),
+        )),
+    };
+
+    df_expr.map(Expr::from)
+}
+
+/// Converts a [Timestamp] to datafusion literal value.
+fn timestamp_to_literal(timestamp: &Timestamp) -> DfExpr {
+    let scalar_value = match timestamp.unit() {
+        TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
+        TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(timestamp.value()), None),
+        TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None),
+        TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
+    };
+    DfExpr::Literal(scalar_value)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/common/time/src/range.rs b/src/common/time/src/range.rs
index ce61fea10e..0461378449 100644
--- a/src/common/time/src/range.rs
+++ b/src/common/time/src/range.rs
@@ -14,6 +14,8 @@
 
 use std::fmt::{Debug, Display, Formatter};
 
+use serde::{Deserialize, Serialize};
+
 use crate::timestamp::TimeUnit;
 use crate::timestamp_millis::TimestampMillis;
 use crate::Timestamp;
@@ -23,7 +25,7 @@ use crate::Timestamp;
 /// The range contains values that `value >= start` and `value < end`.
 ///
 /// The range is empty iff `start == end == "the default value of T"`
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct GenericRange<T> {
     start: Option<T>,
     end: Option<T>,
@@ -522,4 +524,25 @@ mod tests {
         );
         assert!(range.is_empty());
     }
+
+    #[test]
+    fn test_serialize_timestamp_range() {
+        macro_rules! test_serde_for_unit {
+            ($($unit: expr),*) => {
+                $(
+                    let original_range = TimestampRange::with_unit(0, 10, $unit).unwrap();
+                    let string = serde_json::to_string(&original_range).unwrap();
+                    let deserialized: TimestampRange = serde_json::from_str(&string).unwrap();
+                    assert_eq!(original_range, deserialized);
+                )*
+            };
+        }
+
+        test_serde_for_unit!(
+            TimeUnit::Second,
+            TimeUnit::Millisecond,
+            TimeUnit::Microsecond,
+            TimeUnit::Nanosecond
+        );
+    }
 }
diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs
index f0eda492b4..3b7b5f88be 100644
--- a/src/datanode/src/instance/sql.rs
+++ b/src/datanode/src/instance/sql.rs
@@ -228,6 +228,22 @@ pub fn table_idents_to_full_name(
     }
 }
 
+pub fn idents_to_full_database_name(
+    obj_name: &ObjectName,
+    query_ctx: &QueryContextRef,
+) -> Result<(String, String)> {
+    match &obj_name.0[..] {
+        [database] => Ok((query_ctx.current_catalog(), database.value.clone())),
+        [catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
+        _ => error::InvalidSqlSnafu {
+            msg: format!(
+                "expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
+            ),
+        }
+        .fail(),
+    }
+}
+
 #[async_trait]
 impl SqlStatementExecutor for Instance {
     async fn execute_sql(
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 4487b10e37..207be3e093 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -29,6 +29,7 @@ common-meta = { path = "../common/meta" }
 common-recordbatch = { path = "../common/recordbatch" }
 common-runtime = { path = "../common/runtime" }
 common-telemetry = { path = "../common/telemetry" }
+common-time = { path = "../common/time" }
 datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index 7aba14e118..bdfa8d697d 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -548,6 +548,13 @@ pub enum Error {
         #[snafu(backtrace)]
         source: query::error::Error,
     },
+
+    #[snafu(display("Invalid COPY parameter, key: {}, value: {}", key, value))]
+    InvalidCopyParameter {
+        key: String,
+        value: String,
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -667,6 +674,7 @@ impl ErrorExt for Error {
             | Error::BuildBackend { source } => source.status_code(),
 
             Error::WriteParquet { source, .. } => source.status_code(),
+            Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments,
         }
     }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 3507b7e36e..0f116c50d2 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -627,12 +627,15 @@ pub fn check_permission(
         Statement::DescribeTable(stmt) => {
             validate_param(stmt.name(), query_ctx)?;
         }
-        Statement::Copy(stmd) => match stmd {
+        Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
             CopyTable::To(copy_table_to) => validate_param(&copy_table_to.table_name, query_ctx)?,
             CopyTable::From(copy_table_from) => {
                 validate_param(&copy_table_from.table_name, query_ctx)?
             }
         },
+        Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => {
+            validate_param(&stmt.database_name, query_ctx)?
+        }
     }
     Ok(())
 }
diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs
index d3cd1d82aa..4030452fc9 100644
--- a/src/frontend/src/statement.rs
+++ b/src/frontend/src/statement.rs
@@ -12,32 +12,40 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+mod backup;
 mod copy_table_from;
 mod copy_table_to;
 mod describe;
 mod show;
 mod tql;
 
+use std::collections::HashMap;
+use std::str::FromStr;
+
 use catalog::CatalogManagerRef;
 use common_error::prelude::BoxedError;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
-use datanode::instance::sql::table_idents_to_full_name;
+use common_time::range::TimestampRange;
+use common_time::Timestamp;
+use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name};
 use query::parser::QueryStatement;
 use query::query_engine::SqlStatementExecutorRef;
 use query::QueryEngineRef;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::statements::copy::{CopyTable, CopyTableArgument};
+use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
 use sql::statements::statement::Statement;
 use table::engine::TableReference;
-use table::requests::{CopyDirection, CopyTableRequest};
+use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
 use table::TableRef;
 
+use crate::error;
 use crate::error::{
     CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu,
     Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
 };
+use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
 
 #[derive(Clone)]
 pub struct StatementExecutor {
@@ -92,14 +100,23 @@ impl StatementExecutor {
 
             Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
 
-            Statement::Copy(stmt) => {
+            Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
                 let req = to_copy_table_request(stmt, query_ctx)?;
                 match req.direction {
-                    CopyDirection::Export => self.copy_table_to(req).await,
-                    CopyDirection::Import => self.copy_table_from(req).await,
+                    CopyDirection::Export => {
+                        self.copy_table_to(req).await.map(Output::AffectedRows)
+                    }
+                    CopyDirection::Import => {
+                        self.copy_table_from(req).await.map(Output::AffectedRows)
+                    }
                 }
             }
 
+            Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => {
+                self.copy_database(to_copy_database_request(arg, &query_ctx)?)
+                    .await
+            }
+
             Statement::CreateDatabase(_)
             | Statement::CreateTable(_)
             | Statement::CreateExternalTable(_)
@@ -191,5 +208,47 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
         connection,
         pattern,
         direction,
+        // we copy the whole table by default.
+        timestamp_range: None,
     })
 }
+
+/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
+/// This function extracts the necessary info including catalog/database name, time range, etc.
+fn to_copy_database_request(
+    arg: CopyDatabaseArgument,
+    query_ctx: &QueryContextRef,
+) -> Result<CopyDatabaseRequest> {
+    let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
+        .map_err(BoxedError::new)
+        .context(ExternalSnafu)?;
+
+    let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY)?;
+    let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY)?;
+
+    let time_range = match (start_timestamp, end_timestamp) {
+        (Some(start), Some(end)) => TimestampRange::new(start, end),
+        (Some(start), None) => Some(TimestampRange::from_start(start)),
+        (None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
+        (None, None) => None,
+    };
+
+    Ok(CopyDatabaseRequest {
+        catalog_name,
+        schema_name: database_name,
+        location: arg.location,
+        with: arg.with,
+        connection: arg.connection,
+        time_range,
+    })
+}
+
+/// Extracts timestamp from a [HashMap] with given key.
+fn extract_timestamp(map: &HashMap<String, String>, key: &str) -> Result<Option<Timestamp>> {
+    map.get(key)
+        .map(|v| {
+            Timestamp::from_str(v)
+                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
+        })
+        .transpose()
+}
diff --git a/src/frontend/src/statement/backup.rs b/src/frontend/src/statement/backup.rs
new file mode 100644
index 0000000000..bdd99f73c3
--- /dev/null
+++ b/src/frontend/src/statement/backup.rs
@@ -0,0 +1,97 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_datasource::file_format::Format;
+use common_query::Output;
+use common_telemetry::info;
+use snafu::{ensure, OptionExt, ResultExt};
+use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
+
+use crate::error;
+use crate::error::{
+    CatalogNotFoundSnafu, CatalogSnafu, InvalidCopyParameterSnafu, SchemaNotFoundSnafu,
+};
+use crate::statement::StatementExecutor;
+
+pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
+pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
+
+impl StatementExecutor {
+    pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
+        // location must end with / so that every table is exported to a file.
+        ensure!(
+            req.location.ends_with('/'),
+            InvalidCopyParameterSnafu {
+                key: "location",
+                value: req.location,
+            }
+        );
+
+        info!(
+            "Copy database {}.{}, dir: {}, time: {:?}",
+            req.catalog_name, req.schema_name, req.location, req.time_range
+        );
+        let schema = self
+            .catalog_manager
+            .catalog(&req.catalog_name)
+            .await
+            .context(CatalogSnafu)?
+            .context(CatalogNotFoundSnafu {
+                catalog_name: &req.catalog_name,
+            })?
+            .schema(&req.schema_name)
+            .await
+            .context(CatalogSnafu)?
+            .context(SchemaNotFoundSnafu {
+                schema_info: &req.schema_name,
+            })?;
+
+        let suffix = Format::try_from(&req.with)
+            .context(error::ParseFileFormatSnafu)?
+            .suffix();
+
+        let table_names = schema.table_names().await.context(CatalogSnafu)?;
+
+        let mut exported_rows = 0;
+        for table_name in table_names {
+            // TODO(hl): remove this hardcode once we've removed numbers table.
+            if table_name == "numbers" {
+                continue;
+            }
+            let mut table_file = req.location.clone();
+            table_file.push_str(&table_name);
+            table_file.push_str(suffix);
+            info!(
+                "Copy table: {}.{}.{} to {}",
+                req.catalog_name, req.schema_name, table_name, table_file
+            );
+
+            let exported = self
+                .copy_table_to(CopyTableRequest {
+                    catalog_name: req.catalog_name.clone(),
+                    schema_name: req.schema_name.clone(),
+                    table_name,
+                    location: table_file,
+                    with: req.with.clone(),
+                    connection: req.connection.clone(),
+                    pattern: None,
+                    direction: CopyDirection::Export,
+                    timestamp_range: req.time_range,
+                })
+                .await?;
+            exported_rows += exported;
+        }
+        Ok(Output::AffectedRows(exported_rows))
+    }
+}
diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs
index 8f590b5109..617195a5e9 100644
--- a/src/frontend/src/statement/copy_table_from.rs
+++ b/src/frontend/src/statement/copy_table_from.rs
@@ -24,7 +24,6 @@ use common_datasource::file_format::{FileFormat, Format};
 use common_datasource::lister::{Lister, Source};
 use common_datasource::object_store::{build_backend, parse_url};
 use common_datasource::util::find_dir_and_filename;
-use common_query::Output;
 use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
 use common_recordbatch::DfSendableRecordBatchStream;
 use datafusion::datasource::listing::PartitionedFile;
@@ -205,7 +204,7 @@ impl StatementExecutor {
         }
     }
 
-    pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
+    pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<usize> {
         let table_ref = TableReference {
             catalog: &req.catalog_name,
             schema: &req.schema_name,
@@ -313,7 +312,7 @@ impl StatementExecutor {
             }
         }
 
-        Ok(Output::AffectedRows(rows_inserted))
+        Ok(rows_inserted)
     }
 }
diff --git a/src/frontend/src/statement/copy_table_to.rs b/src/frontend/src/statement/copy_table_to.rs
index aed13b57e4..77fba4a499 100644
--- a/src/frontend/src/statement/copy_table_to.rs
+++ b/src/frontend/src/statement/copy_table_to.rs
@@ -18,7 +18,6 @@ use common_datasource::file_format::json::stream_to_json;
 use common_datasource::file_format::Format;
 use common_datasource::object_store::{build_backend, parse_url};
 use common_query::physical_plan::SessionContext;
-use common_query::Output;
 use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
 use common_recordbatch::SendableRecordBatchStream;
 use object_store::ObjectStore;
@@ -72,7 +71,7 @@ impl StatementExecutor {
         }
     }
 
-    pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
+    pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<usize> {
         let table_ref = TableReference {
             catalog: &req.catalog_name,
             schema: &req.schema_name,
@@ -82,12 +81,25 @@ impl StatementExecutor {
 
         let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
 
-        let stream = table
-            .scan(None, &[], None)
-            .await
-            .with_context(|_| error::CopyTableSnafu {
-                table_name: table_ref.to_string(),
-            })?;
+        let filters = table
+            .schema()
+            .timestamp_column()
+            .and_then(|c| {
+                common_query::logical_plan::build_filter_from_timestamp(
+                    &c.name,
+                    req.timestamp_range.as_ref(),
+                )
+            })
+            .into_iter()
+            .collect::<Vec<_>>();
+
+        let stream =
+            table
+                .scan(None, &filters, None)
+                .await
+                .with_context(|_| error::CopyTableSnafu {
+                    table_name: table_ref.to_string(),
+                })?;
 
         let stream = stream
             .execute(0, SessionContext::default().task_ctx())
@@ -101,6 +113,6 @@ impl StatementExecutor {
             .stream_to_file(stream, &format, object_store, &path)
             .await?;
 
-        Ok(Output::AffectedRows(rows_copied))
+        Ok(rows_copied)
     }
 }
diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs
index 027ecf2817..ef9ef453fc 100644
--- a/src/sql/src/lib.rs
+++ b/src/sql/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 #![feature(box_patterns)]
 #![feature(assert_matches)]
+#![feature(let_chains)]
 
 pub mod ast;
 pub mod dialect;
diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs
index 0e56f90757..1bad3a3f7f 100644
--- a/src/sql/src/parsers/copy_parser.rs
+++ b/src/sql/src/parsers/copy_parser.rs
@@ -12,22 +12,60 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
+
 use snafu::ResultExt;
 use sqlparser::ast::ObjectName;
 use sqlparser::keywords::Keyword;
+use sqlparser::tokenizer::Token::Word;
 
 use crate::error::{self, Result};
 use crate::parser::ParserContext;
-use crate::statements::copy::{CopyTable, CopyTableArgument};
+use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
 use crate::statements::statement::Statement;
 use crate::util::parse_option_string;
 
+pub type With = HashMap<String, String>;
+pub type Connection = HashMap<String, String>;
+
 // COPY tbl TO 'output.parquet';
 impl<'a> ParserContext<'a> {
     pub(crate) fn parse_copy(&mut self) -> Result<Statement> {
         self.parser.next_token();
-        let copy_table = self.parse_copy_table()?;
-        Ok(Statement::Copy(copy_table))
+        let next = self.parser.peek_token();
+        let copy = if let Word(word) = next.token && word.keyword == Keyword::DATABASE {
+            self.parser.next_token();
+            let copy_database = self.parser_copy_database()?;
+            crate::statements::copy::Copy::CopyDatabase(copy_database)
+        } else {
+            let copy_table = self.parse_copy_table()?;
+            crate::statements::copy::Copy::CopyTable(copy_table)
+        };
+
+        Ok(Statement::Copy(copy))
+    }
+
+    fn parser_copy_database(&mut self) -> Result<CopyDatabaseArgument> {
+        let database_name =
+            self.parser
+                .parse_object_name()
+                .with_context(|_| error::UnexpectedSnafu {
+                    sql: self.sql,
+                    expected: "a database name",
+                    actual: self.peek_token_as_string(),
+                })?;
+
+        self.parser
+            .expect_keyword(Keyword::TO)
+            .context(error::SyntaxSnafu { sql: self.sql })?;
+
+        let (with, connection, location) = self.parse_copy_to()?;
+        Ok(CopyDatabaseArgument {
+            database_name,
+            with,
+            connection,
+            location,
+        })
     }
 
     fn parse_copy_table(&mut self) -> Result<CopyTable> {
@@ -41,7 +79,13 @@ impl<'a> ParserContext<'a> {
             })?;
 
         if self.parser.parse_keyword(Keyword::TO) {
-            Ok(CopyTable::To(self.parse_copy_table_to(table_name)?))
+            let (with, connection, location) = self.parse_copy_to()?;
+            Ok(CopyTable::To(CopyTableArgument {
+                table_name,
+                with,
+                connection,
+                location,
+            }))
         } else {
             self.parser
                 .expect_keyword(Keyword::FROM)
@@ -91,7 +135,7 @@ impl<'a> ParserContext<'a> {
         })
     }
 
-    fn parse_copy_table_to(&mut self, table_name: ObjectName) -> Result<CopyTableArgument> {
+    fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> {
         let location =
             self.parser
                 .parse_literal_string()
@@ -125,12 +169,7 @@ impl<'a> ParserContext<'a> {
             })
             .collect();
 
-        Ok(CopyTableArgument {
-            table_name,
-            with,
-            connection,
-            location,
-        })
+        Ok((with, connection, location))
     }
 }
 
@@ -139,8 +178,11 @@ mod tests {
     use std::assert_matches::assert_matches;
     use std::collections::HashMap;
 
+    use sqlparser::ast::Ident;
+
     use super::*;
     use crate::dialect::GreptimeDbDialect;
+    use crate::statements::statement::Statement::Copy;
 
     #[test]
     fn test_parse_copy_table() {
@@ -155,7 +197,8 @@ mod tests {
         let statement = result.remove(0);
         assert_matches!(statement, Statement::Copy { .. });
         match statement {
-            Statement::Copy(CopyTable::To(copy_table)) => {
+            Copy(copy) => {
+                let crate::statements::copy::Copy::CopyTable(CopyTable::To(copy_table)) = copy else { unreachable!() };
                 let (catalog, schema, table) =
                     if let [catalog, schema, table] = &copy_table.table_name.0[..] {
                         (
@@ -198,7 +241,9 @@ mod tests {
         let statement = result.remove(0);
         assert_matches!(statement, Statement::Copy { .. });
         match statement {
-            Statement::Copy(CopyTable::From(copy_table)) => {
+            Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From(
+                copy_table,
+            ))) => {
                 let (catalog, schema, table) =
                     if let [catalog, schema, table] = &copy_table.table_name.0[..] {
                         (
@@ -254,7 +299,9 @@ mod tests {
         let statement = result.remove(0);
         assert_matches!(statement, Statement::Copy { .. });
         match statement {
-            Statement::Copy(CopyTable::From(copy_table)) => {
+            Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From(
+                copy_table,
+            ))) => {
                 if let Some(expected_pattern) = test.expected_pattern {
                     assert_eq!(copy_table.pattern().unwrap(), expected_pattern);
                 }
@@ -295,11 +342,44 @@ mod tests {
         let statement = result.remove(0);
         assert_matches!(statement, Statement::Copy { .. });
         match statement {
-            Statement::Copy(CopyTable::To(copy_table)) => {
+            Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::To(
+                copy_table,
+            ))) => {
                 assert_eq!(copy_table.connection.clone(), test.expected_connection);
             }
             _ => unreachable!(),
         }
     }
 
+    #[test]
+    fn test_copy_database_to() {
+        let sql = "COPY DATABASE catalog0.schema0 TO 'tbl_file.parquet' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')";
+        let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {})
+            .unwrap()
+            .pop()
+            .unwrap();
+
+        let Statement::Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() };
+        assert_eq!(
+            ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]),
+            stmt.database_name
+        );
+        assert_eq!(
+            [("format".to_string(), "parquet".to_string())]
+                .into_iter()
+                .collect::<HashMap<_, _>>(),
+            stmt.with
+        );
+
+        assert_eq!(
+            [
+                ("foo".to_string(), "Bar".to_string()),
+                ("one".to_string(), "two".to_string())
+            ]
+            .into_iter()
+            .collect::<HashMap<_, _>>(),
+            stmt.connection
+        );
+    }
 }
diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs
index 35dfe1bfe1..494c07c707 100644
--- a/src/sql/src/statements/copy.rs
+++ b/src/sql/src/statements/copy.rs
@@ -15,12 +15,27 @@
 use std::collections::HashMap;
 
 use sqlparser::ast::ObjectName;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Copy {
+    CopyTable(CopyTable),
+    CopyDatabase(CopyDatabaseArgument),
+}
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum CopyTable {
     To(CopyTableArgument),
     From(CopyTableArgument),
 }
 
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CopyDatabaseArgument {
+    pub database_name: ObjectName,
+    pub with: HashMap<String, String>,
+    pub connection: HashMap<String, String>,
+    pub location: String,
+}
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CopyTableArgument {
     pub table_name: ObjectName,
diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs
index 37e1363265..20a608ae5d 100644
--- a/src/sql/src/statements/statement.rs
+++ b/src/sql/src/statements/statement.rs
@@ -17,7 +17,6 @@ use sqlparser::ast::Statement as SpStatement;
 
 use crate::error::{ConvertToDfStatementSnafu, Error};
 use crate::statements::alter::AlterTable;
-use crate::statements::copy::CopyTable;
 use crate::statements::create::{CreateDatabase, CreateExternalTable, CreateTable};
 use crate::statements::delete::Delete;
 use crate::statements::describe::DescribeTable;
@@ -60,7 +59,7 @@ pub enum Statement {
     Explain(Explain),
     Use(String),
     // COPY
-    Copy(CopyTable),
+    Copy(crate::statements::copy::Copy),
     Tql(Tql),
 }
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index c71cb7382b..57fbcb3a1c 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -20,6 +20,7 @@ use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
 use common_query::AddColumnLocation;
+use common_time::range::TimestampRange;
 use datatypes::prelude::VectorRef;
 use datatypes::schema::{ColumnSchema, RawSchema};
 use serde::{Deserialize, Serialize};
@@ -268,6 +269,7 @@ pub struct CopyTableRequest {
     pub connection: HashMap<String, String>,
     pub pattern: Option<String>,
     pub direction: CopyDirection,
+    pub timestamp_range: Option<TimestampRange>,
 }
 
 #[derive(Debug, Clone, Default)]
@@ -293,6 +295,16 @@ macro_rules! meter_insert_request {
     };
 }
 
+#[derive(Debug, Clone, Default, Deserialize, Serialize)]
+pub struct CopyDatabaseRequest {
+    pub catalog_name: String,
+    pub schema_name: String,
+    pub location: String,
+    pub with: HashMap<String, String>,
+    pub connection: HashMap<String, String>,
+    pub time_range: Option<TimestampRange>,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
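
Reviewer note, not part of the patch: a minimal sketch of how the new export filter behaves, assuming the `common-query` and `common-time` crates from this tree as dependencies and a hypothetical timestamp column named `ts`. `TimestampRange` is half-open, so a bounded range becomes a `ts >= start AND ts < end` scan filter, while the absence of a range (the `COPY DATABASE` default when neither `start_time` nor `end_time` is given in `WITH`) produces no filter, i.e. a full-table export.

```rust
use common_query::logical_plan::build_filter_from_timestamp;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;

fn main() {
    // Half-open range [1000, 2000) in milliseconds, mirroring what
    // `to_copy_database_request` builds from the WITH options.
    let range = TimestampRange::with_unit(1000, 2000, TimeUnit::Millisecond).unwrap();

    // A bounded range yields a filter equivalent to `ts >= 1000 AND ts < 2000`,
    // which `copy_table_to` passes to `Table::scan`.
    assert!(build_filter_from_timestamp("ts", Some(&range)).is_some());

    // No range yields no filter: the whole table is scanned and exported.
    assert!(build_filter_from_timestamp("ts", None).is_none());
}
```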