From 1138f32af90f5f299d3cf249807c387b68509fe3 Mon Sep 17 00:00:00 2001
From: taobo
Date: Tue, 30 Jul 2024 00:55:19 +0800
Subject: [PATCH] feat: support setting time range in Copy From statement
 (#4405)

* feat: support setting time range in Copy From statement
* test: add batch_filter_test
* fix: ts data type inconsistent error
* test: add sqlness test for copy from with statement
* fix: sqlness result error
* fix: cr comments
---
 src/common/query/src/logical_plan.rs          |  2 +-
 src/common/query/src/logical_plan/expr.rs     | 29 +++++++
 src/common/recordbatch/src/adapter.rs         | 37 ++++++++-
 src/common/recordbatch/src/error.rs           | 11 ++-
 src/common/recordbatch/src/filter.rs          | 74 ++++++++++++++++-
 src/operator/src/error.rs                     | 10 ++-
 src/operator/src/statement/copy_table_from.rs | 51 +++++++-----
 .../common/copy/copy_from_fs_csv.result       | 25 +++---
 .../common/copy/copy_from_fs_csv.sql          | 13 ++-
 .../common/copy/copy_from_fs_json.result      | 17 ++--
 .../common/copy/copy_from_fs_json.sql         | 13 ++-
 .../common/copy/copy_from_fs_parquet.result   | 82 +++++++++++--------
 .../common/copy/copy_from_fs_parquet.sql      | 26 ++++--
 13 files changed, 294 insertions(+), 96 deletions(-)

diff --git a/src/common/query/src/logical_plan.rs b/src/common/query/src/logical_plan.rs
index 6cfee747c2..7fd081c219 100644
--- a/src/common/query/src/logical_plan.rs
+++ b/src/common/query/src/logical_plan.rs
@@ -25,7 +25,7 @@ use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
 use datafusion_common::Column;
 use datafusion_expr::col;
 use datatypes::prelude::ConcreteDataType;
-pub use expr::build_filter_from_timestamp;
+pub use expr::{build_filter_from_timestamp, build_same_type_ts_filter};
 
 pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
 pub use self::udaf::AggregateFunction;
diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs
index 2d30bee2af..3b9f6cd120 100644
--- a/src/common/query/src/logical_plan/expr.rs
+++ b/src/common/query/src/logical_plan/expr.rs
@@ -18,6 +18,35 @@ use common_time::Timestamp;
 use datafusion_common::{Column, ScalarValue};
 use datafusion_expr::expr::Expr;
 use datafusion_expr::{and, binary_expr, Operator};
+use datatypes::data_type::DataType;
+use datatypes::schema::ColumnSchema;
+use datatypes::value::Value;
+
+/// Builds a filter for a timestamp column, casting the range bounds to the column's own type.
+/// Returns [None] if the time range is [None] or the full time range.
+pub fn build_same_type_ts_filter(
+    ts_schema: &ColumnSchema,
+    time_range: Option<TimestampRange>,
+) -> Option<Expr> {
+    let ts_type = ts_schema.data_type.clone();
+    let time_range = time_range?;
+    let start = time_range
+        .start()
+        .and_then(|start| ts_type.try_cast(Value::Timestamp(start)));
+    let end = time_range
+        .end()
+        .and_then(|end| ts_type.try_cast(Value::Timestamp(end)));
+
+    let time_range = match (start, end) {
+        (Some(Value::Timestamp(start)), Some(Value::Timestamp(end))) => {
+            TimestampRange::new(start, end)
+        }
+        (Some(Value::Timestamp(start)), None) => Some(TimestampRange::from_start(start)),
+        (None, Some(Value::Timestamp(end))) => Some(TimestampRange::until_end(end, false)),
+        _ => return None,
+    };
+    build_filter_from_timestamp(&ts_schema.name, time_range.as_ref())
+}
 
 /// Builds an `Expr` that filters timestamp column from given timestamp range.
 /// Returns [None] if time range is [None] or full time range.
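
For review, a minimal sketch of how the new `build_same_type_ts_filter` helper is intended to be driven. This is illustrative only, not part of the patch: the column and range values are made up, and it assumes the repository's existing `ColumnSchema`, `ConcreteDataType`, and `Timestamp` constructors.

    use common_time::range::TimestampRange;
    use common_time::Timestamp;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    // Hypothetical millisecond TIME INDEX column named "ts".
    let ts_col = ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    );
    // Hypothetical half-open window [07:02:37, 07:02:39); `TimestampRange::new`
    // already returns `Option<TimestampRange>`, matching the helper's argument.
    let range = TimestampRange::new(
        Timestamp::new_millisecond(1655276557000),
        Timestamp::new_millisecond(1655276559000),
    );
    // Expected to yield `ts >= start AND ts < end`, with both bounds cast to the
    // column's own timestamp type; `None` for an absent or unbounded range.
    let filter = build_same_type_ts_filter(&ts_col, range);
    assert!(filter.is_some());
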
diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs
index 0af2c8ac33..85236381b2 100644
--- a/src/common/recordbatch/src/adapter.rs
+++ b/src/common/recordbatch/src/adapter.rs
@@ -22,19 +22,25 @@ use std::task::{Context, Poll};
 use datafusion::arrow::compute::cast;
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
 use datafusion::error::Result as DfResult;
+use datafusion::execution::context::ExecutionProps;
+use datafusion::logical_expr::utils::conjunction;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
 use datafusion::physical_plan::{
-    accept, displayable, ExecutionPlan, ExecutionPlanVisitor,
+    accept, displayable, ExecutionPlan, ExecutionPlanVisitor, PhysicalExpr,
     RecordBatchStream as DfRecordBatchStream,
 };
 use datafusion_common::arrow::error::ArrowError;
-use datafusion_common::DataFusionError;
+use datafusion_common::{DataFusionError, ToDFSchema};
+use datatypes::arrow::array::Array;
 use datatypes::schema::{Schema, SchemaRef};
 use futures::ready;
 use pin_project::pin_project;
 use snafu::ResultExt;
 
 use crate::error::{self, Result};
+use crate::filter::batch_filter;
 use crate::{
     DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
     SendableRecordBatchStream, Stream,
@@ -50,6 +56,7 @@ pub struct RecordBatchStreamTypeAdapter<T, E> {
     stream: T,
     projected_schema: DfSchemaRef,
     projection: Vec<usize>,
+    predicate: Option<Arc<dyn PhysicalExpr>>,
     phantom: PhantomData<E>,
 }
 
@@ -69,9 +76,28 @@ where
             stream,
             projected_schema,
             projection,
+            predicate: None,
             phantom: Default::default(),
         }
     }
+
+    pub fn with_filter(mut self, filters: Vec<Expr>) -> Result<Self> {
+        let filters = if let Some(expr) = conjunction(filters) {
+            let df_schema = self
+                .projected_schema
+                .clone()
+                .to_dfschema_ref()
+                .context(error::PhysicalExprSnafu)?;
+
+            let filters = create_physical_expr(&expr, &df_schema, &ExecutionProps::new())
+                .context(error::PhysicalExprSnafu)?;
+            Some(filters)
+        } else {
+            None
+        };
+        self.predicate = filters;
+        Ok(self)
+    }
 }
 
 impl<T, E> DfRecordBatchStream for RecordBatchStreamTypeAdapter<T, E>
@@ -99,6 +125,8 @@ where
 
         let projected_schema = this.projected_schema.clone();
         let projection = this.projection.clone();
+        let predicate = this.predicate.clone();
+
         let batch = batch.map(|b| {
             b.and_then(|b| {
                 let projected_column = b.project(&projection)?;
@@ -121,6 +149,11 @@ where
                     }
                 }
                 let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
+                let record_batch = if let Some(predicate) = predicate {
+                    batch_filter(&record_batch, &predicate)?
+                } else {
+                    record_batch
+                };
                 Ok(record_batch)
             })
         });
diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index f5424d410a..f2114f645f 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -73,6 +73,14 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Create physical expr error"))]
+    PhysicalExpr {
+        #[snafu(source)]
+        error: datafusion::error::DataFusionError,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Fail to format record batch"))]
     Format {
         #[snafu(source)]
@@ -167,7 +175,8 @@ impl ErrorExt for Error {
             | Error::PollStream { .. }
             | Error::Format { .. }
             | Error::ToArrowScalar { .. }
-            | Error::ProjectArrowRecordBatch { .. } => StatusCode::Internal,
+            | Error::ProjectArrowRecordBatch { .. }
+            | Error::PhysicalExpr { .. } => StatusCode::Internal,
 
             Error::ArrowCompute { .. } => StatusCode::IllegalState,
diff --git a/src/common/recordbatch/src/filter.rs b/src/common/recordbatch/src/filter.rs
index 195abb1181..8c1ebe7d53 100644
--- a/src/common/recordbatch/src/filter.rs
+++ b/src/common/recordbatch/src/filter.rs
@@ -14,11 +14,18 @@
 
 //! Util record batch stream wrapper that can perform precise filter.
 
+use std::sync::Arc;
+
+use datafusion::error::Result as DfResult;
 use datafusion::logical_expr::{Expr, Literal, Operator};
+use datafusion::physical_plan::PhysicalExpr;
 use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
 use datafusion_common::arrow::buffer::BooleanBuffer;
 use datafusion_common::arrow::compute::kernels::cmp;
-use datafusion_common::ScalarValue;
+use datafusion_common::cast::{as_boolean_array, as_null_array};
+use datafusion_common::{internal_err, DataFusionError, ScalarValue};
+use datatypes::arrow::array::{Array, BooleanArray, RecordBatch};
+use datatypes::arrow::compute::filter_record_batch;
 use datatypes::vectors::VectorRef;
 use snafu::ResultExt;
 
@@ -144,13 +151,43 @@ impl SimpleFilterEvaluator {
     }
 }
 
+/// Evaluates the predicate against the input [RecordBatch] and returns a new, filtered [RecordBatch].
+/// Copied from DataFusion's `physical_plan/filter.rs`.
+pub fn batch_filter(
+    batch: &RecordBatch,
+    predicate: &Arc<dyn PhysicalExpr>,
+) -> DfResult<RecordBatch> {
+    predicate
+        .evaluate(batch)
+        .and_then(|v| v.into_array(batch.num_rows()))
+        .and_then(|array| {
+            let filter_array = match as_boolean_array(&array) {
+                Ok(boolean_array) => Ok(boolean_array.clone()),
+                Err(_) => {
+                    let Ok(null_array) = as_null_array(&array) else {
+                        return internal_err!(
+                            "Cannot create filter_array from non-boolean predicates"
+                        );
+                    };
+
+                    // if the predicate is null, then the result is also null
+                    Ok::<BooleanArray, DataFusionError>(BooleanArray::new_null(null_array.len()))
+                }
+            }?;
+            Ok(filter_record_batch(batch, &filter_array)?)
+ }) +} + #[cfg(test)] mod test { use std::sync::Arc; - use datafusion::logical_expr::BinaryExpr; - use datafusion_common::Column; + use datafusion::execution::context::ExecutionProps; + use datafusion::logical_expr::{col, lit, BinaryExpr}; + use datafusion::physical_expr::create_physical_expr; + use datafusion_common::{Column, DFSchema}; + use datatypes::arrow::datatypes::{DataType, Field, Schema}; use super::*; @@ -281,4 +318,35 @@ mod test { let result = evaluator.evaluate_scalar(&input_3).unwrap(); assert!(!result); } + + #[test] + fn batch_filter_test() { + let expr = col("ts").gt(lit(123456u64)); + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("ts", DataType::UInt64, false), + ]); + let df_schema = DFSchema::try_from(schema.clone()).unwrap(); + let props = ExecutionProps::new(); + let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(datatypes::arrow::array::Int32Array::from(vec![4, 5, 6])), + Arc::new(datatypes::arrow::array::UInt64Array::from(vec![ + 123456, 123457, 123458, + ])), + ], + ) + .unwrap(); + let new_batch = batch_filter(&batch, &physical_expr).unwrap(); + assert_eq!(new_batch.num_rows(), 2); + let first_column_values = new_batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let expected = datatypes::arrow::array::Int32Array::from(vec![5, 6]); + assert_eq!(first_column_values, &expected); + } } diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 4c657ceeb0..8e7991fd06 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -754,6 +754,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Create physical expr error"))] + PhysicalExpr { + #[snafu(source)] + error: common_recordbatch::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -788,7 +795,8 @@ impl ErrorExt for Error { | Error::ViewColumnsMismatch { .. } | Error::InvalidViewStmt { .. } | Error::ConvertIdentifier { .. } - | Error::InvalidPartition { .. } => StatusCode::InvalidArguments, + | Error::InvalidPartition { .. } + | Error::PhysicalExpr { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. 
             Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => {
                 StatusCode::TableAlreadyExists
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index a0818d6ea3..b3a151b581 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -36,6 +36,7 @@ use datafusion::parquet::arrow::arrow_reader::ArrowReaderMetadata;
 use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_common::Statistics;
+use datafusion_expr::Expr;
 use datatypes::arrow::compute::can_cast_types;
 use datatypes::arrow::datatypes::{Schema, SchemaRef};
 use datatypes::vectors::Helper;
@@ -225,6 +226,7 @@ impl StatementExecutor {
         object_store: &ObjectStore,
         file_metadata: &FileMetadata,
         projection: Vec<usize>,
+        filters: Vec<Expr>,
     ) -> Result<DfSendableRecordBatchStream> {
         match file_metadata {
             FileMetadata::Csv {
@@ -252,11 +254,11 @@ impl StatementExecutor {
                 )
                 .await?;
 
-                Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
-                    projected_schema,
-                    stream,
-                    Some(projection),
-                )))
+                Ok(Box::pin(
+                    RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
+                        .with_filter(filters)
+                        .context(error::PhysicalExprSnafu)?,
+                ))
             }
             FileMetadata::Json {
                 format,
@@ -286,11 +288,11 @@ impl StatementExecutor {
                 )
                 .await?;
 
-                Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
-                    projected_schema,
-                    stream,
-                    Some(projection),
-                )))
+                Ok(Box::pin(
+                    RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
+                        .with_filter(filters)
+                        .context(error::PhysicalExprSnafu)?,
+                ))
             }
             FileMetadata::Parquet { metadata, path, .. } => {
                 let meta = object_store
@@ -317,11 +319,11 @@ impl StatementExecutor {
                         .project(&projection)
                         .context(error::ProjectSchemaSnafu)?,
                 );
-                Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
-                    projected_schema,
-                    stream,
-                    Some(projection),
-                )))
+                Ok(Box::pin(
+                    RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
+                        .with_filter(filters)
+                        .context(error::PhysicalExprSnafu)?,
+                ))
             }
             FileMetadata::Orc { path, .. } => {
                 let meta = object_store
@@ -345,11 +347,11 @@ impl StatementExecutor {
                         .context(error::ProjectSchemaSnafu)?,
                 );
 
-                Ok(Box::pin(RecordBatchStreamTypeAdapter::new(
-                    projected_schema,
-                    stream,
-                    Some(projection),
-                )))
+                Ok(Box::pin(
+                    RecordBatchStreamTypeAdapter::new(projected_schema, stream, Some(projection))
+                        .with_filter(filters)
+                        .context(error::PhysicalExprSnafu)?,
+                ))
             }
         }
     }
@@ -370,6 +372,14 @@ impl StatementExecutor {
         let (object_store, entries) = self.list_copy_from_entries(&req).await?;
         let mut files = Vec::with_capacity(entries.len());
         let table_schema = table.schema().arrow_schema().clone();
+        let filters = table
+            .schema()
+            .timestamp_column()
+            .and_then(|c| {
+                common_query::logical_plan::build_same_type_ts_filter(c, req.timestamp_range)
+            })
+            .into_iter()
+            .collect::<Vec<_>>();
 
         for entry in entries.iter() {
             if entry.metadata().mode() != EntryMode::FILE {
@@ -414,6 +424,7 @@ impl StatementExecutor {
                     &object_store,
                     &file_metadata,
                     file_schema_projection,
+                    filters.clone(),
                 )
                 .await?;
 
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.result b/tests/cases/standalone/common/copy/copy_from_fs_csv.result
index d46b5b75fb..19b3fda3f9 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_csv.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.result
@@ -2,19 +2,24 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde
 
 Affected Rows: 0
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
+insert into
+    demo(host, cpu, memory, ts)
+values
+    ('host1', 66.6, 1024, 1655276557000),
+    ('host2', 88.8, 333.3, 1655276558000),
+    ('host3', 99.9, 444.4, 1722077263000);
 
-Affected Rows: 2
+Affected Rows: 3
 
 Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv');
 
-Affected Rows: 2
+Affected Rows: 3
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv');
+Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 Affected Rows: 2
 
@@ -31,26 +36,25 @@ CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time
 
 Affected Rows: 0
 
-Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv');
+Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv', start_time='2023-06-15 07:02:37');
 
-Affected Rows: 2
+Affected Rows: 1
 
 select * from with_path order by ts;
 
 +-------+------+--------+---------------------+
 | host  | cpu  | memory | ts                  |
 +-------+------+--------+---------------------+
-| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
-| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+| host3 | 99.9 | 444.4  | 2024-07-27T10:47:43 |
 +-------+------+--------+---------------------+
 
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*',format='csv');
+Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*', format='csv', end_time='2025-06-15 07:02:39');
 
-Affected Rows: 2
+Affected Rows: 3
 
 select * from with_pattern order by ts;
 
@@ -59,6 +63,7 @@ select * from with_pattern order by ts;
 +-------+------+--------+---------------------+
 | host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
 | host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
+| host3 | 99.9 | 444.4  | 2024-07-27T10:47:43 |
 +-------+------+--------+---------------------+
 
 drop table demo;
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql
index b38c566e2a..cd6b91f4f8 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql
@@ -1,24 +1,29 @@
 CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
+insert into
+    demo(host, cpu, memory, ts)
+values
+    ('host1', 66.6, 1024, 1655276557000),
+    ('host2', 88.8, 333.3, 1655276558000),
+    ('host3', 99.9, 444.4, 1722077263000);
 
 Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv');
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv');
+Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 select * from with_filename order by ts;
 
 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv');
+Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv', start_time='2023-06-15 07:02:37');
 
 select * from with_path order by ts;
 
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*',format='csv');
+Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*', format='csv', end_time='2025-06-15 07:02:39');
 
 select * from with_pattern order by ts;
 
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.result b/tests/cases/standalone/common/copy/copy_from_fs_json.result
index 3f8826e251..bd71b5d624 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_json.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_json.result
@@ -2,19 +2,24 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde
 
 Affected Rows: 0
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
+insert into
+    demo(host, cpu, memory, ts)
+values
+    ('host1', 66.6, 1024, 1655276557000),
+    ('host2', 88.8, 333.3, 1655276558000),
+    ('host3', 99.9, 444.4, 1722077263000);
 
-Affected Rows: 2
+Affected Rows: 3
 
 Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json');
 
-Affected Rows: 2
+Affected Rows: 3
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json');
+Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 Affected Rows: 2
 
@@ -31,7 +36,7 @@ CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time
 
 Affected Rows: 0
 
-Copy with_path FROM '/tmp/demo/export/json/' with (format='json');
+Copy with_path FROM '/tmp/demo/export/json/' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 Affected Rows: 2
 
@@ -48,7 +53,7 @@ CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp t
 
 Affected Rows: 0
 
-Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json');
+Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json', end_time='2022-06-15 07:02:39');
 
 Affected Rows: 2
 
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.sql b/tests/cases/standalone/common/copy/copy_from_fs_json.sql
index e1f751bd83..c182bb82dc 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_json.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_json.sql
@@ -1,24 +1,29 @@
 CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
+insert into
+    demo(host, cpu, memory, ts)
+values
+    ('host1', 66.6, 1024, 1655276557000),
+    ('host2', 88.8, 333.3, 1655276558000),
+    ('host3', 99.9, 444.4, 1722077263000);
 
 Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json');
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json');
+Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 select * from with_filename order by ts;
 
 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_path FROM '/tmp/demo/export/json/' with (format='json');
+Copy with_path FROM '/tmp/demo/export/json/' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 select * from with_path order by ts;
 
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json');
+Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json', end_time='2022-06-15 07:02:39');
 
 select * from with_pattern order by ts;
 
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
index 54ec2f1af3..4744756632 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
@@ -2,31 +2,41 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde
 
 Affected Rows: 0
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
-
-Affected Rows: 2
-
-Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet';
-
-Affected Rows: 2
-
 CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
 
 Affected Rows: 0
 
-insert into demo_2(host, cpu, memory, ts) values ('host3', 77.7, 1111, 1655276555000), ('host4', 99.9, 444.4, 1655276556000);
+insert into
+    demo(host, cpu, memory, ts)
+values
+    ('host1', 66.6, 1024, 1655276557000),
+    ('host2', 88.8, 333.3, 1655276558000),
+    ('host3', 111.1, 444.4, 1722077263000);
 
-Affected Rows: 2
+Affected Rows: 3
+
+insert into
+    demo_2(host, cpu, memory, ts)
+values
+    ('host4', 77.7, 1111, 1655276555000),
+    ('host5', 99.9, 444.4, 1655276556000),
+    ('host6', 222.2, 555.5, 1722077264000);
+
+Affected Rows: 3
+
+Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet';
+
+Affected Rows: 3
 
 Copy demo_2 TO '/tmp/demo/export/parquet_files/demo_2.parquet';
 
-Affected Rows: 2
+Affected Rows: 3
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet';
+Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet' with (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 Affected Rows: 2
 
@@ -45,37 +55,37 @@ Affected Rows: 0
 
 Copy with_path FROM '/tmp/demo/export/parquet_files/';
 
-Affected Rows: 4
+Affected Rows: 6
 
 select * from with_path order by ts;
 
-+-------+------+--------+---------------------+
-| host  | cpu  | memory | ts                  |
-+-------+------+--------+---------------------+
-| host3 | 77.7 | 1111.0 | 2022-06-15T07:02:35 |
-| host4 | 99.9 | 444.4  | 2022-06-15T07:02:36 |
-| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
-| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
-+-------+------+--------+---------------------+
++-------+-------+--------+---------------------+
+| host  | cpu   | memory | ts                  |
++-------+-------+--------+---------------------+
+| host4 | 77.7  | 1111.0 | 2022-06-15T07:02:35 |
+| host5 | 99.9  | 444.4  | 2022-06-15T07:02:36 |
+| host1 | 66.6  | 1024.0 | 2022-06-15T07:02:37 |
+| host2 | 88.8  | 333.3  | 2022-06-15T07:02:38 |
+| host3 | 111.1 | 444.4  | 2024-07-27T10:47:43 |
+| host6 | 222.2 | 555.5  | 2024-07-27T10:47:44 |
++-------+-------+--------+---------------------+
 
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*');
+Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*', start_time='2022-06-15 07:02:39');
 
-Affected Rows: 4
+Affected Rows: 2
 
 select * from with_pattern order by ts;
 
-+-------+------+--------+---------------------+
-| host  | cpu  | memory | ts                  |
-+-------+------+--------+---------------------+
-| host3 | 77.7 | 1111.0 | 2022-06-15T07:02:35 |
-| host4 | 99.9 | 444.4  | 2022-06-15T07:02:36 |
-| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
-| host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
-+-------+------+--------+---------------------+
++-------+-------+--------+---------------------+
+| host  | cpu   | memory | ts                  |
++-------+-------+--------+---------------------+
+| host3 | 111.1 | 444.4  | 2024-07-27T10:47:43 |
+| host6 | 222.2 | 555.5  | 2024-07-27T10:47:44 |
++-------+-------+--------+---------------------+
 
 CREATE TABLE without_limit_rows(host string, cpu double, memory double, ts timestamp time index);
 
@@ -83,30 +93,30 @@ Affected Rows: 0
 
 Copy without_limit_rows FROM '/tmp/demo/export/parquet_files/';
 
-Affected Rows: 4
+Affected Rows: 6
 
 select count(*) from without_limit_rows;
 
 +----------+
 | COUNT(*) |
 +----------+
-| 4        |
+| 6        |
 +----------+
 
 CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
 
-Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;
+Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 3;
 
-Affected Rows: 2
+Affected Rows: 3
 
 select count(*) from with_limit_rows_segment;
 
 +----------+
 | COUNT(*) |
 +----------+
-| 2        |
+| 3        |
 +----------+
 
 Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT hello;
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
index 83cdc4f74c..10319e1281 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
@@ -1,18 +1,28 @@
 CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
-
-Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet';
-
 CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
 
-insert into demo_2(host, cpu, memory, ts) values ('host3', 77.7, 1111, 1655276555000), ('host4', 99.9, 444.4, 1655276556000);
+insert into
+    demo(host, cpu, memory, ts)
+values
+    ('host1', 66.6, 1024, 1655276557000),
+    ('host2', 88.8, 333.3, 1655276558000),
+    ('host3', 111.1, 444.4, 1722077263000);
+
+insert into
+    demo_2(host, cpu, memory, ts)
+values
+    ('host4', 77.7, 1111, 1655276555000),
+    ('host5', 99.9, 444.4, 1655276556000),
+    ('host6', 222.2, 555.5, 1722077264000);
+
+Copy demo TO '/tmp/demo/export/parquet_files/demo.parquet';
 
 Copy demo_2 TO '/tmp/demo/export/parquet_files/demo_2.parquet';
 
 CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet';
+Copy with_filename FROM '/tmp/demo/export/parquet_files/demo.parquet' with (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
 
 select * from with_filename order by ts;
 
@@ -24,7 +34,7 @@ select * from with_path order by ts;
 
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*');
+Copy with_pattern FROM '/tmp/demo/export/parquet_files/' WITH (PATTERN = 'demo.*', start_time='2022-06-15 07:02:39');
 
 select * from with_pattern order by ts;
 
@@ -36,7 +46,7 @@ select count(*) from without_limit_rows;
 
 CREATE TABLE with_limit_rows_segment(host string, cpu double, memory double, ts timestamp time index);
 
-Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 2;
+Copy with_limit_rows_segment FROM '/tmp/demo/export/parquet_files/' LIMIT 3;
 
 select count(*) from with_limit_rows_segment;
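
Taken together, the patch makes `COPY <table> FROM` honor a time window: the new `start_time`/`end_time` options are converted into the timestamp filter above and pushed into every file-format reader (CSV, JSON, Parquet, ORC). A representative invocation follows; the table name is hypothetical and, judging by the expected row counts in the tests, the window is half-open (start inclusive, end exclusive):

    COPY my_table FROM '/tmp/demo/export/parquet_files/'
    WITH (
        pattern = 'demo.*',
        start_time = '2022-06-15 07:02:37',
        end_time = '2022-06-15 07:02:39'
    );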