fix common record batch

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-12-08 17:58:53 +08:00
parent b936d8b18a
commit fff530cb50
2 changed files with 15 additions and 12 deletions

View File

@@ -140,9 +140,9 @@ impl Stream for NumbersStream {
)
.unwrap();
Poll::Ready(Some(Ok(RecordBatch {
schema: self.schema.clone(),
df_recordbatch: batch,
})))
Poll::Ready(Some(RecordBatch::try_from_df_record_batch(
self.schema.clone(),
batch,
)))
}
}

View File

@@ -17,6 +17,7 @@ use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::error::Result as RecordBatchResult;
@@ -29,7 +30,7 @@ use futures::Stream;
use snafu::prelude::*;
use store_api::storage::RegionNumber;
use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu};
use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu, TablesRecordBatchSnafu};
use crate::metadata::{
TableId, TableInfoBuilder, TableInfoRef, TableMetaBuilder, TableType, TableVersion,
};
@@ -145,11 +146,11 @@ impl Table for MemTable {
) -> Result<PhysicalPlanRef> {
let df_recordbatch = if let Some(indices) = projection {
self.recordbatch
.df_recordbatch
.df_record_batch()
.project(indices)
.context(TableProjectionSnafu)?
} else {
self.recordbatch.df_recordbatch.clone()
self.recordbatch.df_record_batch().clone()
};
let rows = df_recordbatch.num_rows();
@@ -160,12 +161,14 @@ impl Table for MemTable {
};
let df_recordbatch = df_recordbatch.slice(0, limit);
let recordbatch = RecordBatch {
schema: Arc::new(
let recordbatch = RecordBatch::try_from_df_record_batch(
Arc::new(
Schema::try_from(df_recordbatch.schema().clone()).context(SchemaConversionSnafu)?,
),
df_recordbatch,
};
)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)?;
Ok(Arc::new(SimpleTableScan::new(Box::pin(MemtableStream {
schema: recordbatch.schema.clone(),
recordbatch: Some(recordbatch),
@@ -214,7 +217,7 @@ mod test {
let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap();
let recordbatch = util::collect(scan_stream).await.unwrap();
assert_eq!(1, recordbatch.len());
let columns = recordbatch[0].df_recordbatch.columns();
let columns = recordbatch[0].df_record_batch().columns();
assert_eq!(1, columns.len());
let string_column = Helper::try_into_vector(&columns[0]).unwrap();
@@ -235,7 +238,7 @@ mod test {
let scan_stream = scan_stream.execute(0, ctx.task_ctx()).unwrap();
let recordbatch = util::collect(scan_stream).await.unwrap();
assert_eq!(1, recordbatch.len());
let columns = recordbatch[0].df_recordbatch.columns();
let columns = recordbatch[0].df_record_batch().columns();
assert_eq!(2, columns.len());
let i32_column = Helper::try_into_vector(&columns[0]).unwrap();