mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 17:30:41 +00:00
refactor(frontend): TableScan instead of scan_to_stream for COPY TO (#2244)
* refactor(frontend): TableScan instead of `scan_to_stream` for `COPY TO`

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: format

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -104,11 +104,12 @@ impl StatementExecutor {
|
||||
Statement::ShowTables(stmt) => self.show_tables(stmt, query_ctx).await,
|
||||
|
||||
Statement::Copy(sql::statements::copy::Copy::CopyTable(stmt)) => {
|
||||
let req = to_copy_table_request(stmt, query_ctx)?;
|
||||
let req = to_copy_table_request(stmt, query_ctx.clone())?;
|
||||
match req.direction {
|
||||
CopyDirection::Export => {
|
||||
self.copy_table_to(req).await.map(Output::AffectedRows)
|
||||
}
|
||||
CopyDirection::Export => self
|
||||
.copy_table_to(req, query_ctx)
|
||||
.await
|
||||
.map(Output::AffectedRows),
|
||||
CopyDirection::Import => {
|
||||
self.copy_table_from(req).await.map(Output::AffectedRows)
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use common_datasource::file_format::Format;
|
||||
use common_query::Output;
|
||||
use common_telemetry::info;
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ensure, ResultExt};
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
|
||||
@@ -65,17 +66,20 @@ impl StatementExecutor {
|
||||
);
|
||||
|
||||
let exported = self
|
||||
.copy_table_to(CopyTableRequest {
|
||||
catalog_name: req.catalog_name.clone(),
|
||||
schema_name: req.schema_name.clone(),
|
||||
table_name,
|
||||
location: table_file,
|
||||
with: req.with.clone(),
|
||||
connection: req.connection.clone(),
|
||||
pattern: None,
|
||||
direction: CopyDirection::Export,
|
||||
timestamp_range: req.time_range,
|
||||
})
|
||||
.copy_table_to(
|
||||
CopyTableRequest {
|
||||
catalog_name: req.catalog_name.clone(),
|
||||
schema_name: req.schema_name.clone(),
|
||||
table_name,
|
||||
location: table_file,
|
||||
with: req.with.clone(),
|
||||
connection: req.connection.clone(),
|
||||
pattern: None,
|
||||
direction: CopyDirection::Export,
|
||||
timestamp_range: req.time_range,
|
||||
},
|
||||
QueryContextBuilder::default().build(),
|
||||
)
|
||||
.await?;
|
||||
exported_rows += exported;
|
||||
}
|
||||
|
||||
@@ -12,22 +12,32 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_datasource::file_format::csv::stream_to_csv;
|
||||
use common_datasource::file_format::json::stream_to_json;
|
||||
use common_datasource::file_format::Format;
|
||||
use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_query::Output;
|
||||
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion_common::TableReference as DfTableReference;
|
||||
use datafusion_expr::LogicalPlanBuilder;
|
||||
use object_store::ObjectStore;
|
||||
use query::plan::LogicalPlan;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use storage::sst::SstInfo;
|
||||
use storage::{ParquetWriter, Source};
|
||||
use store_api::storage::ScanRequest;
|
||||
use table::engine::TableReference;
|
||||
use table::requests::CopyTableRequest;
|
||||
use table::table::adapter::DfTableProviderAdapter;
|
||||
|
||||
use crate::error::{self, Result, WriteParquetSnafu};
|
||||
use crate::error::{
|
||||
self, BuildDfLogicalPlanSnafu, ExecLogicalPlanSnafu, Result, WriteParquetSnafu,
|
||||
};
|
||||
use crate::statement::StatementExecutor;
|
||||
|
||||
impl StatementExecutor {
|
||||
@@ -72,16 +82,18 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result<usize> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
table: &req.table_name,
|
||||
};
|
||||
pub(crate) async fn copy_table_to(
|
||||
&self,
|
||||
req: CopyTableRequest,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<usize> {
|
||||
let table_ref = TableReference::full(&req.catalog_name, &req.schema_name, &req.table_name);
|
||||
let table = self.get_table(&table_ref).await?;
|
||||
|
||||
let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?;
|
||||
|
||||
let df_table_ref = DfTableReference::from(table_ref);
|
||||
|
||||
let filters = table
|
||||
.schema()
|
||||
.timestamp_column()
|
||||
@@ -91,20 +103,33 @@ impl StatementExecutor {
|
||||
req.timestamp_range.as_ref(),
|
||||
)
|
||||
})
|
||||
.map(|filter| filter.df_expr().clone())
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let scan_req = ScanRequest {
|
||||
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
|
||||
let table_source = Arc::new(DefaultTableSource::new(table_provider));
|
||||
|
||||
let plan = LogicalPlanBuilder::scan_with_filters(
|
||||
df_table_ref.to_owned_reference(),
|
||||
table_source,
|
||||
None,
|
||||
filters,
|
||||
..Default::default()
|
||||
)
|
||||
.context(BuildDfLogicalPlanSnafu)?
|
||||
.build()
|
||||
.context(BuildDfLogicalPlanSnafu)?;
|
||||
|
||||
let output = self
|
||||
.query_engine
|
||||
.execute(LogicalPlan::DfPlan(plan), query_ctx)
|
||||
.await
|
||||
.context(ExecLogicalPlanSnafu)?;
|
||||
let stream = match output {
|
||||
Output::Stream(stream) => stream,
|
||||
Output::RecordBatches(record_batches) => record_batches.as_stream(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let stream =
|
||||
table
|
||||
.scan_to_stream(scan_req)
|
||||
.await
|
||||
.with_context(|_| error::CopyTableSnafu {
|
||||
table_name: table_ref.to_string(),
|
||||
})?;
|
||||
|
||||
let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?;
|
||||
let object_store =
|
||||
|
||||
@@ -17,6 +17,7 @@ use std::sync::Arc;
|
||||
|
||||
use common_base::paths::DATA_DIR;
|
||||
use common_procedure::BoxedProcedure;
|
||||
use datafusion_common::TableReference as DfTableReference;
|
||||
use store_api::storage::RegionNumber;
|
||||
|
||||
use crate::error::{self, Result};
|
||||
@@ -63,6 +64,12 @@ impl<'a> Display for TableReference<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<TableReference<'a>> for DfTableReference<'a> {
|
||||
fn from(val: TableReference<'a>) -> Self {
|
||||
DfTableReference::full(val.catalog, val.schema, val.table)
|
||||
}
|
||||
}
|
||||
|
||||
/// CloseTableResult
|
||||
///
|
||||
/// Returns [`CloseTableResult::Released`] and closed region numbers if a table was removed
|
||||
|
||||
Reference in New Issue
Block a user