mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: incremental database backup (#1240)
* feat: incremental database backup
* chore: rebase develop
* chore: move backup to StatementExecutor
* feat: copy database parser
* chore: remove some todos
* chore: use timestamp string instead of i64 string
* fix: typo
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3222,6 +3222,7 @@ dependencies = [
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"datafusion",
|
||||
"datafusion-common",
|
||||
"datafusion-expr",
|
||||
|
||||
@@ -58,6 +58,16 @@ pub enum Format {
|
||||
Parquet(ParquetFormat),
|
||||
}
|
||||
|
||||
impl Format {
|
||||
pub fn suffix(&self) -> &'static str {
|
||||
match self {
|
||||
Format::Csv(_) => ".csv",
|
||||
Format::Json(_) => ".json",
|
||||
Format::Parquet(_) => ".parquet",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&HashMap<String, String>> for Format {
|
||||
type Error = error::Error;
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ mod udf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
pub use expr::build_filter_from_timestamp;
|
||||
|
||||
pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
|
||||
pub use self::expr::{DfExpr, Expr};
|
||||
@@ -28,7 +29,6 @@ pub use self::udf::ScalarUdf;
|
||||
use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation};
|
||||
use crate::logical_plan::accumulator::*;
|
||||
use crate::signature::{Signature, Volatility};
|
||||
|
||||
/// Creates a new UDF with a specific signature and specific return type.
|
||||
/// This is a helper function to create a new UDF.
|
||||
/// The function `create_udf` returns a subset of all possible `ScalarFunction`:
|
||||
|
||||
@@ -12,7 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_time::range::TimestampRange;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
pub use datafusion_expr::expr::Expr as DfExpr;
|
||||
use datafusion_expr::{and, binary_expr, Operator};
|
||||
|
||||
/// Central struct of query API.
|
||||
/// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
|
||||
@@ -33,6 +38,54 @@ impl From<DfExpr> for Expr {
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds an `Expr` that filters timestamp column from given timestamp range.
|
||||
/// Returns [None] if time range is [None] or full time range.
|
||||
pub fn build_filter_from_timestamp(
|
||||
ts_col_name: &str,
|
||||
time_range: Option<&TimestampRange>,
|
||||
) -> Option<Expr> {
|
||||
let Some(time_range) = time_range else { return None; };
|
||||
let ts_col_expr = DfExpr::Column(Column {
|
||||
relation: None,
|
||||
name: ts_col_name.to_string(),
|
||||
});
|
||||
|
||||
let df_expr = match (time_range.start(), time_range.end()) {
|
||||
(None, None) => None,
|
||||
(Some(start), None) => Some(binary_expr(
|
||||
ts_col_expr,
|
||||
Operator::GtEq,
|
||||
timestamp_to_literal(start),
|
||||
)),
|
||||
(None, Some(end)) => Some(binary_expr(
|
||||
ts_col_expr,
|
||||
Operator::Lt,
|
||||
timestamp_to_literal(end),
|
||||
)),
|
||||
(Some(start), Some(end)) => Some(and(
|
||||
binary_expr(
|
||||
ts_col_expr.clone(),
|
||||
Operator::GtEq,
|
||||
timestamp_to_literal(start),
|
||||
),
|
||||
binary_expr(ts_col_expr, Operator::Lt, timestamp_to_literal(end)),
|
||||
)),
|
||||
};
|
||||
|
||||
df_expr.map(Expr::from)
|
||||
}
|
||||
|
||||
/// Converts a [Timestamp] to datafusion literal value.
|
||||
fn timestamp_to_literal(timestamp: &Timestamp) -> DfExpr {
|
||||
let scalar_value = match timestamp.unit() {
|
||||
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
|
||||
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(timestamp.value()), None),
|
||||
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None),
|
||||
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
|
||||
};
|
||||
DfExpr::Literal(scalar_value)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::timestamp::TimeUnit;
|
||||
use crate::timestamp_millis::TimestampMillis;
|
||||
use crate::Timestamp;
|
||||
@@ -23,7 +25,7 @@ use crate::Timestamp;
|
||||
/// The range contains values that `value >= start` and `val < end`.
|
||||
///
|
||||
/// The range is empty iff `start == end == "the default value of T"`
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub struct GenericRange<T> {
|
||||
start: Option<T>,
|
||||
end: Option<T>,
|
||||
@@ -522,4 +524,25 @@ mod tests {
|
||||
);
|
||||
assert!(range.is_empty());
|
||||
}
|
||||
|
||||
#[test]
fn test_serialize_timestamp_range() {
    // Round-trip the same small range through JSON once per time unit.
    let units = [
        TimeUnit::Second,
        TimeUnit::Millisecond,
        TimeUnit::Microsecond,
        TimeUnit::Nanosecond,
    ];

    for unit in units {
        let original_range = TimestampRange::with_unit(0, 10, unit).unwrap();
        let json = serde_json::to_string(&original_range).unwrap();
        let restored: TimestampRange = serde_json::from_str(&json).unwrap();
        assert_eq!(original_range, restored);
    }
}
|
||||
}
|
||||
|
||||
@@ -228,6 +228,22 @@ pub fn table_idents_to_full_name(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn idents_to_full_database_name(
|
||||
obj_name: &ObjectName,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<(String, String)> {
|
||||
match &obj_name.0[..] {
|
||||
[database] => Ok((query_ctx.current_catalog(), database.value.clone())),
|
||||
[catalog, database] => Ok((catalog.value.clone(), database.value.clone())),
|
||||
_ => error::InvalidSqlSnafu {
|
||||
msg: format!(
|
||||
"expect database name to be <catalog>.<database>, <database>, found: {obj_name}",
|
||||
),
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SqlStatementExecutor for Instance {
|
||||
async fn execute_sql(
|
||||
|
||||
@@ -29,6 +29,7 @@ common-meta = { path = "../common/meta" }
|
||||
common-recordbatch = { path = "../common/recordbatch" }
|
||||
common-runtime = { path = "../common/runtime" }
|
||||
common-telemetry = { path = "../common/telemetry" }
|
||||
common-time = { path = "../common/time" }
|
||||
datafusion.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
|
||||
@@ -548,6 +548,13 @@ pub enum Error {
|
||||
#[snafu(backtrace)]
|
||||
source: query::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid COPY parameter, key: {}, value: {}", key, value))]
|
||||
InvalidCopyParameter {
|
||||
key: String,
|
||||
value: String,
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -667,6 +674,7 @@ impl ErrorExt for Error {
|
||||
| Error::BuildBackend { source } => source.status_code(),
|
||||
|
||||
Error::WriteParquet { source, .. } => source.status_code(),
|
||||
Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -627,12 +627,15 @@ pub fn check_permission(
|
||||
Statement::DescribeTable(stmt) => {
|
||||
validate_param(stmt.name(), query_ctx)?;
|
||||
}
|
||||
Statement::Copy(stmd) => match stmd {
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => match stmt {
|
||||
CopyTable::To(copy_table_to) => validate_param(©_table_to.table_name, query_ctx)?,
|
||||
CopyTable::From(copy_table_from) => {
|
||||
validate_param(©_table_from.table_name, query_ctx)?
|
||||
}
|
||||
},
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => {
|
||||
validate_param(&stmt.database_name, query_ctx)?
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -12,32 +12,40 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod backup;
|
||||
mod copy_table_from;
|
||||
mod copy_table_to;
|
||||
mod describe;
|
||||
mod show;
|
||||
mod tql;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_error::prelude::BoxedError;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datanode::instance::sql::table_idents_to_full_name;
|
||||
use common_time::range::TimestampRange;
|
||||
use common_time::Timestamp;
|
||||
use datanode::instance::sql::{idents_to_full_database_name, table_idents_to_full_name};
|
||||
use query::parser::QueryStatement;
|
||||
use query::query_engine::SqlStatementExecutorRef;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::copy::{CopyTable, CopyTableArgument};
|
||||
use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
|
||||
use sql::statements::statement::Statement;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::{CopyDirection, CopyTableRequest};
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{
|
||||
CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, PlanStatementSnafu,
|
||||
Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StatementExecutor {
|
||||
@@ -92,14 +100,23 @@ impl StatementExecutor {
|
||||
|
||||
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
|
||||
|
||||
Statement::Copy(stmt) => {
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
|
||||
let req = to_copy_table_request(stmt, query_ctx)?;
|
||||
match req.direction {
|
||||
CopyDirection::Export => self.copy_table_to(req).await,
|
||||
CopyDirection::Import => self.copy_table_from(req).await,
|
||||
CopyDirection::Export => {
|
||||
self.copy_table_to(req).await.map(Output::AffectedRows)
|
||||
}
|
||||
CopyDirection::Import => {
|
||||
self.copy_table_from(req).await.map(Output::AffectedRows)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => {
|
||||
self.copy_database(to_copy_database_request(arg, &query_ctx)?)
|
||||
.await
|
||||
}
|
||||
|
||||
Statement::CreateDatabase(_)
|
||||
| Statement::CreateTable(_)
|
||||
| Statement::CreateExternalTable(_)
|
||||
@@ -191,5 +208,47 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
|
||||
connection,
|
||||
pattern,
|
||||
direction,
|
||||
// we copy the whole table by default.
|
||||
timestamp_range: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Converts [CopyDatabaseArgument] to [CopyDatabaseRequest].
|
||||
/// This function extracts the necessary info including catalog/database name, time range, etc.
|
||||
fn to_copy_database_request(
|
||||
arg: CopyDatabaseArgument,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<CopyDatabaseRequest> {
|
||||
let (catalog_name, database_name) = idents_to_full_database_name(&arg.database_name, query_ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
let start_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_START_KEY)?;
|
||||
let end_timestamp = extract_timestamp(&arg.with, COPY_DATABASE_TIME_END_KEY)?;
|
||||
|
||||
let time_range = match (start_timestamp, end_timestamp) {
|
||||
(Some(start), Some(end)) => TimestampRange::new(start, end),
|
||||
(Some(start), None) => Some(TimestampRange::from_start(start)),
|
||||
(None, Some(end)) => Some(TimestampRange::until_end(end, false)), // exclusive end
|
||||
(None, None) => None,
|
||||
};
|
||||
|
||||
Ok(CopyDatabaseRequest {
|
||||
catalog_name,
|
||||
schema_name: database_name,
|
||||
location: arg.location,
|
||||
with: arg.with,
|
||||
connection: arg.connection,
|
||||
time_range,
|
||||
})
|
||||
}
|
||||
|
||||
/// Extracts timestamp from a [HashMap<String, String>] with given key.
|
||||
fn extract_timestamp(map: &HashMap<String, String>, key: &str) -> Result<Option<Timestamp>> {
|
||||
map.get(key)
|
||||
.map(|v| {
|
||||
Timestamp::from_str(v)
|
||||
.map_err(|_| error::InvalidCopyParameterSnafu { key, value: v }.build())
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
97
src/frontend/src/statement/backup.rs
Normal file
97
src/frontend/src/statement/backup.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_datasource::file_format::Format;
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{
|
||||
CatalogNotFoundSnafu, CatalogSnafu, InvalidCopyParameterSnafu, SchemaNotFoundSnafu,
|
||||
};
|
||||
use crate::statement::StatementExecutor;
|
||||
|
||||
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
|
||||
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
|
||||
|
||||
impl StatementExecutor {
|
||||
pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
|
||||
// location must end with / so that every table is exported to a file.
|
||||
ensure!(
|
||||
req.location.ends_with('/'),
|
||||
InvalidCopyParameterSnafu {
|
||||
key: "location",
|
||||
value: req.location,
|
||||
}
|
||||
);
|
||||
|
||||
info!(
|
||||
"Copy database {}.{}, dir: {},. time: {:?}",
|
||||
req.catalog_name, req.schema_name, req.location, req.time_range
|
||||
);
|
||||
let schema = self
|
||||
.catalog_manager
|
||||
.catalog(&req.catalog_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.context(CatalogNotFoundSnafu {
|
||||
catalog_name: &req.catalog_name,
|
||||
})?
|
||||
.schema(&req.schema_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?
|
||||
.context(SchemaNotFoundSnafu {
|
||||
schema_info: &req.schema_name,
|
||||
})?;
|
||||
|
||||
let suffix = Format::try_from(&req.with)
|
||||
.context(error::ParseFileFormatSnafu)?
|
||||
.suffix();
|
||||
|
||||
let table_names = schema.table_names().await.context(CatalogSnafu)?;
|
||||
|
||||
let mut exported_rows = 0;
|
||||
for table_name in table_names {
|
||||
// TODO(hl): remove this hardcode once we've removed numbers table.
|
||||
if table_name == "numbers" {
|
||||
continue;
|
||||
}
|
||||
let mut table_file = req.location.clone();
|
||||
table_file.push_str(&table_name);
|
||||
table_file.push_str(suffix);
|
||||
info!(
|
||||
"Copy table: {}.{}.{} to {}",
|
||||
req.catalog_name, req.schema_name, table_name, table_file
|
||||
);
|
||||
|
||||
let exported = self
|
||||
.copy_table_to(CopyTableRequest {
|
||||
catalog_name: req.catalog_name.clone(),
|
||||
schema_name: req.schema_name.clone(),
|
||||
table_name,
|
||||
location: table_file,
|
||||
with: req.with.clone(),
|
||||
connection: req.connection.clone(),
|
||||
pattern: None,
|
||||
direction: CopyDirection::Export,
|
||||
timestamp_range: req.time_range,
|
||||
})
|
||||
.await?;
|
||||
exported_rows += exported;
|
||||
}
|
||||
Ok(Output::AffectedRows(exported_rows))
|
||||
}
|
||||
}
|
||||
@@ -24,7 +24,6 @@ use common_datasource::file_format::{FileFormat, Format};
|
||||
use common_datasource::lister::{Lister, Source};
|
||||
use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
|
||||
use common_recordbatch::DfSendableRecordBatchStream;
|
||||
use datafusion::datasource::listing::PartitionedFile;
|
||||
@@ -205,7 +204,7 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
|
||||
pub async fn copy_table_from(&self, req: CopyTableRequest) -> Result<usize> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
@@ -313,7 +312,7 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Output::AffectedRows(rows_inserted))
|
||||
Ok(rows_inserted)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ use common_datasource::file_format::json::stream_to_json;
|
||||
use common_datasource::file_format::Format;
|
||||
use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_query::physical_plan::SessionContext;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use object_store::ObjectStore;
|
||||
@@ -72,7 +71,7 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
|
||||
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<usize> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
@@ -82,12 +81,25 @@ impl StatementExecutor {
|
||||
|
||||
let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
|
||||
|
||||
let stream = table
|
||||
.scan(None, &[], None)
|
||||
.await
|
||||
.with_context(|_| error::CopyTableSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
let filters = table
|
||||
.schema()
|
||||
.timestamp_column()
|
||||
.and_then(|c| {
|
||||
common_query::logical_plan::build_filter_from_timestamp(
|
||||
&c.name,
|
||||
req.timestamp_range.as_ref(),
|
||||
)
|
||||
})
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let stream =
|
||||
table
|
||||
.scan(None, &filters, None)
|
||||
.await
|
||||
.with_context(|_| error::CopyTableSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
|
||||
let stream = stream
|
||||
.execute(0, SessionContext::default().task_ctx())
|
||||
@@ -101,6 +113,6 @@ impl StatementExecutor {
|
||||
.stream_to_file(stream, &format, object_store, &path)
|
||||
.await?;
|
||||
|
||||
Ok(Output::AffectedRows(rows_copied))
|
||||
Ok(rows_copied)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
#![feature(box_patterns)]
|
||||
#![feature(assert_matches)]
|
||||
#![feature(let_chains)]
|
||||
|
||||
pub mod ast;
|
||||
pub mod dialect;
|
||||
|
||||
@@ -12,22 +12,60 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use snafu::ResultExt;
|
||||
use sqlparser::ast::ObjectName;
|
||||
use sqlparser::keywords::Keyword;
|
||||
use sqlparser::tokenizer::Token::Word;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::parser::ParserContext;
|
||||
use crate::statements::copy::{CopyTable, CopyTableArgument};
|
||||
use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::util::parse_option_string;
|
||||
|
||||
pub type With = HashMap<String, String>;
|
||||
pub type Connection = HashMap<String, String>;
|
||||
|
||||
// COPY tbl TO 'output.parquet';
|
||||
impl<'a> ParserContext<'a> {
|
||||
pub(crate) fn parse_copy(&mut self) -> Result<Statement> {
|
||||
self.parser.next_token();
|
||||
let copy_table = self.parse_copy_table()?;
|
||||
Ok(Statement::Copy(copy_table))
|
||||
let next = self.parser.peek_token();
|
||||
let copy = if let Word(word) = next.token && word.keyword == Keyword::DATABASE {
|
||||
self.parser.next_token();
|
||||
let copy_database = self.parser_copy_database()?;
|
||||
crate::statements::copy::Copy::CopyDatabase(copy_database)
|
||||
} else {
|
||||
let copy_table = self.parse_copy_table()?;
|
||||
crate::statements::copy::Copy::CopyTable(copy_table)
|
||||
};
|
||||
|
||||
Ok(Statement::Copy(copy))
|
||||
}
|
||||
|
||||
fn parser_copy_database(&mut self) -> Result<CopyDatabaseArgument> {
|
||||
let database_name =
|
||||
self.parser
|
||||
.parse_object_name()
|
||||
.with_context(|_| error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "a database name",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
self.parser
|
||||
.expect_keyword(Keyword::TO)
|
||||
.context(error::SyntaxSnafu { sql: self.sql })?;
|
||||
|
||||
let (with, connection, location) = self.parse_copy_to()?;
|
||||
Ok(CopyDatabaseArgument {
|
||||
database_name,
|
||||
with,
|
||||
connection,
|
||||
location,
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_copy_table(&mut self) -> Result<CopyTable> {
|
||||
@@ -41,7 +79,13 @@ impl<'a> ParserContext<'a> {
|
||||
})?;
|
||||
|
||||
if self.parser.parse_keyword(Keyword::TO) {
|
||||
Ok(CopyTable::To(self.parse_copy_table_to(table_name)?))
|
||||
let (with, connection, location) = self.parse_copy_to()?;
|
||||
Ok(CopyTable::To(CopyTableArgument {
|
||||
table_name,
|
||||
with,
|
||||
connection,
|
||||
location,
|
||||
}))
|
||||
} else {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::FROM)
|
||||
@@ -91,7 +135,7 @@ impl<'a> ParserContext<'a> {
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_copy_table_to(&mut self, table_name: ObjectName) -> Result<CopyTableArgument> {
|
||||
fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> {
|
||||
let location =
|
||||
self.parser
|
||||
.parse_literal_string()
|
||||
@@ -125,12 +169,7 @@ impl<'a> ParserContext<'a> {
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(CopyTableArgument {
|
||||
table_name,
|
||||
with,
|
||||
connection,
|
||||
location,
|
||||
})
|
||||
Ok((with, connection, location))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,8 +178,11 @@ mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use sqlparser::ast::Ident;
|
||||
|
||||
use super::*;
|
||||
use crate::dialect::GreptimeDbDialect;
|
||||
use crate::statements::statement::Statement::Copy;
|
||||
|
||||
#[test]
|
||||
fn test_parse_copy_table() {
|
||||
@@ -155,7 +197,8 @@ mod tests {
|
||||
let statement = result.remove(0);
|
||||
assert_matches!(statement, Statement::Copy { .. });
|
||||
match statement {
|
||||
Statement::Copy(CopyTable::To(copy_table)) => {
|
||||
Copy(copy) => {
|
||||
let crate::statements::copy::Copy::CopyTable(CopyTable::To(copy_table)) = copy else { unreachable!() };
|
||||
let (catalog, schema, table) =
|
||||
if let [catalog, schema, table] = ©_table.table_name.0[..] {
|
||||
(
|
||||
@@ -198,7 +241,9 @@ mod tests {
|
||||
let statement = result.remove(0);
|
||||
assert_matches!(statement, Statement::Copy { .. });
|
||||
match statement {
|
||||
Statement::Copy(CopyTable::From(copy_table)) => {
|
||||
Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From(
|
||||
copy_table,
|
||||
))) => {
|
||||
let (catalog, schema, table) =
|
||||
if let [catalog, schema, table] = ©_table.table_name.0[..] {
|
||||
(
|
||||
@@ -254,7 +299,9 @@ mod tests {
|
||||
let statement = result.remove(0);
|
||||
assert_matches!(statement, Statement::Copy { .. });
|
||||
match statement {
|
||||
Statement::Copy(CopyTable::From(copy_table)) => {
|
||||
Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::From(
|
||||
copy_table,
|
||||
))) => {
|
||||
if let Some(expected_pattern) = test.expected_pattern {
|
||||
assert_eq!(copy_table.pattern().unwrap(), expected_pattern);
|
||||
}
|
||||
@@ -295,11 +342,44 @@ mod tests {
|
||||
let statement = result.remove(0);
|
||||
assert_matches!(statement, Statement::Copy { .. });
|
||||
match statement {
|
||||
Statement::Copy(CopyTable::To(copy_table)) => {
|
||||
Statement::Copy(crate::statements::copy::Copy::CopyTable(CopyTable::To(
|
||||
copy_table,
|
||||
))) => {
|
||||
assert_eq!(copy_table.connection.clone(), test.expected_connection);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a full `COPY DATABASE ... TO ...` statement and checks every field of the
/// resulting [CopyDatabaseArgument]: database name, location, `WITH` options and
/// `CONNECTION` options.
#[test]
fn test_copy_database_to() {
    let sql = "COPY DATABASE catalog0.schema0 TO 'tbl_file.parquet' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')";
    let stmt = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {})
        .unwrap()
        .pop()
        .unwrap();

    let Statement::Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else { unreachable!() };
    assert_eq!(
        ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]),
        stmt.database_name
    );
    // The target location was previously not verified at all.
    assert_eq!("tbl_file.parquet", stmt.location);
    assert_eq!(
        [("format".to_string(), "parquet".to_string())]
            .into_iter()
            .collect::<HashMap<_, _>>(),
        stmt.with
    );

    assert_eq!(
        [
            ("foo".to_string(), "Bar".to_string()),
            ("one".to_string(), "two".to_string())
        ]
        .into_iter()
        .collect::<HashMap<_, _>>(),
        stmt.connection
    );
}
|
||||
}
|
||||
|
||||
@@ -15,12 +15,27 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use sqlparser::ast::ObjectName;
|
||||
|
||||
/// A parsed `COPY` statement: either a single-table copy or a whole-database copy.
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum Copy {
|
||||
/// `COPY <table> TO ...` or `COPY <table> FROM ...`.
CopyTable(CopyTable),
|
||||
/// `COPY DATABASE <database> TO ...`.
CopyDatabase(CopyDatabaseArgument),
|
||||
}
|
||||
|
||||
/// A table-level `COPY` statement, distinguished by the keyword that follows the table name.
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum CopyTable {
|
||||
/// `COPY <table> TO '<location>' ...`.
To(CopyTableArgument),
|
||||
/// `COPY <table> FROM '<location>' ...`.
From(CopyTableArgument),
|
||||
}
|
||||
|
||||
/// Arguments of a parsed `COPY DATABASE <database> TO '<location>' ...` statement.
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct CopyDatabaseArgument {
|
||||
/// Database name as written in the statement: `<database>` or `<catalog>.<database>`.
pub database_name: ObjectName,
|
||||
/// Options parsed from the `WITH (...)` clause.
pub with: HashMap<String, String>,
|
||||
/// Options parsed from the `CONNECTION (...)` clause.
pub connection: HashMap<String, String>,
|
||||
/// The string literal that follows `TO`.
pub location: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct CopyTableArgument {
|
||||
pub table_name: ObjectName,
|
||||
|
||||
@@ -17,7 +17,6 @@ use sqlparser::ast::Statement as SpStatement;
|
||||
|
||||
use crate::error::{ConvertToDfStatementSnafu, Error};
|
||||
use crate::statements::alter::AlterTable;
|
||||
use crate::statements::copy::CopyTable;
|
||||
use crate::statements::create::{CreateDatabase, CreateExternalTable, CreateTable};
|
||||
use crate::statements::delete::Delete;
|
||||
use crate::statements::describe::DescribeTable;
|
||||
@@ -60,7 +59,7 @@ pub enum Statement {
|
||||
Explain(Explain),
|
||||
Use(String),
|
||||
// COPY
|
||||
Copy(CopyTable),
|
||||
Copy(crate::statements::copy::Copy),
|
||||
Tql(Tql),
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_query::AddColumnLocation;
|
||||
use common_time::range::TimestampRange;
|
||||
use datatypes::prelude::VectorRef;
|
||||
use datatypes::schema::{ColumnSchema, RawSchema};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -268,6 +269,7 @@ pub struct CopyTableRequest {
|
||||
pub connection: HashMap<String, String>,
|
||||
pub pattern: Option<String>,
|
||||
pub direction: CopyDirection,
|
||||
pub timestamp_range: Option<TimestampRange>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
@@ -293,6 +295,16 @@ macro_rules! meter_insert_request {
|
||||
};
|
||||
}
|
||||
|
||||
/// Request to export all tables of one database.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
|
||||
pub struct CopyDatabaseRequest {
|
||||
/// Catalog that contains the database.
pub catalog_name: String,
|
||||
/// Name of the database (schema) to export.
pub schema_name: String,
|
||||
/// Target location; per-table file names are appended directly to this string.
pub location: String,
|
||||
/// `WITH` options of the statement; also forwarded to each per-table copy request.
pub with: HashMap<String, String>,
|
||||
/// `CONNECTION` options of the statement; forwarded to each per-table copy request.
pub connection: HashMap<String, String>,
|
||||
/// Optional time range applied to every table export; `None` means no restriction.
pub time_range: Option<TimestampRange>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
Reference in New Issue
Block a user