fix: impl RecordBatchStream method explicitly (#3482)

Author: shuiyisong
Date: 2024-03-11 17:07:10 +08:00
Committed by: GitHub
parent aa125a50f9
commit aa953dcc34
9 changed files with 104 additions and 14 deletions
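
The substantive change is to the `RecordBatchStream` trait in `common_recordbatch`: `output_ordering` and `metrics` lose their default bodies and become required methods, and every implementor in the diff below now spells them out (most return `None`; wrappers such as `OnDone` and `StreamWithMetricWrapper` delegate to the wrapped stream). As a reading aid, here is a sketch of the trait as it stands after this commit, reconstructed from the trait hunk below; the types are the crate's own, so this snippet is not standalone:

```rust
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
    fn schema(&self) -> SchemaRef;

    // Previously defaulted to `None`; now every implementor must provide it.
    fn output_ordering(&self) -> Option<&[OrderOption]>;

    // Previously defaulted to `None`; now every implementor must provide it.
    fn metrics(&self) -> Option<RecordBatchMetrics>;
}
```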

View File

@@ -32,7 +32,7 @@ use snafu::ResultExt;
 
 use crate::error::{self, Result};
 use crate::{
-    DfRecordBatch, DfSendableRecordBatchStream, RecordBatch, RecordBatchStream,
+    DfRecordBatch, DfSendableRecordBatchStream, OrderOption, RecordBatch, RecordBatchStream,
     SendableRecordBatchStream, Stream,
 };
 
@@ -228,6 +228,10 @@ impl RecordBatchStream for RecordBatchStreamAdapter {
             Metrics::Unavailable | Metrics::Unresolved(_) => None,
         }
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
 }
 
 impl Stream for RecordBatchStreamAdapter {
@@ -316,6 +320,14 @@ impl RecordBatchStream for AsyncRecordBatchStreamAdapter {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 impl Stream for AsyncRecordBatchStreamAdapter {
@@ -375,6 +387,14 @@ mod test {
         fn schema(&self) -> SchemaRef {
            unimplemented!()
        }
+
+        fn output_ordering(&self) -> Option<&[OrderOption]> {
+            None
+        }
+
+        fn metrics(&self) -> Option<RecordBatchMetrics> {
+            None
+        }
     }
 
     impl Stream for MaybeErrorRecordBatchStream {

View File

@@ -39,13 +39,9 @@ use snafu::{ensure, ResultExt};
 pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
     fn schema(&self) -> SchemaRef;
 
-    fn output_ordering(&self) -> Option<&[OrderOption]> {
-        None
-    }
+    fn output_ordering(&self) -> Option<&[OrderOption]>;
 
-    fn metrics(&self) -> Option<RecordBatchMetrics> {
-        None
-    }
+    fn metrics(&self) -> Option<RecordBatchMetrics>;
 }
 
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
@@ -74,6 +70,14 @@ impl RecordBatchStream for EmptyRecordBatchStream {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 impl Stream for EmptyRecordBatchStream {
@@ -192,6 +196,14 @@ impl RecordBatchStream for SimpleRecordBatchStream {
     fn schema(&self) -> SchemaRef {
         self.inner.schema()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 impl Stream for SimpleRecordBatchStream {
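
For reference, a minimal sketch of what an implementor has to write after this change, modeled on the `None`-returning pattern used throughout the commit. The struct name `NoopStream` is hypothetical, and the imports assume the crate paths that appear in these hunks (`common_recordbatch`, `datatypes`, `futures`):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
use datatypes::schema::SchemaRef;
use futures::Stream;

/// Hypothetical stream that yields nothing; only here to show the trait surface.
struct NoopStream {
    schema: SchemaRef,
}

impl RecordBatchStream for NoopStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    // Both methods must now be written out explicitly; returning `None`
    // reproduces the behaviour of the old default bodies.
    fn output_ordering(&self) -> Option<&[OrderOption]> {
        None
    }

    fn metrics(&self) -> Option<RecordBatchMetrics> {
        None
    }
}

impl Stream for NoopStream {
    type Item = RecordBatchResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Empty stream: immediately signals completion.
        Poll::Ready(None)
    }
}
```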

View File

@@ -41,7 +41,8 @@ mod tests {
     use futures::Stream;
 
     use super::*;
-    use crate::RecordBatchStream;
+    use crate::adapter::RecordBatchMetrics;
+    use crate::{OrderOption, RecordBatchStream};
 
     struct MockRecordBatchStream {
         batch: Option<RecordBatch>,
@@ -52,6 +53,14 @@ mod tests {
         fn schema(&self) -> SchemaRef {
             self.schema.clone()
         }
+
+        fn output_ordering(&self) -> Option<&[OrderOption]> {
+            None
+        }
+
+        fn metrics(&self) -> Option<RecordBatchMetrics> {
+            None
+        }
     }
 
     impl Stream for MockRecordBatchStream {

View File

@@ -22,8 +22,9 @@ use std::task::{Context, Poll};
 use common_datasource::object_store::build_backend;
 use common_error::ext::BoxedError;
 use common_query::prelude::Expr;
+use common_recordbatch::adapter::RecordBatchMetrics;
 use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult};
-use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datafusion::logical_expr::utils as df_logical_expr_utils;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -151,6 +152,14 @@ impl RecordBatchStream for FileToScanRegionStream {
     fn schema(&self) -> SchemaRef {
         self.scan_schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 impl Stream for FileToScanRegionStream {

View File

@@ -15,7 +15,8 @@
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::adapter::RecordBatchMetrics;
+use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datatypes::schema::SchemaRef;
 use futures::Stream;
 use futures_util::ready;
@@ -78,6 +79,14 @@ impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
     fn schema(&self) -> SchemaRef {
         self.stream.schema()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        self.stream.output_ordering()
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        self.stream.metrics()
+    }
 }
 
 impl<F: FnOnce() + Unpin> Stream for OnDone<F> {

View File

@@ -26,9 +26,10 @@ use common_function::function_registry::FUNCTION_REGISTRY;
 use common_query::error::{PyUdfSnafu, UdfTempRecordBatchSnafu};
 use common_query::prelude::Signature;
 use common_query::{Output, OutputData};
+use common_recordbatch::adapter::RecordBatchMetrics;
 use common_recordbatch::error::{ExternalSnafu, Result as RecordBatchResult};
 use common_recordbatch::{
-    RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream,
+    OrderOption, RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream,
 };
 use datafusion_expr::Volatility;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
@@ -255,6 +256,14 @@ impl RecordBatchStream for CoprStream {
         // FIXME(discord9): use copr returns for schema
         self.ret_schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 impl Stream for CoprStream {

View File

@@ -17,8 +17,9 @@ use std::sync::Arc;
 
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::BoxedError;
+use common_recordbatch::adapter::RecordBatchMetrics;
 use common_recordbatch::error::Result as RecordBatchResult;
-use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
 use datatypes::arrow::array::UInt32Array;
 use datatypes::data_type::ConcreteDataType;
@@ -123,6 +124,14 @@ impl RecordBatchStream for NumbersStream {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 impl Stream for NumbersStream {

View File

@@ -23,7 +23,7 @@ use common_query::error::Result as QueryResult;
 use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
 use common_recordbatch::adapter::RecordBatchMetrics;
 use common_recordbatch::error::Result as RecordBatchResult;
-use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::tracing::Span;
 use common_telemetry::tracing_context::TracingContext;
 use datafusion::execution::context::TaskContext;
@@ -157,6 +157,10 @@ impl RecordBatchStream for StreamWithMetricWrapper {
     fn metrics(&self) -> Option<RecordBatchMetrics> {
         self.stream.metrics()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        self.stream.output_ordering()
+    }
 }
 
 #[cfg(test)]

View File

@@ -17,8 +17,9 @@ use std::sync::Arc;
 
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::BoxedError;
+use common_recordbatch::adapter::RecordBatchMetrics;
 use common_recordbatch::error::Result as RecordBatchResult;
-use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datatypes::prelude::*;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::UInt32Vector;
@@ -165,6 +166,14 @@ impl RecordBatchStream for MemtableStream {
     fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        None
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        None
+    }
 }
 
 struct MemtableStream {