refactor: combine Copy To and Copy From (#1197)

* refactor: combine Copy To and Copy From

* Apply suggestions from code review

Co-authored-by: LFC <bayinamine@gmail.com>

* Apply suggestions from code review

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Co-authored-by: LFC <bayinamine@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
This commit is contained in:
Weny Xu
2023-03-20 19:23:25 +08:00
committed by GitHub
parent ad886f5b3e
commit e19c8fa2b6
19 changed files with 643 additions and 508 deletions

View File

@@ -20,6 +20,7 @@ catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-datasource = { path = "../common/datasource" }
common-grpc = { path = "../common/grpc" }
common-grpc-expr = { path = "../common/grpc-expr" }
common-procedure = { path = "../common/procedure" }

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use common_datasource::error::Error as DataSourceError;
use common_error::prelude::*;
use common_procedure::ProcedureId;
use common_recordbatch::error::Error as RecordBatchError;
@@ -218,7 +219,13 @@ pub enum Error {
#[snafu(display("Failed to build backend, source: {}", source))]
BuildBackend {
source: object_store::Error,
#[snafu(backtrace)]
source: DataSourceError,
},
#[snafu(display("Failed to parse url, source: {}", source))]
ParseUrl {
source: DataSourceError,
backtrace: Backtrace,
},
@@ -249,6 +256,12 @@ pub enum Error {
source: regex::Error,
},
#[snafu(display("Failed to list objects, source: {}", source))]
ListObjects {
#[snafu(backtrace)]
source: DataSourceError,
},
#[snafu(display("Failed to parse the data, source: {}", source))]
ParseDataTypes {
#[snafu(backtrace)]
@@ -475,13 +488,6 @@ pub enum Error {
source: object_store::Error,
},
#[snafu(display("Failed to lists object in path: {}, source: {}", path, source))]
ListObjects {
path: String,
backtrace: Backtrace,
source: object_store::Error,
},
#[snafu(display("Unrecognized table option: {}", source))]
UnrecognizedTableOption {
#[snafu(backtrace)]
@@ -584,7 +590,8 @@ impl ErrorExt for Error {
| DatabaseNotFound { .. }
| MissingNodeId { .. }
| MissingMetasrvOpts { .. }
| ColumnNoneDefaultValue { .. } => StatusCode::InvalidArguments,
| ColumnNoneDefaultValue { .. }
| ParseUrl { .. } => StatusCode::InvalidArguments,
// TODO(yingwen): Further categorize http error.
StartServer { .. }

View File

@@ -28,12 +28,10 @@ use servers::prom::PromHandler;
use session::context::{QueryContext, QueryContextRef};
use snafu::prelude::*;
use sql::ast::ObjectName;
use sql::statements::copy::CopyTable;
use sql::statements::copy::{CopyTable, CopyTableArgument};
use sql::statements::statement::Statement;
use table::engine::TableReference;
use table::requests::{
CopyTableFromRequest, CopyTableRequest, CreateDatabaseRequest, DropTableRequest,
};
use table::requests::{CopyDirection, CopyTableRequest, CreateDatabaseRequest, DropTableRequest};
use crate::error::{
self, BumpTableIdSnafu, ExecuteSqlSnafu, ExecuteStatementSnafu, PlanStatementSnafu, Result,
@@ -160,39 +158,54 @@ impl Instance {
QueryStatement::Sql(Statement::ShowCreateTable(_show_create_table)) => {
unimplemented!("SHOW CREATE TABLE is unimplemented yet");
}
QueryStatement::Sql(Statement::Copy(copy_table)) => match copy_table {
CopyTable::To(copy_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&copy_table.table_name, query_ctx.clone())?;
let file_name = copy_table.file_name;
let req = CopyTableRequest {
catalog_name,
schema_name,
table_name,
file_name,
connection: copy_table.connection,
};
QueryStatement::Sql(Statement::Copy(copy_table)) => {
let req = match copy_table {
CopyTable::To(copy_table) => {
let CopyTableArgument {
location,
connection,
pattern,
table_name,
..
} = copy_table;
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&table_name, query_ctx.clone())?;
CopyTableRequest {
catalog_name,
schema_name,
table_name,
location,
connection,
pattern,
direction: CopyDirection::Export,
}
}
CopyTable::From(copy_table) => {
let CopyTableArgument {
location,
connection,
pattern,
table_name,
..
} = copy_table;
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&table_name, query_ctx.clone())?;
CopyTableRequest {
catalog_name,
schema_name,
table_name,
location,
connection,
pattern,
direction: CopyDirection::Import,
}
}
};
self.sql_handler
.execute(SqlRequest::CopyTable(req), query_ctx)
.await
}
CopyTable::From(copy_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(&copy_table.table_name, query_ctx.clone())?;
let req = CopyTableFromRequest {
catalog_name,
schema_name,
table_name,
connection: copy_table.connection,
pattern: copy_table.pattern,
from: copy_table.from,
};
self.sql_handler
.execute(SqlRequest::CopyTableFrom(req), query_ctx)
.await
}
},
self.sql_handler
.execute(SqlRequest::CopyTable(req), query_ctx)
.await
}
QueryStatement::Sql(Statement::Query(_))
| QueryStatement::Sql(Statement::Explain(_))
| QueryStatement::Sql(Statement::Use(_))

View File

@@ -34,8 +34,8 @@ use crate::error::{
use crate::instance::sql::table_idents_to_full_name;
mod alter;
mod copy_table;
mod copy_table_from;
mod copy_table_to;
mod create;
mod delete;
mod drop_table;
@@ -55,7 +55,6 @@ pub enum SqlRequest {
DescribeTable(DescribeTable),
Delete(Delete),
CopyTable(CopyTableRequest),
CopyTableFrom(CopyTableFromRequest),
}
// Handler to execute SQL except query
@@ -96,8 +95,10 @@ impl SqlHandler {
SqlRequest::Alter(req) => self.alter(req).await,
SqlRequest::DropTable(req) => self.drop_table(req).await,
SqlRequest::Delete(req) => self.delete(query_ctx.clone(), req).await,
SqlRequest::CopyTable(req) => self.copy_table(req).await,
SqlRequest::CopyTableFrom(req) => self.copy_table_from(req).await,
SqlRequest::CopyTable(req) => match req.direction {
CopyDirection::Export => self.copy_table_to(req).await,
CopyDirection::Import => self.copy_table_from(req).await,
},
SqlRequest::ShowDatabases(req) => {
show_databases(req, self.catalog_manager.clone()).context(ExecuteSqlSnafu)
}

View File

@@ -15,35 +15,26 @@
use std::collections::HashMap;
use async_compat::CompatExt;
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
use common_datasource::util::find_dir_and_filename;
use common_query::Output;
use common_recordbatch::error::DataTypesSnafu;
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::vectors::{Helper, VectorRef};
use futures::future;
use futures_util::TryStreamExt;
use object_store::services::{Fs, S3};
use object_store::{Object, ObjectStore, ObjectStoreBuilder};
use regex::Regex;
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::requests::{CopyTableFromRequest, InsertRequest};
use table::requests::{CopyTableRequest, InsertRequest};
use tokio::io::BufReader;
use url::{ParseError, Url};
use crate::error::{self, Result};
use crate::sql::SqlHandler;
pub const S3_SCHEMA: &str = "S3";
const ENDPOINT_URL: &str = "ENDPOINT_URL";
const ACCESS_KEY_ID: &str = "ACCESS_KEY_ID";
const SECRET_ACCESS_KEY: &str = "SECRET_ACCESS_KEY";
const SESSION_TOKEN: &str = "SESSION_TOKEN";
const REGION: &str = "REGION";
const ENABLE_VIRTUAL_HOST_STYLE: &str = "ENABLE_VIRTUAL_HOST_STYLE";
impl SqlHandler {
pub(crate) async fn copy_table_from(&self, req: CopyTableFromRequest) -> Result<Output> {
pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result<Output> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
@@ -51,9 +42,29 @@ impl SqlHandler {
};
let table = self.get_table(&table_ref)?;
let datasource = DataSource::new(&req.from, req.pattern, req.connection)?;
let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
let objects = datasource.list().await?;
let object_store =
build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
let (dir, filename) = find_dir_and_filename(&path);
let regex = req
.pattern
.as_ref()
.map(|x| Regex::new(x))
.transpose()
.context(error::BuildRegexSnafu)?;
let source = if let Some(filename) = filename {
Source::Filename(filename)
} else {
Source::Dir
};
let lister = Lister::new(object_store, source, dir, regex);
let objects = lister.list().await.context(error::ListObjectsSnafu)?;
let mut buf: Vec<RecordBatch> = Vec::new();
@@ -131,321 +142,3 @@ impl SqlHandler {
Ok(Output::AffectedRows(result.iter().sum()))
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
enum Source {
Filename(String),
Dir,
}
struct DataSource {
object_store: ObjectStore,
source: Source,
path: String,
regex: Option<Regex>,
}
impl DataSource {
fn from_path(url: &str, regex: Option<Regex>) -> Result<DataSource> {
let result = if url.ends_with('/') {
Url::from_directory_path(url)
} else {
Url::from_file_path(url)
};
match result {
Ok(url) => {
let path = url.path();
let (path, filename) = DataSource::find_dir_and_filename(path);
let source = if let Some(filename) = filename {
Source::Filename(filename)
} else {
Source::Dir
};
let object_store = build_fs_backend(&path)?;
Ok(DataSource {
object_store,
source,
path,
regex,
})
}
Err(()) => error::InvalidPathSnafu {
path: url.to_string(),
}
.fail(),
}
}
fn from_url(
url: Url,
regex: Option<Regex>,
connection: HashMap<String, String>,
) -> Result<DataSource> {
let host = url.host_str();
let path = url.path();
let schema = url.scheme();
let (dir, filename) = DataSource::find_dir_and_filename(path);
let source = if let Some(filename) = filename {
Source::Filename(filename)
} else {
Source::Dir
};
let object_store = match schema.to_uppercase().as_str() {
S3_SCHEMA => build_s3_backend(host, &dir, connection)?,
_ => {
return error::UnsupportedBackendProtocolSnafu {
protocol: schema.to_string(),
}
.fail()
}
};
Ok(DataSource {
object_store,
source,
path: dir,
regex,
})
}
pub fn new(
url: &str,
pattern: Option<String>,
connection: HashMap<String, String>,
) -> Result<DataSource> {
let regex = if let Some(pattern) = pattern {
let regex = Regex::new(&pattern).context(error::BuildRegexSnafu)?;
Some(regex)
} else {
None
};
let result = Url::parse(url);
match result {
Ok(url) => DataSource::from_url(url, regex, connection),
Err(err) => {
if ParseError::RelativeUrlWithoutBase == err {
DataSource::from_path(url, regex)
} else {
Err(error::Error::InvalidUrl {
url: url.to_string(),
source: err,
})
}
}
}
}
pub async fn list(&self) -> Result<Vec<Object>> {
match &self.source {
Source::Dir => {
let streamer = self
.object_store
.object("/")
.list()
.await
.context(error::ListObjectsSnafu { path: &self.path })?;
streamer
.try_filter(|f| {
let res = if let Some(regex) = &self.regex {
regex.is_match(f.name())
} else {
true
};
future::ready(res)
})
.try_collect::<Vec<_>>()
.await
.context(error::ListObjectsSnafu { path: &self.path })
}
Source::Filename(filename) => {
let obj = self.object_store.object(filename);
Ok(vec![obj])
}
}
}
fn find_dir_and_filename(path: &str) -> (String, Option<String>) {
if path.is_empty() {
("/".to_string(), None)
} else if path.ends_with('/') {
(path.to_string(), None)
} else if let Some(idx) = path.rfind('/') {
(
path[..idx + 1].to_string(),
Some(path[idx + 1..].to_string()),
)
} else {
("/".to_string(), Some(path.to_string()))
}
}
}
pub fn build_s3_backend(
host: Option<&str>,
path: &str,
connection: HashMap<String, String>,
) -> Result<ObjectStore> {
let mut builder = S3::default();
builder.root(path);
if let Some(bucket) = host {
builder.bucket(bucket);
}
if let Some(endpoint) = connection.get(ENDPOINT_URL) {
builder.endpoint(endpoint);
}
if let Some(region) = connection.get(REGION) {
builder.region(region);
}
if let Some(key_id) = connection.get(ACCESS_KEY_ID) {
builder.access_key_id(key_id);
}
if let Some(key) = connection.get(SECRET_ACCESS_KEY) {
builder.secret_access_key(key);
}
if let Some(session_token) = connection.get(SESSION_TOKEN) {
builder.security_token(session_token);
}
if let Some(enable_str) = connection.get(ENABLE_VIRTUAL_HOST_STYLE) {
let enable = enable_str.as_str().parse::<bool>().map_err(|e| {
error::InvalidConnectionSnafu {
msg: format!(
"failed to parse the option {}={}, {}",
ENABLE_VIRTUAL_HOST_STYLE, enable_str, e
),
}
.build()
})?;
if enable {
builder.enable_virtual_host_style();
}
}
let accessor = builder.build().context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
}
pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
let accessor = Fs::default()
.root(root)
.build()
.context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
}
#[cfg(test)]
mod tests {
use url::Url;
use super::*;
#[test]
fn test_parse_uri() {
struct Test<'a> {
uri: &'a str,
expected_path: &'a str,
expected_schema: &'a str,
}
let tests = [
Test {
uri: "s3://bucket/to/path/",
expected_path: "/to/path/",
expected_schema: "s3",
},
Test {
uri: "fs:///to/path/",
expected_path: "/to/path/",
expected_schema: "fs",
},
Test {
uri: "fs:///to/path/file",
expected_path: "/to/path/file",
expected_schema: "fs",
},
];
for test in tests {
let parsed_uri = Url::parse(test.uri).unwrap();
assert_eq!(parsed_uri.path(), test.expected_path);
assert_eq!(parsed_uri.scheme(), test.expected_schema);
}
}
#[test]
fn test_parse_path_and_dir() {
let parsed = Url::from_file_path("/to/path/file").unwrap();
assert_eq!(parsed.path(), "/to/path/file");
let parsed = Url::from_directory_path("/to/path/").unwrap();
assert_eq!(parsed.path(), "/to/path/");
}
#[test]
fn test_find_dir_and_filename() {
struct Test<'a> {
path: &'a str,
expected_dir: &'a str,
expected_filename: Option<String>,
}
let tests = [
Test {
path: "to/path/",
expected_dir: "to/path/",
expected_filename: None,
},
Test {
path: "to/path/filename",
expected_dir: "to/path/",
expected_filename: Some("filename".into()),
},
Test {
path: "/to/path/filename",
expected_dir: "/to/path/",
expected_filename: Some("filename".into()),
},
Test {
path: "/",
expected_dir: "/",
expected_filename: None,
},
Test {
path: "filename",
expected_dir: "/",
expected_filename: Some("filename".into()),
},
Test {
path: "",
expected_dir: "/",
expected_filename: None,
},
];
for test in tests {
let (path, filename) = DataSource::find_dir_and_filename(test.path);
assert_eq!(test.expected_dir, path);
assert_eq!(test.expected_filename, filename)
}
}
}

View File

@@ -12,9 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::pin::Pin;
use common_datasource;
use common_datasource::object_store::{build_backend, parse_url};
use common_query::physical_plan::SessionContext;
use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
@@ -27,51 +28,12 @@ use object_store::ObjectStore;
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::CopyTableRequest;
use url::{ParseError, Url};
use super::copy_table_from::{build_fs_backend, build_s3_backend, S3_SCHEMA};
use crate::error::{self, Result};
use crate::sql::SqlHandler;
impl SqlHandler {
fn build_backend(
&self,
url: &str,
connection: HashMap<String, String>,
) -> Result<(ObjectStore, String)> {
let result = Url::parse(url);
match result {
Ok(url) => {
let host = url.host_str();
let schema = url.scheme();
let path = url.path();
match schema.to_uppercase().as_str() {
S3_SCHEMA => {
let object_store = build_s3_backend(host, "/", connection)?;
Ok((object_store, path.to_string()))
}
_ => error::UnsupportedBackendProtocolSnafu {
protocol: schema.to_string(),
}
.fail(),
}
}
Err(ParseError::RelativeUrlWithoutBase) => {
let object_store = build_fs_backend("/")?;
Ok((object_store, url.to_string()))
}
Err(err) => Err(error::Error::InvalidUrl {
url: url.to_string(),
source: err,
}),
}
}
pub(crate) async fn copy_table(&self, req: CopyTableRequest) -> Result<Output> {
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<Output> {
let table_ref = TableReference {
catalog: &req.catalog_name,
schema: &req.schema_name,
@@ -91,9 +53,11 @@ impl SqlHandler {
.context(error::TableScanExecSnafu)?;
let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
let (object_store, file_name) = self.build_backend(&req.file_name, req.connection)?;
let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
let object_store =
build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
let mut parquet_writer = ParquetWriter::new(file_name, stream, object_store);
let mut parquet_writer = ParquetWriter::new(path.to_string(), stream, object_store);
// TODO(jiachun):
// For now, COPY is implemented synchronously.
// When copying large table, it will be blocked for a long time.