mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
feat: copy database from (#3164)
* wip: impl COPY DATABASE FROM parser * wip: impl copy database from * wip: add some ut * wip: add continue_on_error option * test: add sqlness cases for copy database * fix: trailing newline * fix: typo * fix: some cr comments * chore: resolve confilicts * fix: some cr comments
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5765,6 +5765,7 @@ dependencies = [
|
||||
"common-recordbatch",
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-test-util",
|
||||
"common-time",
|
||||
"datafusion",
|
||||
"datafusion-common",
|
||||
|
||||
@@ -71,7 +71,7 @@ use session::context::QueryContextRef;
|
||||
use snafu::prelude::*;
|
||||
use sql::dialect::Dialect;
|
||||
use sql::parser::{ParseOptions, ParserContext};
|
||||
use sql::statements::copy::CopyTable;
|
||||
use sql::statements::copy::{CopyDatabase, CopyTable};
|
||||
use sql::statements::statement::Statement;
|
||||
use sqlparser::ast::ObjectName;
|
||||
pub use standalone::StandaloneDatanodeManager;
|
||||
@@ -487,8 +487,11 @@ pub fn check_permission(
|
||||
validate_param(©_table_from.table_name, query_ctx)?
|
||||
}
|
||||
},
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(stmt)) => {
|
||||
validate_param(&stmt.database_name, query_ctx)?
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
|
||||
match copy_database {
|
||||
CopyDatabase::To(stmt) => validate_param(&stmt.database_name, query_ctx)?,
|
||||
CopyDatabase::From(stmt) => validate_param(&stmt.database_name, query_ctx)?,
|
||||
}
|
||||
}
|
||||
Statement::TruncateTable(stmt) => {
|
||||
validate_param(stmt.table_name(), query_ctx)?;
|
||||
|
||||
@@ -54,3 +54,6 @@ substrait.workspace = true
|
||||
table.workspace = true
|
||||
tokio.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
common-test-util.workspace = true
|
||||
|
||||
@@ -418,6 +418,9 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid COPY DATABASE location, must end with '/': {}", value))]
|
||||
InvalidCopyDatabasePath { value: String, location: Location },
|
||||
|
||||
#[snafu(display("Table metadata manager error"))]
|
||||
TableMetadataManager {
|
||||
source: common_meta::error::Error,
|
||||
@@ -596,7 +599,9 @@ impl ErrorExt for Error {
|
||||
| Error::BuildBackend { source, .. } => source.status_code(),
|
||||
|
||||
Error::ExecuteDdl { source, .. } => source.status_code(),
|
||||
Error::InvalidCopyParameter { .. } => StatusCode::InvalidArguments,
|
||||
Error::InvalidCopyParameter { .. } | Error::InvalidCopyDatabasePath { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
|
||||
Error::ReadRecordBatch { source, .. } | Error::BuildColumnVectors { source, .. } => {
|
||||
source.status_code()
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod backup;
|
||||
mod copy_database;
|
||||
mod copy_table_from;
|
||||
mod copy_table_to;
|
||||
mod ddl;
|
||||
@@ -41,7 +41,7 @@ use query::plan::LogicalPlan;
|
||||
use query::QueryEngineRef;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use sql::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
|
||||
use sql::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument};
|
||||
use sql::statements::statement::Statement;
|
||||
use sql::statements::OptionMap;
|
||||
use sql::util::format_raw_object_name;
|
||||
@@ -55,7 +55,7 @@ use crate::error::{
|
||||
PlanStatementSnafu, Result, TableNotFoundSnafu,
|
||||
};
|
||||
use crate::insert::InserterRef;
|
||||
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
|
||||
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
|
||||
use crate::table::table_idents_to_full_name;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -131,9 +131,23 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(arg)) => {
|
||||
self.copy_database(to_copy_database_request(arg, &query_ctx)?)
|
||||
.await
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyDatabase(copy_database)) => {
|
||||
match copy_database {
|
||||
CopyDatabase::To(arg) => {
|
||||
self.copy_database_to(
|
||||
to_copy_database_request(arg, &query_ctx)?,
|
||||
query_ctx.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
CopyDatabase::From(arg) => {
|
||||
self.copy_database_from(
|
||||
to_copy_database_request(arg, &query_ctx)?,
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Statement::CreateTable(stmt) => {
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_datasource::file_format::Format;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{info, tracing};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{CatalogSnafu, InvalidCopyParameterSnafu};
|
||||
use crate::statement::StatementExecutor;
|
||||
|
||||
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
|
||||
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
|
||||
|
||||
impl StatementExecutor {
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn copy_database(&self, req: CopyDatabaseRequest) -> error::Result<Output> {
|
||||
// location must end with / so that every table is exported to a file.
|
||||
ensure!(
|
||||
req.location.ends_with('/'),
|
||||
InvalidCopyParameterSnafu {
|
||||
key: "location",
|
||||
value: req.location,
|
||||
}
|
||||
);
|
||||
|
||||
info!(
|
||||
"Copy database {}.{}, dir: {},. time: {:?}",
|
||||
req.catalog_name, req.schema_name, req.location, req.time_range
|
||||
);
|
||||
let table_names = self
|
||||
.catalog_manager
|
||||
.table_names(&req.catalog_name, &req.schema_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
|
||||
let suffix = Format::try_from(&req.with)
|
||||
.context(error::ParseFileFormatSnafu)?
|
||||
.suffix();
|
||||
|
||||
let mut exported_rows = 0;
|
||||
for table_name in table_names {
|
||||
// TODO(hl): remove this hardcode once we've removed numbers table.
|
||||
if table_name == "numbers" {
|
||||
continue;
|
||||
}
|
||||
let mut table_file = req.location.clone();
|
||||
table_file.push_str(&table_name);
|
||||
table_file.push_str(suffix);
|
||||
info!(
|
||||
"Copy table: {}.{}.{} to {}",
|
||||
req.catalog_name, req.schema_name, table_name, table_file
|
||||
);
|
||||
|
||||
let exported = self
|
||||
.copy_table_to(
|
||||
CopyTableRequest {
|
||||
catalog_name: req.catalog_name.clone(),
|
||||
schema_name: req.schema_name.clone(),
|
||||
table_name,
|
||||
location: table_file,
|
||||
with: req.with.clone(),
|
||||
connection: req.connection.clone(),
|
||||
pattern: None,
|
||||
direction: CopyDirection::Export,
|
||||
timestamp_range: req.time_range,
|
||||
},
|
||||
QueryContextBuilder::default().build(),
|
||||
)
|
||||
.await?;
|
||||
exported_rows += exported;
|
||||
}
|
||||
Ok(Output::AffectedRows(exported_rows))
|
||||
}
|
||||
}
|
||||
250
src/operator/src/statement/copy_database.rs
Normal file
250
src/operator/src/statement/copy_database.rs
Normal file
@@ -0,0 +1,250 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
|
||||
use common_datasource::file_format::Format;
|
||||
use common_datasource::lister::{Lister, Source};
|
||||
use common_datasource::object_store::build_backend;
|
||||
use common_query::Output;
|
||||
use common_telemetry::{debug, error, info, tracing};
|
||||
use object_store::Entry;
|
||||
use regex::Regex;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::{CatalogSnafu, InvalidCopyDatabasePathSnafu};
|
||||
use crate::statement::StatementExecutor;
|
||||
|
||||
pub(crate) const COPY_DATABASE_TIME_START_KEY: &str = "start_time";
|
||||
pub(crate) const COPY_DATABASE_TIME_END_KEY: &str = "end_time";
|
||||
pub(crate) const CONTINUE_ON_ERROR_KEY: &str = "continue_on_error";
|
||||
|
||||
impl StatementExecutor {
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn copy_database_to(
|
||||
&self,
|
||||
req: CopyDatabaseRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> error::Result<Output> {
|
||||
// location must end with / so that every table is exported to a file.
|
||||
ensure!(
|
||||
req.location.ends_with('/'),
|
||||
InvalidCopyDatabasePathSnafu {
|
||||
value: req.location,
|
||||
}
|
||||
);
|
||||
|
||||
info!(
|
||||
"Copy database {}.{} to dir: {}, time: {:?}",
|
||||
req.catalog_name, req.schema_name, req.location, req.time_range
|
||||
);
|
||||
let table_names = self
|
||||
.catalog_manager
|
||||
.table_names(&req.catalog_name, &req.schema_name)
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
|
||||
let suffix = Format::try_from(&req.with)
|
||||
.context(error::ParseFileFormatSnafu)?
|
||||
.suffix();
|
||||
|
||||
let mut exported_rows = 0;
|
||||
for table_name in table_names {
|
||||
// TODO(hl): also handles tables with metric engine.
|
||||
// TODO(hl): remove this hardcode once we've removed numbers table.
|
||||
if table_name == "numbers" {
|
||||
continue;
|
||||
}
|
||||
let mut table_file = req.location.clone();
|
||||
table_file.push_str(&table_name);
|
||||
table_file.push_str(suffix);
|
||||
info!(
|
||||
"Copy table: {}.{}.{} to {}",
|
||||
req.catalog_name, req.schema_name, table_name, table_file
|
||||
);
|
||||
|
||||
let exported = self
|
||||
.copy_table_to(
|
||||
CopyTableRequest {
|
||||
catalog_name: req.catalog_name.clone(),
|
||||
schema_name: req.schema_name.clone(),
|
||||
table_name,
|
||||
location: table_file,
|
||||
with: req.with.clone(),
|
||||
connection: req.connection.clone(),
|
||||
pattern: None,
|
||||
direction: CopyDirection::Export,
|
||||
timestamp_range: req.time_range,
|
||||
},
|
||||
ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
exported_rows += exported;
|
||||
}
|
||||
Ok(Output::AffectedRows(exported_rows))
|
||||
}
|
||||
|
||||
/// Imports data to database from a given location and returns total rows imported.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn copy_database_from(
|
||||
&self,
|
||||
req: CopyDatabaseRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> error::Result<Output> {
|
||||
// location must end with /
|
||||
ensure!(
|
||||
req.location.ends_with('/'),
|
||||
InvalidCopyDatabasePathSnafu {
|
||||
value: req.location,
|
||||
}
|
||||
);
|
||||
|
||||
info!(
|
||||
"Copy database {}.{} from dir: {}, time: {:?}",
|
||||
req.catalog_name, req.schema_name, req.location, req.time_range
|
||||
);
|
||||
let suffix = Format::try_from(&req.with)
|
||||
.context(error::ParseFileFormatSnafu)?
|
||||
.suffix();
|
||||
|
||||
let entries = list_files_to_copy(&req, suffix).await?;
|
||||
|
||||
let continue_on_error = req
|
||||
.with
|
||||
.get(CONTINUE_ON_ERROR_KEY)
|
||||
.and_then(|v| bool::from_str(v).ok())
|
||||
.unwrap_or(false);
|
||||
let mut rows_inserted = 0;
|
||||
|
||||
for e in entries {
|
||||
let table_name = match parse_file_name_to_copy(&e) {
|
||||
Ok(table_name) => table_name,
|
||||
Err(err) => {
|
||||
if continue_on_error {
|
||||
error!(err; "Failed to import table from file: {:?}", e);
|
||||
continue;
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
};
|
||||
let req = CopyTableRequest {
|
||||
catalog_name: req.catalog_name.clone(),
|
||||
schema_name: req.schema_name.clone(),
|
||||
table_name: table_name.clone(),
|
||||
location: format!("{}/{}", req.location, e.path()),
|
||||
with: req.with.clone(),
|
||||
connection: req.connection.clone(),
|
||||
pattern: None,
|
||||
direction: CopyDirection::Import,
|
||||
timestamp_range: None,
|
||||
};
|
||||
debug!("Copy table, arg: {:?}", req);
|
||||
match self.copy_table_from(req, ctx.clone()).await {
|
||||
Ok(rows) => {
|
||||
rows_inserted += rows;
|
||||
}
|
||||
Err(err) => {
|
||||
if continue_on_error {
|
||||
error!(err; "Failed to import file to table: {}", table_name);
|
||||
continue;
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Output::AffectedRows(rows_inserted))
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses table names from files' names.
|
||||
fn parse_file_name_to_copy(e: &Entry) -> error::Result<String> {
|
||||
Path::new(e.name())
|
||||
.file_stem()
|
||||
.and_then(|os_str| os_str.to_str())
|
||||
.map(|s| s.to_string())
|
||||
.context(error::InvalidTableNameSnafu {
|
||||
table_name: e.name().to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Lists all files with expected suffix that can be imported to database.
|
||||
async fn list_files_to_copy(req: &CopyDatabaseRequest, suffix: &str) -> error::Result<Vec<Entry>> {
|
||||
let object_store =
|
||||
build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?;
|
||||
|
||||
let pattern = Regex::try_from(format!(".*{}", suffix)).context(error::BuildRegexSnafu)?;
|
||||
let lister = Lister::new(
|
||||
object_store.clone(),
|
||||
Source::Dir,
|
||||
"/".to_string(),
|
||||
Some(pattern),
|
||||
);
|
||||
lister.list().await.context(error::ListObjectsSnafu)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use object_store::services::Fs;
|
||||
use object_store::util::normalize_dir;
|
||||
use object_store::ObjectStore;
|
||||
use table::requests::CopyDatabaseRequest;
|
||||
|
||||
use crate::statement::copy_database::{list_files_to_copy, parse_file_name_to_copy};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_files_and_parse_table_name() {
|
||||
let dir = common_test_util::temp_dir::create_temp_dir("test_list_files_to_copy");
|
||||
let store_dir = normalize_dir(dir.path().to_str().unwrap());
|
||||
let mut builder = Fs::default();
|
||||
let _ = builder.root(&store_dir);
|
||||
let object_store = ObjectStore::new(builder).unwrap().finish();
|
||||
object_store.write("a.parquet", "").await.unwrap();
|
||||
object_store.write("b.parquet", "").await.unwrap();
|
||||
object_store.write("c.csv", "").await.unwrap();
|
||||
object_store.write("d", "").await.unwrap();
|
||||
object_store.write("e.f.parquet", "").await.unwrap();
|
||||
|
||||
let request = CopyDatabaseRequest {
|
||||
catalog_name: "catalog_0".to_string(),
|
||||
schema_name: "schema_0".to_string(),
|
||||
location: store_dir,
|
||||
with: [("FORMAT".to_string(), "parquet".to_string())]
|
||||
.into_iter()
|
||||
.collect(),
|
||||
connection: Default::default(),
|
||||
time_range: None,
|
||||
};
|
||||
let listed = list_files_to_copy(&request, ".parquet")
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|e| parse_file_name_to_copy(&e).unwrap())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
assert_eq!(
|
||||
["a".to_string(), "b".to_string(), "e.f".to_string()]
|
||||
.into_iter()
|
||||
.collect::<HashSet<_>>(),
|
||||
listed
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -111,15 +111,17 @@ impl StatementExecutor {
|
||||
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
|
||||
let table_source = Arc::new(DefaultTableSource::new(table_provider));
|
||||
|
||||
let plan = LogicalPlanBuilder::scan_with_filters(
|
||||
let mut builder = LogicalPlanBuilder::scan_with_filters(
|
||||
df_table_ref.to_owned_reference(),
|
||||
table_source,
|
||||
None,
|
||||
filters,
|
||||
filters.clone(),
|
||||
)
|
||||
.context(BuildDfLogicalPlanSnafu)?
|
||||
.build()
|
||||
.context(BuildDfLogicalPlanSnafu)?;
|
||||
for f in filters {
|
||||
builder = builder.filter(f).context(BuildDfLogicalPlanSnafu)?;
|
||||
}
|
||||
let plan = builder.build().context(BuildDfLogicalPlanSnafu)?;
|
||||
|
||||
let output = self
|
||||
.query_engine
|
||||
|
||||
@@ -15,13 +15,12 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use snafu::ResultExt;
|
||||
use sqlparser::ast::ObjectName;
|
||||
use sqlparser::keywords::Keyword;
|
||||
use sqlparser::tokenizer::Token::Word;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
use crate::parser::ParserContext;
|
||||
use crate::statements::copy::{CopyDatabaseArgument, CopyTable, CopyTableArgument};
|
||||
use crate::statements::copy::{CopyDatabase, CopyDatabaseArgument, CopyTable, CopyTableArgument};
|
||||
use crate::statements::statement::Statement;
|
||||
use crate::util::parse_option_string;
|
||||
|
||||
@@ -47,7 +46,7 @@ impl<'a> ParserContext<'a> {
|
||||
Ok(Statement::Copy(copy))
|
||||
}
|
||||
|
||||
fn parser_copy_database(&mut self) -> Result<CopyDatabaseArgument> {
|
||||
fn parser_copy_database(&mut self) -> Result<CopyDatabase> {
|
||||
let database_name =
|
||||
self.parser
|
||||
.parse_object_name()
|
||||
@@ -57,17 +56,29 @@ impl<'a> ParserContext<'a> {
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
self.parser
|
||||
.expect_keyword(Keyword::TO)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
|
||||
let (with, connection, location) = self.parse_copy_to()?;
|
||||
Ok(CopyDatabaseArgument {
|
||||
database_name,
|
||||
with: with.into(),
|
||||
connection: connection.into(),
|
||||
location,
|
||||
})
|
||||
let req = if self.parser.parse_keyword(Keyword::TO) {
|
||||
let (with, connection, location) = self.parse_copy_parameters()?;
|
||||
let argument = CopyDatabaseArgument {
|
||||
database_name,
|
||||
with: with.into(),
|
||||
connection: connection.into(),
|
||||
location,
|
||||
};
|
||||
CopyDatabase::To(argument)
|
||||
} else {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::FROM)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
let (with, connection, location) = self.parse_copy_parameters()?;
|
||||
let argument = CopyDatabaseArgument {
|
||||
database_name,
|
||||
with: with.into(),
|
||||
connection: connection.into(),
|
||||
location,
|
||||
};
|
||||
CopyDatabase::From(argument)
|
||||
};
|
||||
Ok(req)
|
||||
}
|
||||
|
||||
fn parse_copy_table(&mut self) -> Result<CopyTable> {
|
||||
@@ -82,7 +93,7 @@ impl<'a> ParserContext<'a> {
|
||||
let table_name = Self::canonicalize_object_name(raw_table_name);
|
||||
|
||||
if self.parser.parse_keyword(Keyword::TO) {
|
||||
let (with, connection, location) = self.parse_copy_to()?;
|
||||
let (with, connection, location) = self.parse_copy_parameters()?;
|
||||
Ok(CopyTable::To(CopyTableArgument {
|
||||
table_name,
|
||||
with: with.into(),
|
||||
@@ -93,52 +104,17 @@ impl<'a> ParserContext<'a> {
|
||||
self.parser
|
||||
.expect_keyword(Keyword::FROM)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
Ok(CopyTable::From(self.parse_copy_table_from(table_name)?))
|
||||
let (with, connection, location) = self.parse_copy_parameters()?;
|
||||
Ok(CopyTable::From(CopyTableArgument {
|
||||
table_name,
|
||||
with: with.into(),
|
||||
connection: connection.into(),
|
||||
location,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_copy_table_from(&mut self, table_name: ObjectName) -> Result<CopyTableArgument> {
|
||||
let location =
|
||||
self.parser
|
||||
.parse_literal_string()
|
||||
.with_context(|_| error::UnexpectedSnafu {
|
||||
sql: self.sql,
|
||||
expected: "a uri",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
|
||||
let options = self
|
||||
.parser
|
||||
.parse_options(Keyword::WITH)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
|
||||
let with = options
|
||||
.into_iter()
|
||||
.filter_map(|option| {
|
||||
parse_option_string(option.value).map(|v| (option.name.value.to_lowercase(), v))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let connection_options = self
|
||||
.parser
|
||||
.parse_options(Keyword::CONNECTION)
|
||||
.context(error::SyntaxSnafu)?;
|
||||
|
||||
let connection = connection_options
|
||||
.into_iter()
|
||||
.filter_map(|option| {
|
||||
parse_option_string(option.value).map(|v| (option.name.value.to_lowercase(), v))
|
||||
})
|
||||
.collect();
|
||||
Ok(CopyTableArgument {
|
||||
table_name,
|
||||
with,
|
||||
connection,
|
||||
location,
|
||||
})
|
||||
}
|
||||
|
||||
fn parse_copy_to(&mut self) -> Result<(With, Connection, String)> {
|
||||
fn parse_copy_parameters(&mut self) -> Result<(With, Connection, String)> {
|
||||
let location =
|
||||
self.parser
|
||||
.parse_literal_string()
|
||||
@@ -181,7 +157,7 @@ mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use sqlparser::ast::Ident;
|
||||
use sqlparser::ast::{Ident, ObjectName};
|
||||
|
||||
use super::*;
|
||||
use crate::dialect::GreptimeDbDialect;
|
||||
@@ -398,6 +374,50 @@ mod tests {
|
||||
let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let CopyDatabase::To(stmt) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]),
|
||||
stmt.database_name
|
||||
);
|
||||
assert_eq!(
|
||||
[("format".to_string(), "parquet".to_string())]
|
||||
.into_iter()
|
||||
.collect::<HashMap<_, _>>(),
|
||||
stmt.with.map
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
[
|
||||
("foo".to_string(), "Bar".to_string()),
|
||||
("one".to_string(), "two".to_string())
|
||||
]
|
||||
.into_iter()
|
||||
.collect::<HashMap<_, _>>(),
|
||||
stmt.connection.map
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_copy_database_from() {
|
||||
let sql = "COPY DATABASE catalog0.schema0 FROM '/a/b/c/' WITH (FORMAT = 'parquet') CONNECTION (FOO='Bar', ONE='two')";
|
||||
let stmt =
|
||||
ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
|
||||
.unwrap()
|
||||
.pop()
|
||||
.unwrap();
|
||||
|
||||
let Copy(crate::statements::copy::Copy::CopyDatabase(stmt)) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
let CopyDatabase::From(stmt) = stmt else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
ObjectName(vec![Ident::new("catalog0"), Ident::new("schema0")]),
|
||||
stmt.database_name
|
||||
|
||||
@@ -20,7 +20,7 @@ use crate::statements::OptionMap;
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub enum Copy {
|
||||
CopyTable(CopyTable),
|
||||
CopyDatabase(CopyDatabaseArgument),
|
||||
CopyDatabase(CopyDatabase),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
@@ -29,6 +29,12 @@ pub enum CopyTable {
|
||||
From(CopyTableArgument),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub enum CopyDatabase {
|
||||
To(CopyDatabaseArgument),
|
||||
From(CopyDatabaseArgument),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut)]
|
||||
pub struct CopyDatabaseArgument {
|
||||
pub database_name: ObjectName,
|
||||
|
||||
@@ -0,0 +1,58 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet");
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
COPY DATABASE public TO '/tmp/demo/export/parquet_range/' WITH (FORMAT="parquet", start_time='2022-06-15 07:02:37.000Z', end_time='2022-06-15 07:02:37.1Z');
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
DELETE FROM demo;
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
SELECT * FROM demo ORDER BY ts;
|
||||
|
||||
++
|
||||
++
|
||||
|
||||
COPY DATABASE public FROM '/tmp/demo/export/parquet/';
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
SELECT * FROM demo ORDER BY ts;
|
||||
|
||||
+-------+------+--------+---------------------+
|
||||
| host | cpu | memory | ts |
|
||||
+-------+------+--------+---------------------+
|
||||
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
|
||||
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
|
||||
+-------+------+--------+---------------------+
|
||||
|
||||
DELETE FROM demo;
|
||||
|
||||
Affected Rows: 2
|
||||
|
||||
COPY DATABASE public FROM '/tmp/demo/export/parquet_range/';
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
SELECT * FROM demo ORDER BY ts;
|
||||
|
||||
+-------+------+--------+---------------------+
|
||||
| host | cpu | memory | ts |
|
||||
+-------+------+--------+---------------------+
|
||||
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
|
||||
+-------+------+--------+---------------------+
|
||||
|
||||
DROP TABLE demo;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,23 @@
|
||||
CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
|
||||
|
||||
INSERT INTO demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
|
||||
|
||||
COPY DATABASE public TO '/tmp/demo/export/parquet/' WITH (FORMAT="parquet");
|
||||
|
||||
COPY DATABASE public TO '/tmp/demo/export/parquet_range/' WITH (FORMAT="parquet", start_time='2022-06-15 07:02:37.000Z', end_time='2022-06-15 07:02:37.1Z');
|
||||
|
||||
DELETE FROM demo;
|
||||
|
||||
SELECT * FROM demo ORDER BY ts;
|
||||
|
||||
COPY DATABASE public FROM '/tmp/demo/export/parquet/';
|
||||
|
||||
SELECT * FROM demo ORDER BY ts;
|
||||
|
||||
DELETE FROM demo;
|
||||
|
||||
COPY DATABASE public FROM '/tmp/demo/export/parquet_range/';
|
||||
|
||||
SELECT * FROM demo ORDER BY ts;
|
||||
|
||||
DROP TABLE demo;
|
||||
Reference in New Issue
Block a user