From e19c8fa2b605f1965253b052ee12cbbddd2e6cb0 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Mon, 20 Mar 2023 19:23:25 +0800
Subject: [PATCH] refactor: combine Copy To and Copy From (#1197)

* refactor: combine Copy To and Copy From

* Apply suggestions from code review

Co-authored-by: LFC

* Apply suggestions from code review

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Co-authored-by: LFC
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
---
 Cargo.lock                                   |  17 +-
 Cargo.toml                                   |   1 +
 src/common/datasource/Cargo.toml             |  13 +
 src/common/datasource/src/error.rs           |  75 ++++
 src/common/datasource/src/lib.rs             |  18 +
 src/common/datasource/src/lister.rs          |  81 ++++
 src/common/datasource/src/object_store.rs    |  60 +++
 src/common/datasource/src/object_store/fs.rs |  28 ++
 src/common/datasource/src/object_store/s3.rs |  79 ++++
 src/common/datasource/src/util.rs            | 125 ++++++
 src/datanode/Cargo.toml                      |   1 +
 src/datanode/src/error.rs                    |  25 +-
 src/datanode/src/instance/sql.rs             |  85 +++--
 src/datanode/src/sql.rs                      |   9 +-
 src/datanode/src/sql/copy_table_from.rs      | 361 ++----------------
 .../sql/{copy_table.rs => copy_table_to.rs}  |  50 +--
 src/sql/src/parsers/copy_parser.rs           |  53 +--
 src/sql/src/statements/copy.rs               |  52 +--
 src/table/src/requests.rs                    |  18 +-
 19 files changed, 643 insertions(+), 508 deletions(-)
 create mode 100644 src/common/datasource/Cargo.toml
 create mode 100644 src/common/datasource/src/error.rs
 create mode 100644 src/common/datasource/src/lib.rs
 create mode 100644 src/common/datasource/src/lister.rs
 create mode 100644 src/common/datasource/src/object_store.rs
 create mode 100644 src/common/datasource/src/object_store/fs.rs
 create mode 100644 src/common/datasource/src/object_store/s3.rs
 create mode 100644 src/common/datasource/src/util.rs
 rename src/datanode/src/sql/{copy_table.rs => copy_table_to.rs} (76%)

diff --git a/Cargo.lock b/Cargo.lock
index 9cfa45d25e..0b15c6d1c8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1485,6 +1485,18 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "common-datasource"
+version = "0.1.1"
+dependencies = [
+ "common-error",
+ "futures",
+ "object-store",
+ "regex",
+ "snafu",
+ "url",
+]
+
 [[package]]
 name = "common-error"
 version = "0.1.1"
@@ -2280,6 +2292,7 @@ dependencies = [
  "client",
  "common-base",
  "common-catalog",
+ "common-datasource",
  "common-error",
  "common-grpc",
  "common-grpc-expr",
@@ -2938,9 +2951,9 @@ dependencies = [
 
 [[package]]
 name = "futures-core"
-version = "0.3.26"
+version = "0.3.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608"
+checksum = "86d7a0c1aa76363dac491de0ee99faf6941128376f1cf96f07db7603b7de69dd"
 
 [[package]]
 name = "futures-executor"
diff --git a/Cargo.toml b/Cargo.toml
index ab0d3136f0..4a664d7312 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,6 +7,7 @@ members = [
     "src/cmd",
     "src/common/base",
     "src/common/catalog",
+    "src/common/datasource",
    "src/common/error",
    "src/common/function",
    "src/common/function-macro",
diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml
new file mode 100644
index 0000000000..a077fde9c6
--- /dev/null
+++ b/src/common/datasource/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "common-datasource"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+
+[dependencies]
+common-error = { path = "../error" }
+futures.workspace = true
+object-store = { path = "../../object-store" }
+regex = "1.7"
+snafu.workspace = true
+url = "2.3"
diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs
new file mode 100644
index 0000000000..d24103daba
--- /dev/null
+++ b/src/common/datasource/src/error.rs
@@ -0,0 +1,75 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+
+use common_error::prelude::*;
+use url::ParseError;
+
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum Error {
+    #[snafu(display("Unsupported backend protocol: {}", protocol))]
+    UnsupportedBackendProtocol { protocol: String },
+
+    #[snafu(display("Empty host: {}", url))]
+    EmptyHostPath { url: String },
+
+    #[snafu(display("Invalid path: {}", path))]
+    InvalidPath { path: String },
+
+    #[snafu(display("Invalid url: {}, error: {}", url, source))]
+    InvalidUrl { url: String, source: ParseError },
+
+    #[snafu(display("Failed to build backend, source: {}", source))]
+    BuildBackend {
+        source: object_store::Error,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Failed to list objects in path: {}, source: {}", path, source))]
+    ListObjects {
+        path: String,
+        backtrace: Backtrace,
+        source: object_store::Error,
+    },
+
+    #[snafu(display("Invalid connection: {}", msg))]
+    InvalidConnection { msg: String },
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl ErrorExt for Error {
+    fn status_code(&self) -> StatusCode {
+        use Error::*;
+        match self {
+            BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
+
+            UnsupportedBackendProtocol { .. }
+            | InvalidConnection { .. }
+            | InvalidUrl { .. }
+            | EmptyHostPath { .. }
+            | InvalidPath { .. } => StatusCode::InvalidArguments,
+        }
+    }
+
+    fn backtrace_opt(&self) -> Option<&Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs
new file mode 100644
index 0000000000..6044d52033
--- /dev/null
+++ b/src/common/datasource/src/lib.rs
@@ -0,0 +1,18 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod error;
+pub mod lister;
+pub mod object_store;
+pub mod util;
diff --git a/src/common/datasource/src/lister.rs b/src/common/datasource/src/lister.rs
new file mode 100644
index 0000000000..7102b26708
--- /dev/null
+++ b/src/common/datasource/src/lister.rs
@@ -0,0 +1,81 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use futures::{future, TryStreamExt};
+use object_store::{Object, ObjectStore};
+use regex::Regex;
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Source {
+    Filename(String),
+    Dir,
+}
+
+pub struct Lister {
+    object_store: ObjectStore,
+    source: Source,
+    path: String,
+    regex: Option<Regex>,
+}
+
+impl Lister {
+    pub fn new(
+        object_store: ObjectStore,
+        source: Source,
+        path: String,
+        regex: Option<Regex>,
+    ) -> Self {
+        Lister {
+            object_store,
+            source,
+            path,
+            regex,
+        }
+    }
+
+    pub async fn list(&self) -> Result<Vec<Object>> {
+        match &self.source {
+            Source::Dir => {
+                let streamer = self
+                    .object_store
+                    .object(&self.path)
+                    .list()
+                    .await
+                    .context(error::ListObjectsSnafu { path: &self.path })?;
+
+                streamer
+                    .try_filter(|f| {
+                        let res = self
+                            .regex
+                            .as_ref()
+                            .map(|x| x.is_match(f.name()))
+                            .unwrap_or(true);
+                        future::ready(res)
+                    })
+                    .try_collect::<Vec<_>>()
+                    .await
+                    .context(error::ListObjectsSnafu { path: &self.path })
+            }
+            Source::Filename(filename) => {
+                let obj = self
+                    .object_store
+                    .object(&format!("{}{}", self.path, filename));
+
+                Ok(vec![obj])
+            }
+        }
+    }
+}
diff --git a/src/common/datasource/src/object_store.rs b/src/common/datasource/src/object_store.rs
new file mode 100644
index 0000000000..e1f949ceb3
--- /dev/null
+++ b/src/common/datasource/src/object_store.rs
@@ -0,0 +1,60 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub mod fs;
+pub mod s3;
+use std::collections::HashMap;
+
+use object_store::ObjectStore;
+use snafu::{OptionExt, ResultExt};
+use url::{ParseError, Url};
+
+use self::fs::build_fs_backend;
+use self::s3::build_s3_backend;
+use crate::error::{self, Result};
+
+pub const FS_SCHEMA: &str = "FS";
+pub const S3_SCHEMA: &str = "S3";
+
+/// parse url returns (schema, Option<host>, path)
+pub fn parse_url(url: &str) -> Result<(String, Option<String>, String)> {
+    let parsed_url = Url::parse(url);
+    match parsed_url {
+        Ok(url) => Ok((
+            url.scheme().to_string(),
+            url.host_str().map(|s| s.to_string()),
+            url.path().to_string(),
+        )),
+        Err(ParseError::RelativeUrlWithoutBase) => {
+            Ok((FS_SCHEMA.to_string(), None, url.to_string()))
+        }
+        Err(err) => Err(err).context(error::InvalidUrlSnafu { url }),
+    }
+}
+
+pub fn build_backend(url: &str, connection: HashMap<String, String>) -> Result<ObjectStore> {
+    let (schema, host, _path) = parse_url(url)?;
+
+    match schema.to_uppercase().as_str() {
+        S3_SCHEMA => {
+            let host = host.context(error::EmptyHostPathSnafu {
+                url: url.to_string(),
+            })?;
+            Ok(build_s3_backend(&host, "/", connection)?)
+        }
+        FS_SCHEMA => Ok(build_fs_backend("/")?),
+
+        _ => error::UnsupportedBackendProtocolSnafu { protocol: schema }.fail(),
+    }
+}
diff --git a/src/common/datasource/src/object_store/fs.rs b/src/common/datasource/src/object_store/fs.rs
new file mode 100644
index 0000000000..c2c50ce793
--- /dev/null
+++ b/src/common/datasource/src/object_store/fs.rs
@@ -0,0 +1,28 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use object_store::services::Fs;
+use object_store::{ObjectStore, ObjectStoreBuilder};
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+
+pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
+    let accessor = Fs::default()
+        .root(root)
+        .build()
+        .context(error::BuildBackendSnafu)?;
+
+    Ok(ObjectStore::new(accessor).finish())
+}
diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs
new file mode 100644
index 0000000000..d501b2a1ac
--- /dev/null
+++ b/src/common/datasource/src/object_store/s3.rs
@@ -0,0 +1,79 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+
+use object_store::services::S3;
+use object_store::{ObjectStore, ObjectStoreBuilder};
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+
+const ENDPOINT_URL: &str = "ENDPOINT_URL";
+const ACCESS_KEY_ID: &str = "ACCESS_KEY_ID";
+const SECRET_ACCESS_KEY: &str = "SECRET_ACCESS_KEY";
+const SESSION_TOKEN: &str = "SESSION_TOKEN";
+const REGION: &str = "REGION";
+const ENABLE_VIRTUAL_HOST_STYLE: &str = "ENABLE_VIRTUAL_HOST_STYLE";
+
+pub fn build_s3_backend(
+    host: &str,
+    path: &str,
+    connection: HashMap<String, String>,
+) -> Result<ObjectStore> {
+    let mut builder = S3::default();
+
+    builder.root(path);
+
+    builder.bucket(host);
+
+    if let Some(endpoint) = connection.get(ENDPOINT_URL) {
+        builder.endpoint(endpoint);
+    }
+
+    if let Some(region) = connection.get(REGION) {
+        builder.region(region);
+    }
+
+    if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
+        builder.access_key_id(key_id);
+    }
+
+    if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
+        builder.secret_access_key(key);
+    }
+
+    if let Some(session_token) = connection.get(SESSION_TOKEN) {
+        builder.security_token(session_token);
+    }
+
+    if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
+        let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
+            error::InvalidConnectionSnafu {
+                msg: format!(
+                    "failed to parse the option {}={}, {}",
+                    ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
+                ),
+            }
+            .build()
+        })?;
+        if enable {
+            builder.enable_virtual_host_style();
+        }
+    }
+
+    let accessor = builder.build().context(error::BuildBackendSnafu)?;
+
+    Ok(ObjectStore::new(accessor).finish())
+}
diff --git a/src/common/datasource/src/util.rs b/src/common/datasource/src/util.rs
new file mode 100644
index 0000000000..870fe376ce
--- /dev/null
+++ b/src/common/datasource/src/util.rs
@@ -0,0 +1,125 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+pub fn find_dir_and_filename(path: &str) -> (String, Option<String>) {
+    if path.is_empty() {
+        ("/".to_string(), None)
+    } else if path.ends_with('/') {
+        (path.to_string(), None)
+    } else if let Some(idx) = path.rfind('/') {
+        (
+            path[..idx + 1].to_string(),
+            Some(path[idx + 1..].to_string()),
+        )
+    } else {
+        ("/".to_string(), Some(path.to_string()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use url::Url;
+
+    use super::*;
+
+    #[test]
+    fn test_parse_uri() {
+        struct Test<'a> {
+            uri: &'a str,
+            expected_path: &'a str,
+            expected_schema: &'a str,
+        }
+
+        let tests = [
+            Test {
+                uri: "s3://bucket/to/path/",
+                expected_path: "/to/path/",
+                expected_schema: "s3",
+            },
+            Test {
+                uri: "fs:///to/path/",
+                expected_path: "/to/path/",
+                expected_schema: "fs",
+            },
+            Test {
+                uri: "fs:///to/path/file",
+                expected_path: "/to/path/file",
+                expected_schema: "fs",
+            },
+        ];
+        for test in tests {
+            let parsed_uri = Url::parse(test.uri).unwrap();
+            assert_eq!(parsed_uri.path(), test.expected_path);
+            assert_eq!(parsed_uri.scheme(), test.expected_schema);
+        }
+    }
+
+    #[test]
+    fn test_parse_path_and_dir() {
+        let parsed = Url::from_file_path("/to/path/file").unwrap();
+        assert_eq!(parsed.path(), "/to/path/file");
+
+        let parsed = Url::from_directory_path("/to/path/").unwrap();
+        assert_eq!(parsed.path(), "/to/path/");
+    }
+
+    #[test]
+    fn test_find_dir_and_filename() {
+        struct Test<'a> {
+            path: &'a str,
+            expected_dir: &'a str,
+            expected_filename: Option<String>,
+        }
+
+        let tests = [
+            Test {
+                path: "to/path/",
+                expected_dir: "to/path/",
+                expected_filename: None,
+            },
+            Test {
+                path: "to/path/filename",
+                expected_dir: "to/path/",
+                expected_filename: Some("filename".into()),
+            },
+            Test {
+                path: "/to/path/filename",
+                expected_dir: "/to/path/",
+                expected_filename: Some("filename".into()),
+            },
+            Test {
+                path: "/",
+                expected_dir: "/",
+                expected_filename: None,
+            },
+            Test {
+                path: "filename",
+                expected_dir: "/",
+                expected_filename: Some("filename".into()),
+            },
+            Test {
+                path: "",
+                expected_dir: "/",
+                expected_filename: None,
+            },
+        ];
+
+        for test in tests {
+            let (path, filename) = find_dir_and_filename(test.path);
+            assert_eq!(test.expected_dir, path);
+            assert_eq!(test.expected_filename, filename)
+        }
+    }
+}
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index 7745b8bcfd..0cf7355aba 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -20,6 +20,7 @@ catalog = { path = "../catalog" }
 common-base = { path = "../common/base" }
 common-catalog = { path = "../common/catalog" }
 common-error = { path = "../common/error" }
+common-datasource = { path = "../common/datasource" }
 common-grpc = { path = "../common/grpc" }
 common-grpc-expr = { path = "../common/grpc-expr" }
 common-procedure = { path = "../common/procedure" }
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 3adf6cf47f..4b03ee937d 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -14,6 +14,7 @@
 
 use std::any::Any;
 
+use common_datasource::error::Error as DataSourceError;
 use common_error::prelude::*;
 use common_procedure::ProcedureId;
 use common_recordbatch::error::Error as RecordBatchError;
@@ -218,7 +219,13 @@ pub enum Error {
 
     #[snafu(display("Failed to build backend, source: {}", source))]
     BuildBackend {
-        source: object_store::Error,
+        #[snafu(backtrace)]
+        source: DataSourceError,
+    },
+
+    #[snafu(display("Failed to parse url, source: {}", source))]
+    ParseUrl {
+        source: DataSourceError,
         backtrace: Backtrace,
     },
 
@@ -249,6 +256,12 @@
         source: regex::Error,
     },
 
+    #[snafu(display("Failed to list objects, source: {}", source))]
+    ListObjects {
+        #[snafu(backtrace)]
+        source: DataSourceError,
+    },
+
     #[snafu(display("Failed to parse the data, source: {}", source))]
     ParseDataTypes {
         #[snafu(backtrace)]
@@ -475,13 +488,6 @@ pub enum Error {
         source: object_store::Error,
     },
 
-    #[snafu(display("Failed to lists object in path: {}, source: {}", path, source))]
-    ListObjects {
-        path: String,
-        backtrace: Backtrace,
-        source: object_store::Error,
-    },
-
     #[snafu(display("Unrecognized table option: {}", source))]
     UnrecognizedTableOption {
         #[snafu(backtrace)]
@@ -584,7 +590,8 @@ impl ErrorExt for Error {
             | DatabaseNotFound { .. }
             | MissingNodeId { .. }
             | MissingMetasrvOpts { .. }
-            | ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
+            | ColumnNoneDefaultValue { .. }
+            | ParseUrl { .. } => StatusCode::InvalidArguments,
 
             // TODO(yingwen): Further categorize http error.
             StartServer { .. }
diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs
index b363d2c8cd..ee4d78366e 100644
--- a/src/datanode/src/instance/sql.rs
+++ b/src/datanode/src/instance/sql.rs
@@ -28,12 +28,10 @@ use servers::prom::PromHandler;
 use session::context::{QueryContext, QueryContextRef};
 use snafu::prelude::*;
 use sql::ast::ObjectName;
-use sql::statements::copy::CopyTable;
+use sql::statements::copy::{CopyTable, CopyTableArgument};
 use sql::statements::statement::Statement;
 use table::engine::TableReference;
-use table::requests::{
-    CopyTableFromRequest, CopyTableRequest, CreateDatabaseRequest, DropTableRequest,
-};
+use table::requests::{CopyDirection, CopyTableRequest, CreateDatabaseRequest, DropTableRequest};
 
 use crate::error::{
     self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, PlanStatementSnafu, Result,
@@ -160,39 +158,54 @@ impl Instance {
             QueryStatement::Sql(Statement::ShowCreateTable(_show_create_table)) => {
                 unimplemented!("SHOW CREATE TABLE is unimplemented yet");
             }
-            QueryStatement::Sql(Statement::Copy(copy_table)) => match copy_table {
-                CopyTable::To(copy_table) => {
-                    let (catalog_name, schema_name, table_name) =
-                        table_idents_to_full_name(&copy_table.table_name, query_ctx.clone())?;
-                    let file_name = copy_table.file_name;
-                    let req = CopyTableRequest {
-                        catalog_name,
-                        schema_name,
-                        table_name,
-                        file_name,
-                        connection: copy_table.connection,
-                    };
+            QueryStatement::Sql(Statement::Copy(copy_table)) => {
+                let req = match copy_table {
+                    CopyTable::To(copy_table) => {
+                        let CopyTableArgument {
+                            location,
+                            connection,
+                            pattern,
+                            table_name,
+                            ..
+                        } = copy_table;
+                        let (catalog_name, schema_name, table_name) =
+                            table_idents_to_full_name(&table_name, query_ctx.clone())?;
+                        CopyTableRequest {
+                            catalog_name,
+                            schema_name,
+                            table_name,
+                            location,
+                            connection,
+                            pattern,
+                            direction: CopyDirection::Export,
+                        }
+                    }
+                    CopyTable::From(copy_table) => {
+                        let CopyTableArgument {
+                            location,
+                            connection,
+                            pattern,
+                            table_name,
+                            ..
+                        } = copy_table;
+                        let (catalog_name, schema_name, table_name) =
+                            table_idents_to_full_name(&table_name, query_ctx.clone())?;
+                        CopyTableRequest {
+                            catalog_name,
+                            schema_name,
+                            table_name,
+                            location,
+                            connection,
+                            pattern,
+                            direction: CopyDirection::Import,
+                        }
+                    }
+                };
-
-                    self.sql_handler
-                        .execute(SqlRequest::CopyTable(req), query_ctx)
-                        .await
-                }
-                CopyTable::From(copy_table) => {
-                    let (catalog_name, schema_name, table_name) =
-                        table_idents_to_full_name(&copy_table.table_name, query_ctx.clone())?;
-                    let req = CopyTableFromRequest {
-                        catalog_name,
-                        schema_name,
-                        table_name,
-                        connection: copy_table.connection,
-                        pattern: copy_table.pattern,
-                        from: copy_table.from,
-                    };
-                    self.sql_handler
-                        .execute(SqlRequest::CopyTableFrom(req), query_ctx)
-                        .await
-                }
-            },
+
+                self.sql_handler
+                    .execute(SqlRequest::CopyTable(req), query_ctx)
+                    .await
+            }
             QueryStatement::Sql(Statement::Query(_))
             | QueryStatement::Sql(Statement::Explain(_))
             | QueryStatement::Sql(Statement::Use(_))
diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs
index 2a05e8c8f6..551ba0fca6 100644
--- a/src/datanode/src/sql.rs
+++ b/src/datanode/src/sql.rs
@@ -34,8 +34,8 @@ use crate::error::{
 use crate::instance::sql::table_idents_to_full_name;
 
 mod alter;
-mod copy_table;
 mod copy_table_from;
+mod copy_table_to;
 mod create;
 mod delete;
 mod drop_table;
@@ -55,7 +55,6 @@ pub enum SqlRequest {
     DescribeTable(DescribeTable),
     Delete(Delete),
     CopyTable(CopyTableRequest),
-    CopyTableFrom(CopyTableFromRequest),
 }
 
 // Handler to execute SQL except query
@@ -96,8 +95,10 @@ impl SqlHandler {
             SqlRequest::Alter(req) => self.alter(req).await,
             SqlRequest::DropTable(req) => self.drop_table(req).await,
             SqlRequest::Delete(req) => self.delete(query_ctx.clone(), req).await,
-            SqlRequest::CopyTable(req) => self.copy_table(req).await,
-            SqlRequest::CopyTableFrom(req) => self.copy_table_from(req).await,
+            SqlRequest::CopyTable(req) => match req.direction {
+                CopyDirection::Export => self.copy_table_to(req).await,
+                CopyDirection::Import => self.copy_table_from(req).await,
+            },
             SqlRequest::ShowDatabases(req) => {
                 show_databases(req, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
             }
diff --git a/src/datanode/src/sql/copy_table_from.rs b/src/datanode/src/sql/copy_table_from.rs
index 3c21e15769..1ca4a5f27a 100644
--- a/src/datanode/src/sql/copy_table_from.rs
+++ b/src/datanode/src/sql/copy_table_from.rs
@@ -15,35 +15,26 @@
 use std::collections::HashMap;
 
 use async_compat::CompatExt;
+use common_datasource::lister::{Lister, Source};
+use common_datasource::object_store::{build_backend, parse_url};
+use common_datasource::util::find_dir_and_filename;
 use common_query::Output;
 use common_recordbatch::error::DataTypesSnafu;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::vectors::{Helper, VectorRef};
-use futures::future;
 use futures_util::TryStreamExt;
-use object_store::services::{Fs, S3};
-use object_store::{Object, ObjectStore, ObjectStoreBuilder};
 use regex::Regex;
 use snafu::{ensure, ResultExt};
 use table::engine::TableReference;
-use table::requests::{CopyTableFromRequest, InsertRequest};
+use table::requests::{CopyTableRequest, InsertRequest};
 use tokio::io::BufReader;
-use url::{ParseError, Url};
 
 use crate::error::{self, Result};
 use crate::sql::SqlHandler;
 
-pub const S3_SCHEMA: &str = "S3";
-const ENDPOINT_URL: &str = "ENDPOINT_URL";
-const ACCESS_KEY_ID: &str = "ACCESS_KEY_ID";
-const SECRET_ACCESS_KEY: &str = "SECRET_ACCESS_KEY";
-const SESSION_TOKEN: &str = "SESSION_TOKEN";
-const REGION: &str = "REGION";
-const ENABLE_VIRTUAL_HOST_STYLE: &str = "ENABLE_VIRTUAL_HOST_STYLE";
-
 impl SqlHandler {
-    pub(crate) async fn copy_table_from(&self, req: CopyTableFromRequest) -> Result<Output> {
+    pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
         let table_ref = TableReference {
             catalog: &req.catalog_name,
             schema: &req.schema_name,
@@ -51,9 +42,29 @@ impl SqlHandler {
         };
         let table = self.get_table(&table_ref)?;
 
-        let datasource = DataSource::new(&req.from, req.pattern, req.connection)?;
+        let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
 
-        let objects = datasource.list().await?;
+        let object_store =
+            build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
+
+        let (dir, filename) = find_dir_and_filename(&path);
+
+        let regex = req
+            .pattern
+            .as_ref()
+            .map(|x| Regex::new(x))
+            .transpose()
+            .context(error::BuildRegexSnafu)?;
+
+        let source = if let Some(filename) = filename {
+            Source::Filename(filename)
+        } else {
+            Source::Dir
+        };
+
+        let lister = Lister::new(object_store, source, dir, regex);
+
+        let objects = lister.list().await.context(error::ListObjectsSnafu)?;
 
         let mut buf: Vec<u8> = Vec::new();
 
@@ -131,321 +142,3 @@ impl SqlHandler {
         Ok(Output::AffectedRows(result.iter().sum()))
     }
 }
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-enum Source {
-    Filename(String),
-    Dir,
-}
-
-struct DataSource {
-    object_store: ObjectStore,
-    source: Source,
-    path: String,
-    regex: Option<Regex>,
-}
-
-impl DataSource {
-    fn from_path(url: &str, regex: Option<Regex>) -> Result<DataSource> {
-        let result = if url.ends_with('/') {
-            Url::from_directory_path(url)
-        } else {
-            Url::from_file_path(url)
-        };
-
-        match result {
-            Ok(url) => {
-                let path = url.path();
-
-                let (path, filename) = DataSource::find_dir_and_filename(path);
-
-                let source = if let Some(filename) = filename {
-                    Source::Filename(filename)
-                } else {
-                    Source::Dir
-                };
-
-                let object_store = build_fs_backend(&path)?;
-
-                Ok(DataSource {
-                    object_store,
-                    source,
-                    path,
-                    regex,
-                })
-            }
-            Err(()) => error::InvalidPathSnafu {
-                path: url.to_string(),
-            }
-            .fail(),
-        }
-    }
-
-    fn from_url(
-        url: Url,
-        regex: Option<Regex>,
-        connection: HashMap<String, String>,
-    ) -> Result<DataSource> {
-        let host = url.host_str();
-
-        let path = url.path();
-
-        let schema = url.scheme();
-
-        let (dir, filename) = DataSource::find_dir_and_filename(path);
-
-        let source = if let Some(filename) = filename {
-            Source::Filename(filename)
-        } else {
-            Source::Dir
-        };
-
-        let object_store = match schema.to_uppercase().as_str() {
-            S3_SCHEMA => build_s3_backend(host, &dir, connection)?,
-            _ => {
-                return error::UnsupportedBackendProtocolSnafu {
-                    protocol: schema.to_string(),
-                }
-                .fail()
-            }
-        };
-
-        Ok(DataSource {
-            object_store,
-            source,
-            path: dir,
-            regex,
-        })
-    }
-
-    pub fn new(
-        url: &str,
-        pattern: Option<String>,
-        connection: HashMap<String, String>,
-    ) -> Result<DataSource> {
-        let regex = if let Some(pattern) = pattern {
-            let regex = Regex::new(&pattern).context(error::BuildRegexSnafu)?;
-            Some(regex)
-        } else {
-            None
-        };
-        let result = Url::parse(url);
-
-        match result {
-            Ok(url) => DataSource::from_url(url, regex, connection),
-            Err(err) => {
-                if ParseError::RelativeUrlWithoutBase == err {
-                    DataSource::from_path(url, regex)
-                } else {
-                    Err(error::Error::InvalidUrl {
-                        url: url.to_string(),
-                        source: err,
-                    })
-                }
-            }
-        }
-    }
-
-    pub async fn list(&self) -> Result<Vec<Object>> {
-        match &self.source {
-            Source::Dir => {
-                let streamer = self
-                    .object_store
-                    .object("/")
-                    .list()
-                    .await
-                    .context(error::ListObjectsSnafu { path: &self.path })?;
-                streamer
-                    .try_filter(|f| {
-                        let res = if let Some(regex) = &self.regex {
-                            regex.is_match(f.name())
-                        } else {
-                            true
-                        };
-                        future::ready(res)
-                    })
-                    .try_collect::<Vec<_>>()
-                    .await
-                    .context(error::ListObjectsSnafu { path: &self.path })
-            }
-            Source::Filename(filename) => {
-                let obj = self.object_store.object(filename);
-
-                Ok(vec![obj])
-            }
-        }
-    }
-
-    fn find_dir_and_filename(path: &str) -> (String, Option<String>) {
-        if path.is_empty() {
-            ("/".to_string(), None)
-        } else if path.ends_with('/') {
-            (path.to_string(), None)
-        } else if let Some(idx) = path.rfind('/') {
-            (
-                path[..idx + 1].to_string(),
-                Some(path[idx + 1..].to_string()),
-            )
-        } else {
-            ("/".to_string(), Some(path.to_string()))
-        }
-    }
-}
-
-pub fn build_s3_backend(
-    host: Option<&str>,
-    path: &str,
-    connection: HashMap<String, String>,
-) -> Result<ObjectStore> {
-    let mut builder = S3::default();
-
-    builder.root(path);
-
-    if let Some(bucket) = host {
-        builder.bucket(bucket);
-    }
-
-    if let Some(endpoint) = connection.get(ENDPOINT_URL) {
-        builder.endpoint(endpoint);
-    }
-
-    if let Some(region) = connection.get(REGION) {
-        builder.region(region);
-    }
-
-    if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
-        builder.access_key_id(key_id);
-    }
-
-    if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
-        builder.secret_access_key(key);
-    }
-
-    if let Some(session_token) = connection.get(SESSION_TOKEN) {
-        builder.security_token(session_token);
-    }
-
-    if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
-        let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
-            error::InvalidConnectionSnafu {
-                msg: format!(
-                    "failed to parse the option {}={}, {}",
-                    ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
-                ),
-            }
-            .build()
-        })?;
-        if enable {
-            builder.enable_virtual_host_style();
-        }
-    }
-
-    let accessor = builder.build().context(error::BuildBackendSnafu)?;
-
-    Ok(ObjectStore::new(accessor).finish())
-}
-
-pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
-    let accessor = Fs::default()
-        .root(root)
-        .build()
-        .context(error::BuildBackendSnafu)?;
-
-    Ok(ObjectStore::new(accessor).finish())
-}
-
-#[cfg(test)]
-mod tests {
-
-    use url::Url;
-
-    use super::*;
-    #[test]
-    fn test_parse_uri() {
-        struct Test<'a> {
-            uri: &'a str,
-            expected_path: &'a str,
-            expected_schema: &'a str,
-        }
-
-        let tests = [
-            Test {
-                uri: "s3://bucket/to/path/",
-                expected_path: "/to/path/",
-                expected_schema: "s3",
-            },
-            Test {
-                uri: "fs:///to/path/",
-                expected_path: "/to/path/",
-                expected_schema: "fs",
-            },
-            Test {
-                uri: "fs:///to/path/file",
-                expected_path: "/to/path/file",
-                expected_schema: "fs",
-            },
-        ];
-        for test in tests {
-            let parsed_uri = Url::parse(test.uri).unwrap();
-            assert_eq!(parsed_uri.path(), test.expected_path);
-            assert_eq!(parsed_uri.scheme(), test.expected_schema);
-        }
-    }
-
-    #[test]
-    fn test_parse_path_and_dir() {
-        let parsed = Url::from_file_path("/to/path/file").unwrap();
-        assert_eq!(parsed.path(), "/to/path/file");
-
-        let parsed = Url::from_directory_path("/to/path/").unwrap();
-        assert_eq!(parsed.path(), "/to/path/");
-    }
-
-    #[test]
-    fn test_find_dir_and_filename() {
-        struct Test<'a> {
-            path: &'a str,
-            expected_dir: &'a str,
-            expected_filename: Option<String>,
-        }
-
-        let tests = [
-            Test {
-                path: "to/path/",
-                expected_dir: "to/path/",
-                expected_filename: None,
-            },
-            Test {
-                path: "to/path/filename",
-                expected_dir: "to/path/",
-                expected_filename: Some("filename".into()),
-            },
-            Test {
-                path: "/to/path/filename",
-                expected_dir: "/to/path/",
-                expected_filename: Some("filename".into()),
-            },
-            Test {
-                path: "/",
-                expected_dir: "/",
-                expected_filename: None,
-            },
-            Test {
-                path: "filename",
-                expected_dir: "/",
-                expected_filename: Some("filename".into()),
-            },
-            Test {
-                path: "",
-                expected_dir: "/",
-                expected_filename: None,
-            },
-        ];
-
-        for test in tests {
-            let (path, filename) = DataSource::find_dir_and_filename(test.path);
-            assert_eq!(test.expected_dir, path);
-            assert_eq!(test.expected_filename, filename)
-        }
-    }
-}
diff --git a/src/datanode/src/sql/copy_table.rs b/src/datanode/src/sql/copy_table_to.rs
similarity index 76%
rename from src/datanode/src/sql/copy_table.rs
rename to src/datanode/src/sql/copy_table_to.rs
index 8acd9447ae..17a4c4e08d 100644
--- a/src/datanode/src/sql/copy_table.rs
+++ b/src/datanode/src/sql/copy_table_to.rs
@@ -12,9 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashMap;
 use std::pin::Pin;
 
+use common_datasource;
+use common_datasource::object_store::{build_backend, parse_url};
 use common_query::physical_plan::SessionContext;
 use common_query::Output;
 use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
@@ -27,51 +28,12 @@ use object_store::ObjectStore;
 use snafu::ResultExt;
 use table::engine::TableReference;
 use table::requests::CopyTableRequest;
-use url::{ParseError, Url};
 
-use super::copy_table_from::{build_fs_backend, build_s3_backend, S3_SCHEMA};
 use crate::error::{self, Result};
 use crate::sql::SqlHandler;
 
 impl SqlHandler {
-    fn build_backend(
-        &self,
-        url: &str,
-        connection: HashMap<String, String>,
-    ) -> Result<(ObjectStore, String)> {
-        let result = Url::parse(url);
-
-        match result {
-            Ok(url) => {
-                let host = url.host_str();
-
-                let schema = url.scheme();
-
-                let path = url.path();
-
-                match schema.to_uppercase().as_str() {
-                    S3_SCHEMA => {
-                        let object_store = build_s3_backend(host, "/", connection)?;
-                        Ok((object_store, path.to_string()))
-                    }
-
-                    _ => error::UnsupportedBackendProtocolSnafu {
-                        protocol: schema.to_string(),
-                    }
-                    .fail(),
-                }
-            }
-            Err(ParseError::RelativeUrlWithoutBase) => {
-                let object_store = build_fs_backend("/")?;
-                Ok((object_store, url.to_string()))
-            }
-            Err(err) => Err(error::Error::InvalidUrl {
-                url: url.to_string(),
-                source: err,
-            }),
-        }
-    }
-    pub(crate) async fn copy_table(&self, req: CopyTableRequest) -> Result<Output> {
+    pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
         let table_ref = TableReference {
             catalog: &req.catalog_name,
             schema: &req.schema_name,
@@ -91,9 +53,11 @@ impl SqlHandler {
             .context(error::TableScanExecSnafu)?;
         let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
 
-        let (object_store, file_name) = self.build_backend(&req.file_name, req.connection)?;
+        let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
+        let object_store =
+            build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
 
-        let mut parquet_writer = ParquetWriter::new(file_name, stream, object_store);
+        let mut parquet_writer = ParquetWriter::new(path.to_string(), stream, object_store);
         // TODO(jiachun):
         // For now, COPY is implemented synchronously.
         // When copying large table, it will be blocked for a long time.
diff --git a/src/sql/src/parsers/copy_parser.rs b/src/sql/src/parsers/copy_parser.rs
index 518056dea8..cd7c581b88 100644
--- a/src/sql/src/parsers/copy_parser.rs
+++ b/src/sql/src/parsers/copy_parser.rs
@@ -18,7 +18,7 @@ use sqlparser::keywords::Keyword;
 
 use crate::error::{self, Result};
 use crate::parser::ParserContext;
-use crate::statements::copy::{CopyTable, CopyTableFrom, CopyTableTo, Format};
+use crate::statements::copy::{CopyTable, CopyTableArgument, Format};
 use crate::statements::statement::Statement;
 
 // COPY tbl TO 'output.parquet';
@@ -40,24 +40,24 @@ impl<'a> ParserContext<'a> {
             })?;
 
         if self.parser.parse_keyword(Keyword::TO) {
-            self.parse_copy_table_to(table_name)
+            Ok(CopyTable::To(self.parse_copy_table_to(table_name)?))
         } else {
             self.parser
                 .expect_keyword(Keyword::FROM)
                 .context(error::SyntaxSnafu { sql: self.sql })?;
-            self.parse_copy_table_from(table_name)
+            Ok(CopyTable::From(self.parse_copy_table_from(table_name)?))
         }
     }
 
-    fn parse_copy_table_from(&mut self, table_name: ObjectName) -> Result<CopyTable> {
-        let uri = self
-            .parser
-            .parse_literal_string()
-            .with_context(|_| error::UnexpectedSnafu {
-                sql: self.sql,
-                expected: "a uri",
-                actual: self.peek_token_as_string(),
-            })?;
+    fn parse_copy_table_from(&mut self, table_name: ObjectName) -> Result<CopyTableArgument> {
+        let location =
+            self.parser
+                .parse_literal_string()
+                .with_context(|_| error::UnexpectedSnafu {
+                    sql: self.sql,
+                    expected: "a uri",
+                    actual: self.peek_token_as_string(),
+                })?;
 
         let options = self
             .parser
@@ -99,14 +99,17 @@ impl<'a> ParserContext<'a> {
             }
         })
         .collect();
-
-        Ok(CopyTable::From(CopyTableFrom::new(
-            table_name, uri, format, pattern, connection,
-        )))
+        Ok(CopyTableArgument {
+            table_name,
+            format,
+            pattern,
+            connection,
+            location,
+        })
     }
 
-    fn parse_copy_table_to(&mut self, table_name: ObjectName) -> Result<CopyTable> {
-        let file_name =
+    fn parse_copy_table_to(&mut self, table_name: ObjectName) -> Result<CopyTableArgument> {
+        let location =
             self.parser
                 .parse_literal_string()
                 .with_context(|_| error::UnexpectedSnafu {
@@ -146,9 +149,13 @@ impl<'a> ParserContext<'a> {
             })
             .collect();
 
-        Ok(CopyTable::To(CopyTableTo::new(
-            table_name, file_name, format, connection,
-        )))
+        Ok(CopyTableArgument {
+            table_name,
+            format,
+            connection,
+            pattern: None,
+            location,
+        })
     }
 
     fn parse_option_string(value: Value) -> Option<String> {
@@ -197,7 +204,7 @@ mod tests {
         assert_eq!("schema0", schema);
         assert_eq!("tbl", table);
 
-        let file_name = copy_table.file_name;
+        let file_name = copy_table.location;
         assert_eq!("tbl_file.parquet", file_name);
 
         let format = copy_table.format;
@@ -240,7 +247,7 @@ mod tests {
         assert_eq!("schema0", schema);
         assert_eq!("tbl", table);
 
-        let file_name = copy_table.from;
+        let file_name = copy_table.location;
         assert_eq!("tbl_file.parquet", file_name);
 
         let format = copy_table.format;
diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs
index e2c3862a1a..b56d48e9cb 100644
--- a/src/sql/src/statements/copy.rs
+++ b/src/sql/src/statements/copy.rs
@@ -20,60 +20,18 @@ use crate::error::{self, Result};
 
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum CopyTable {
-    To(CopyTableTo),
-    From(CopyTableFrom),
+    To(CopyTableArgument),
+    From(CopyTableArgument),
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct CopyTableTo {
-    pub table_name: ObjectName,
-    pub file_name: String,
-    pub format: Format,
-    pub connection: HashMap<String, String>,
-}
-
-impl CopyTableTo {
-    pub(crate) fn new(
-        table_name: ObjectName,
-        file_name: String,
-        format: Format,
-        connection: HashMap<String, String>,
-    ) -> Self {
-        Self {
-            table_name,
-            file_name,
-            format,
-            connection,
-        }
-    }
-}
-
-// TODO: To combine struct CopyTableFrom and CopyTableTo
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct CopyTableFrom {
+pub struct CopyTableArgument {
     pub table_name: ObjectName,
     pub format: Format,
     pub connection: HashMap<String, String>,
     pub pattern: Option<String>,
-    pub from: String,
-}
-
-impl CopyTableFrom {
-    pub(crate) fn new(
-        table_name: ObjectName,
-        from: String,
-        format: Format,
-        pattern: Option<String>,
-        connection: HashMap<String, String>,
-    ) -> Self {
-        CopyTableFrom {
-            table_name,
-            format,
-            connection,
-            pattern,
-            from,
-        }
-    }
+    /// Copy tbl [To|From] 'location'.
+    pub location: String,
 }
 
 #[derive(Debug, Clone, PartialEq, Eq)]
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index 6740823800..f5cead9221 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -190,24 +190,22 @@ pub struct DeleteRequest {
     pub key_column_values: HashMap<String, VectorRef>,
 }
 
+#[derive(Debug)]
+pub enum CopyDirection {
+    Export,
+    Import,
+}
+
 /// Copy table request
 #[derive(Debug)]
 pub struct CopyTableRequest {
     pub catalog_name: String,
     pub schema_name: String,
     pub table_name: String,
-    pub file_name: String,
-    pub connection: HashMap<String, String>,
-}
-
-#[derive(Debug)]
-pub struct CopyTableFromRequest {
-    pub catalog_name: String,
-    pub schema_name: String,
-    pub table_name: String,
+    pub location: String,
     pub connection: HashMap<String, String>,
     pub pattern: Option<String>,
-    pub from: String,
+    pub direction: CopyDirection,
 }
 
 #[derive(Debug, Clone, Default)]