From 9ce73e7ca1450a6a245da21a88f3bede08d93a57 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Thu, 24 Aug 2023 20:46:54 +0800 Subject: [PATCH] refactor(frontend): TableScan instead of `scan_to_stream` for `COPY TO` (#2244) * refactor(frontend): TableScan instead of `scan_to_stream` for `COPY TO` Signed-off-by: Zhenchi * fix: format Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- src/frontend/src/statement.rs | 9 ++-- src/frontend/src/statement/backup.rs | 26 +++++---- src/frontend/src/statement/copy_table_to.rs | 59 +++++++++++++++------ src/table/src/engine.rs | 7 +++ 4 files changed, 69 insertions(+), 32 deletions(-) diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs index 26ee518f68..3deadcf864 100644 --- a/src/frontend/src/statement.rs +++ b/src/frontend/src/statement.rs @@ -104,11 +104,12 @@ impl StatementExecutor { Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await, Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => { - let req = to_copy_table_request(stmt, query_ctx)?; + let req = to_copy_table_request(stmt, query_ctx.clone())?; match req.direction { - CopyDirection::Export => { - self.copy_table_to(req).await.map(Output::AffectedRows) - } + CopyDirection::Export => self + .copy_table_to(req, query_ctx) + .await + .map(Output::AffectedRows), CopyDirection::Import => { self.copy_table_from(req).await.map(Output::AffectedRows) } diff --git a/src/frontend/src/statement/backup.rs b/src/frontend/src/statement/backup.rs index b0004ad5f2..7d34d376f5 100644 --- a/src/frontend/src/statement/backup.rs +++ b/src/frontend/src/statement/backup.rs @@ -15,6 +15,7 @@ use common_datasource::file_format::Format; use common_query::Output; use common_telemetry::info; +use session::context::QueryContextBuilder; use snafu::{ensure, ResultExt}; use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest}; @@ -65,17 +66,20 @@ impl StatementExecutor { ); let exported = self - 
.copy_table_to(CopyTableRequest { - catalog_name: req.catalog_name.clone(), - schema_name: req.schema_name.clone(), - table_name, - location: table_file, - with: req.with.clone(), - connection: req.connection.clone(), - pattern: None, - direction: CopyDirection::Export, - timestamp_range: req.time_range, - }) + .copy_table_to( + CopyTableRequest { + catalog_name: req.catalog_name.clone(), + schema_name: req.schema_name.clone(), + table_name, + location: table_file, + with: req.with.clone(), + connection: req.connection.clone(), + pattern: None, + direction: CopyDirection::Export, + timestamp_range: req.time_range, + }, + QueryContextBuilder::default().build(), + ) .await?; exported_rows += exported; } diff --git a/src/frontend/src/statement/copy_table_to.rs b/src/frontend/src/statement/copy_table_to.rs index 5efd934ec5..c23d5473f3 100644 --- a/src/frontend/src/statement/copy_table_to.rs +++ b/src/frontend/src/statement/copy_table_to.rs @@ -12,22 +12,32 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::Arc; + use common_base::readable_size::ReadableSize; use common_datasource::file_format::csv::stream_to_csv; use common_datasource::file_format::json::stream_to_json; use common_datasource::file_format::Format; use common_datasource::object_store::{build_backend, parse_url}; +use common_query::Output; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::SendableRecordBatchStream; +use datafusion::datasource::DefaultTableSource; +use datafusion_common::TableReference as DfTableReference; +use datafusion_expr::LogicalPlanBuilder; use object_store::ObjectStore; +use query::plan::LogicalPlan; +use session::context::QueryContextRef; use snafu::ResultExt; use storage::sst::SstInfo; use storage::{ParquetWriter, Source}; -use store_api::storage::ScanRequest; use table::engine::TableReference; use table::requests::CopyTableRequest; +use table::table::adapter::DfTableProviderAdapter; -use crate::error::{self, Result, WriteParquetSnafu}; +use crate::error::{ + self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result, WriteParquetSnafu, +}; use crate::statement::StatementExecutor; impl StatementExecutor { @@ -72,16 +82,18 @@ impl StatementExecutor { } } - pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<usize> { - let table_ref = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, - }; + pub(crate) async fn copy_table_to( + &self, + req: CopyTableRequest, + query_ctx: QueryContextRef, + ) -> Result<usize> { + let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name); let table = self.get_table(&table_ref).await?; let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?; + let df_table_ref = DfTableReference::from(table_ref); + let filters = table .schema() .timestamp_column() @@ -91,20 +103,33 @@ impl StatementExecutor { req.timestamp_range.as_ref(), ) }) + .map(|filter| filter.df_expr().clone()) .into_iter() 
.collect::<Vec<_>>(); - let scan_req = ScanRequest { + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + let table_source = Arc::new(DefaultTableSource::new(table_provider)); + + let plan = LogicalPlanBuilder::scan_with_filters( + df_table_ref.to_owned_reference(), + table_source, + None, filters, - ..Default::default() + ) + .context(BuildDfLogicalPlanSnafu)? + .build() + .context(BuildDfLogicalPlanSnafu)?; + + let output = self + .query_engine + .execute(LogicalPlan::DfPlan(plan), query_ctx) + .await + .context(ExecLogicalPlanSnafu)?; + let stream = match output { + Output::Stream(stream) => stream, + Output::RecordBatches(record_batches) => record_batches.as_stream(), + _ => unreachable!(), }; - let stream = - table - .scan_to_stream(scan_req) - .await - .with_context(|_| error::CopyTableSnafu { - table_name: table_ref.to_string(), - })?; let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?; let object_store = diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index d26dc0b768..f4937bfb60 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_base::paths::DATA_DIR; use common_procedure::BoxedProcedure; +use datafusion_common::TableReference as DfTableReference; use store_api::storage::RegionNumber; use crate::error::{self, Result}; @@ -63,6 +64,12 @@ impl<'a> Display for TableReference<'a> { } } +impl<'a> From<TableReference<'a>> for DfTableReference<'a> { + fn from(val: TableReference<'a>) -> Self { + DfTableReference::full(val.catalog, val.schema, val.table) + } +} + /// CloseTableResult /// /// Returns [`CloseTableResult::Released`] and closed region numbers if a table was removed