diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index d26296d449..12f5ecfdc9 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -32,7 +32,7 @@ use snafu::ResultExt; use crate::error::{self, Result}; use crate::{ - DfRecordBatch, DfSendableRecordBatchStream, RecordBatch, RecordBatchStream, + DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream, Stream, }; @@ -228,6 +228,10 @@ impl RecordBatchStream for RecordBatchStreamAdapter { Metrics::Unavailable | Metrics::Unresolved(_) => None, } } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } } impl Stream for RecordBatchStreamAdapter { @@ -316,6 +320,14 @@ impl RecordBatchStream for AsyncRecordBatchStreamAdapter { fn schema(&self) -> SchemaRef { self.schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for AsyncRecordBatchStreamAdapter { @@ -375,6 +387,14 @@ mod test { fn schema(&self) -> SchemaRef { unimplemented!() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for MaybeErrorRecordBatchStream { diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 93ba03a333..f062a6474f 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -39,13 +39,9 @@ use snafu::{ensure, ResultExt}; pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; - fn output_ordering(&self) -> Option<&[OrderOption]> { - None - } + fn output_ordering(&self) -> Option<&[OrderOption]>; - fn metrics(&self) -> Option { - None - } + fn metrics(&self) -> Option; } pub type SendableRecordBatchStream = Pin>; @@ -74,6 +70,14 @@ impl RecordBatchStream for EmptyRecordBatchStream { fn schema(&self) -> SchemaRef { self.schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for EmptyRecordBatchStream { @@ -192,6 +196,14 @@ impl RecordBatchStream for SimpleRecordBatchStream { fn schema(&self) -> SchemaRef { self.inner.schema() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for SimpleRecordBatchStream { diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs index 258e113d85..723a0f9dca 100644 --- a/src/common/recordbatch/src/util.rs +++ b/src/common/recordbatch/src/util.rs @@ -41,7 +41,8 @@ mod tests { use futures::Stream; use super::*; - use crate::RecordBatchStream; + use crate::adapter::RecordBatchMetrics; + use crate::{OrderOption, RecordBatchStream}; struct MockRecordBatchStream { batch: Option, @@ -52,6 +53,14 @@ mod tests { fn schema(&self) -> SchemaRef { self.schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for MockRecordBatchStream { diff --git a/src/file-engine/src/query.rs b/src/file-engine/src/query.rs index e305376b2a..31e14a9898 100644 --- a/src/file-engine/src/query.rs +++ b/src/file-engine/src/query.rs @@ -22,8 +22,9 @@ use std::task::{Context, Poll}; use common_datasource::object_store::build_backend; use common_error::ext::BoxedError; use common_query::prelude::Expr; +use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult}; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::logical_expr::utils as df_logical_expr_utils; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; @@ -151,6 +152,14 @@ impl RecordBatchStream for FileToScanRegionStream { fn schema(&self) -> SchemaRef { self.scan_schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for FileToScanRegionStream { diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index eb5f9c3ede..ac9397eec3 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -15,7 +15,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::adapter::RecordBatchMetrics; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datatypes::schema::SchemaRef; use futures::Stream; use futures_util::ready; @@ -78,6 +79,14 @@ impl RecordBatchStream for OnDone { fn schema(&self) -> SchemaRef { self.stream.schema() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.stream.output_ordering() + } + + fn metrics(&self) -> Option { + self.stream.metrics() + } } impl Stream for OnDone { diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index dbf1eec6e3..31a814dedf 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -26,9 +26,10 @@ use common_function::function_registry::FUNCTION_REGISTRY; use common_query::error::{PyUdfSnafu, UdfTempRecordBatchSnafu}; use common_query::prelude::Signature; use common_query::{Output, OutputData}; +use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult}; use common_recordbatch::{ - RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream, + OrderOption, RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream, }; use datafusion_expr::Volatility; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; @@ -255,6 +256,14 @@ impl RecordBatchStream for CoprStream { // FIXME(discord9): use copr returns for schema self.ret_schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for CoprStream { diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index d761abc50a..3ae0bf5438 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -17,8 +17,9 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; +use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::array::UInt32Array; use datatypes::data_type::ConcreteDataType; @@ -123,6 +124,14 @@ impl RecordBatchStream for NumbersStream { fn schema(&self) -> SchemaRef { self.schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } impl Stream for NumbersStream { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index e1f19c5730..0d5b769136 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -23,7 +23,7 @@ use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_telemetry::tracing::Span; use common_telemetry::tracing_context::TracingContext; use datafusion::execution::context::TaskContext; @@ -157,6 +157,10 @@ impl RecordBatchStream for StreamWithMetricWrapper { fn metrics(&self) -> Option { self.stream.metrics() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.stream.output_ordering() + } } #[cfg(test)] diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index cb36bac2c7..837a6055b5 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -17,8 +17,9 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_error::ext::BoxedError; +use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::UInt32Vector; @@ -165,6 +166,14 @@ impl RecordBatchStream for MemtableStream { fn schema(&self) -> SchemaRef { self.schema.clone() } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + None + } } struct MemtableStream {