diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 46b12d0e45..7664d8f0fd 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -140,9 +140,9 @@ impl Stream for NumbersStream { ) .unwrap(); - Poll::Ready(Some(Ok(RecordBatch { - schema: self.schema.clone(), - df_recordbatch: batch, - }))) + Poll::Ready(Some(RecordBatch::try_from_df_record_batch( + self.schema.clone(), + batch, + ))) } } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index 2fdd1228f6..4fef5ed68c 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -17,6 +17,7 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use common_error::prelude::BoxedError; use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; use common_recordbatch::error::Result as RecordBatchResult; @@ -29,7 +30,7 @@ use futures::Stream; use snafu::prelude::*; use store_api::storage::RegionNumber; -use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; +use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu}; use crate::metadata::{ TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion, }; @@ -145,11 +146,11 @@ impl Table for MemTable { ) -> Result { let df_recordbatch = if let Some(indices) = projection { self.recordbatch - .df_recordbatch + .df_record_batch() .project(indices) .context(TableProjectionSnafu)? } else { - self.recordbatch.df_recordbatch.clone() + self.recordbatch.df_record_batch().clone() }; let rows = df_recordbatch.num_rows(); @@ -160,12 +161,14 @@ impl Table for MemTable { }; let df_recordbatch = df_recordbatch.slice(0, limit); - let recordbatch = RecordBatch { - schema: Arc::new( + let recordbatch = RecordBatch::try_from_df_record_batch( + Arc::new( Schema::try_from(df_recordbatch.schema().clone()).context(SchemaConversionSnafu)?, ), df_recordbatch, - }; + ) + .map_err(BoxedError::new) + .context(TablesRecordBatchSnafu)?; Ok(Arc::new(SimpleTableScan::new(Box::pin(MemtableStream { schema: recordbatch.schema.clone(), recordbatch: Some(recordbatch), @@ -214,7 +217,7 @@ mod test { let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap(); let recordbatch = util::collect(scan_stream).await.unwrap(); assert_eq!(1, recordbatch.len()); - let columns = recordbatch[0].df_recordbatch.columns(); + let columns = recordbatch[0].df_record_batch().columns(); assert_eq!(1, columns.len()); let string_column = Helper::try_into_vector(&columns[0]).unwrap(); @@ -235,7 +238,7 @@ mod test { let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap(); let recordbatch = util::collect(scan_stream).await.unwrap(); assert_eq!(1, recordbatch.len()); - let columns = recordbatch[0].df_recordbatch.columns(); + let columns = recordbatch[0].df_record_batch().columns(); assert_eq!(2, columns.len()); let i32_column = Helper::try_into_vector(&columns[0]).unwrap();