diff --git a/Cargo.lock b/Cargo.lock index 784aa0f52c..c8f6deddc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -258,6 +258,9 @@ name = "arrow-schema" version = "26.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f9406eb7834ca6bd8350d1baa515d18b9fcec487eddacfb62f5e19511f7bd37" +dependencies = [ + "serde", +] [[package]] name = "arrow-select" @@ -2029,6 +2032,7 @@ name = "datatypes" version = "0.1.0" dependencies = [ "arrow", + "arrow-schema", "common-base", "common-error", "common-time", diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 2994d8f078..479ed9963a 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -19,7 +19,6 @@ use std::task::{Context, Poll}; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream; -use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datafusion_common::DataFusionError; use datatypes::arrow::error::{ArrowError, Result as ArrowResult}; use datatypes::schema::{Schema, SchemaRef}; @@ -28,7 +27,8 @@ use snafu::ResultExt; use crate::error::{self, Result}; use crate::{ - DfSendableRecordBatchStream, RecordBatch, RecordBatchStream, SendableRecordBatchStream, Stream, + DfRecordBatch, DfSendableRecordBatchStream, RecordBatch, RecordBatchStream, + SendableRecordBatchStream, Stream, }; type FutureStream = Pin< @@ -64,7 +64,7 @@ impl Stream for DfRecordBatchStreamAdapter { Poll::Pending => Poll::Pending, Poll::Ready(Some(recordbatch)) => match recordbatch { Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.df_recordbatch))), - Err(e) => Poll::Ready(Some(Err(ArrowError::External("".to_owned(), Box::new(e))))), + Err(e) => Poll::Ready(Some(Err(ArrowError::External(Box::new(e))))), }, Poll::Ready(None) => Poll::Ready(None), } diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 2425defad8..3ef82e5d8d 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -59,6 +59,12 @@ pub enum InnerError { source: datatypes::arrow::error::ArrowError, backtrace: Backtrace, }, + + #[snafu(display("Fail to format record batch, source: {}", source))] + Format { + source: datatypes::arrow::error::ArrowError, + backtrace: Backtrace, + }, } impl ErrorExt for InnerError { diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index ce2c2f1e5a..5108435033 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -20,16 +20,17 @@ pub mod util; use std::pin::Pin; use std::sync::Arc; -use datafusion::arrow_print; use datafusion::physical_plan::memory::MemoryStream; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch; +use datatypes::arrow::util::pretty; use datatypes::prelude::VectorRef; use datatypes::schema::{Schema, SchemaRef}; use error::Result; use futures::task::{Context, Poll}; use futures::Stream; pub use recordbatch::RecordBatch; -use snafu::ensure; +use snafu::{ensure, ResultExt}; pub trait RecordBatchStream: Stream> { fn schema(&self) -> SchemaRef; @@ -92,17 +93,18 @@ impl RecordBatches { self.batches.iter() } - pub fn pretty_print(&self) -> String { - arrow_print::write( - &self - .iter() - .map(|x| x.df_recordbatch.clone()) - .collect::>(), - ) + pub fn pretty_print(&self) -> Result { + let df_batches = &self + .iter() + .map(|x| x.df_recordbatch.clone()) + .collect::>(); + let result = pretty::pretty_format_batches(&df_batches).context(error::FormatSnafu)?; + + Ok(result.to_string()) } pub fn try_new(schema: SchemaRef, batches: Vec) -> Result { - for batch in batches.iter() { + for batch in &batches { ensure!( batch.schema == schema, error::CreateRecordBatchesSnafu { @@ -236,7 +238,7 @@ mod tests { | 1 | hello | | 2 | world | +---+-------+"; - assert_eq!(batches.pretty_print(), expected); + assert_eq!(batches.pretty_print().unwrap(), expected); assert_eq!(schema1, batches.schema()); assert_eq!(vec![batch1], batches.take()); diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 5fc886f8b9..681335045c 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow_array::arrow_array_get; use datatypes::schema::SchemaRef; use datatypes::value::Value; @@ -22,8 +21,10 @@ use serde::{Serialize, Serializer}; use snafu::ResultExt; use crate::error::{self, Result}; +use crate::DfRecordBatch; // TODO(yingwen): We should hold vectors in the RecordBatch. +/// A two-dimensional batch of column-oriented data with a defined schema. #[derive(Clone, Debug, PartialEq)] pub struct RecordBatch { pub schema: SchemaRef, @@ -125,15 +126,14 @@ impl<'a> Iterator for RecordBatchRowIterator<'a> { mod tests { use std::sync::Arc; - use datafusion_common::field_util::SchemaExt; - use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::array::UInt32Array; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; - use datatypes::prelude::*; + use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::{StringVector, UInt32Vector, Vector}; use super::*; + use crate::DfRecordBatch; #[test] fn test_new_record_batch() { diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs index efe34dbfed..461a64d8ae 100644 --- a/src/common/recordbatch/src/util.rs +++ b/src/common/recordbatch/src/util.rs @@ -17,6 +17,7 @@ use futures::TryStreamExt; use crate::error::Result; use crate::{RecordBatch, SendableRecordBatchStream}; +/// Collect all the items from the stream into a vector of [`RecordBatch`]. pub async fn collect(stream: SendableRecordBatchStream) -> Result> { stream.try_collect::>().await } @@ -27,8 +28,6 @@ mod tests { use std::pin::Pin; use std::sync::Arc; - use datafusion_common::field_util::SchemaExt; - use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::array::UInt32Array; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datatypes::schema::{Schema, SchemaRef}; @@ -36,7 +35,7 @@ mod tests { use futures::Stream; use super::*; - use crate::RecordBatchStream; + use crate::{DfRecordBatch, RecordBatchStream}; struct MockRecordBatchStream { batch: Option, diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 0ca8bf378c..5adf32a825 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -21,4 +21,5 @@ paste = "1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } -arrow = "26.0" +arrow = { version = "26.0" } +arrow-schema = { version = "26.0", features = ["serde"] }