feat: Make recordbatch compile

This commit is contained in:
evenyag
2022-12-06 11:38:59 +08:00
parent cc1ec26416
commit fe505fecfd
7 changed files with 34 additions and 22 deletions

4
Cargo.lock generated
View File

@@ -258,6 +258,9 @@ name = "arrow-schema"
version = "26.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f9406eb7834ca6bd8350d1baa515d18b9fcec487eddacfb62f5e19511f7bd37"
dependencies = [
"serde",
]
[[package]]
name = "arrow-select"
@@ -2029,6 +2032,7 @@ name = "datatypes"
version = "0.1.0"
dependencies = [
"arrow",
"arrow-schema",
"common-base",
"common-error",
"common-time",

View File

@@ -19,7 +19,6 @@ use std::task::{Context, Poll};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datafusion_common::DataFusionError;
use datatypes::arrow::error::{ArrowError, Result as ArrowResult};
use datatypes::schema::{Schema, SchemaRef};
@@ -28,7 +27,8 @@ use snafu::ResultExt;
use crate::error::{self, Result};
use crate::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStream, SendableRecordBatchStream, Stream,
DfRecordBatch, DfSendableRecordBatchStream, RecordBatch, RecordBatchStream,
SendableRecordBatchStream, Stream,
};
type FutureStream = Pin<
@@ -64,7 +64,7 @@ impl Stream for DfRecordBatchStreamAdapter {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(recordbatch)) => match recordbatch {
Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.df_recordbatch))),
Err(e) => Poll::Ready(Some(Err(ArrowError::External("".to_owned(), Box::new(e))))),
Err(e) => Poll::Ready(Some(Err(ArrowError::External(Box::new(e))))),
},
Poll::Ready(None) => Poll::Ready(None),
}

View File

@@ -59,6 +59,12 @@ pub enum InnerError {
source: datatypes::arrow::error::ArrowError,
backtrace: Backtrace,
},
#[snafu(display("Fail to format record batch, source: {}", source))]
Format {
source: datatypes::arrow::error::ArrowError,
backtrace: Backtrace,
},
}
impl ErrorExt for InnerError {

View File

@@ -20,16 +20,17 @@ pub mod util;
use std::pin::Pin;
use std::sync::Arc;
use datafusion::arrow_print;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::VectorRef;
use datatypes::schema::{Schema, SchemaRef};
use error::Result;
use futures::task::{Context, Poll};
use futures::Stream;
pub use recordbatch::RecordBatch;
use snafu::ensure;
use snafu::{ensure, ResultExt};
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
@@ -92,17 +93,18 @@ impl RecordBatches {
self.batches.iter()
}
pub fn pretty_print(&self) -> String {
arrow_print::write(
&self
.iter()
.map(|x| x.df_recordbatch.clone())
.collect::<Vec<_>>(),
)
pub fn pretty_print(&self) -> Result<String> {
let df_batches = &self
.iter()
.map(|x| x.df_recordbatch.clone())
.collect::<Vec<_>>();
let result = pretty::pretty_format_batches(&df_batches).context(error::FormatSnafu)?;
Ok(result.to_string())
}
pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
for batch in batches.iter() {
for batch in &batches {
ensure!(
batch.schema == schema,
error::CreateRecordBatchesSnafu {
@@ -236,7 +238,7 @@ mod tests {
| 1 | hello |
| 2 | world |
+---+-------+";
assert_eq!(batches.pretty_print(), expected);
assert_eq!(batches.pretty_print().unwrap(), expected);
assert_eq!(schema1, batches.schema());
assert_eq!(vec![batch1], batches.take());

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow_array::arrow_array_get;
use datatypes::schema::SchemaRef;
use datatypes::value::Value;
@@ -22,8 +21,10 @@ use serde::{Serialize, Serializer};
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::DfRecordBatch;
// TODO(yingwen): We should hold vectors in the RecordBatch.
/// A two-dimensional batch of column-oriented data with a defined schema.
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
pub schema: SchemaRef,
@@ -125,15 +126,14 @@ impl<'a> Iterator for RecordBatchRowIterator<'a> {
mod tests {
use std::sync::Arc;
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::UInt32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::prelude::*;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{StringVector, UInt32Vector, Vector};
use super::*;
use crate::DfRecordBatch;
#[test]
fn test_new_record_batch() {

View File

@@ -17,6 +17,7 @@ use futures::TryStreamExt;
use crate::error::Result;
use crate::{RecordBatch, SendableRecordBatchStream};
/// Collect all the items from the stream into a vector of [`RecordBatch`].
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
stream.try_collect::<Vec<_>>().await
}
@@ -27,8 +28,6 @@ mod tests {
use std::pin::Pin;
use std::sync::Arc;
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::UInt32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::schema::{Schema, SchemaRef};
@@ -36,7 +35,7 @@ mod tests {
use futures::Stream;
use super::*;
use crate::RecordBatchStream;
use crate::{DfRecordBatch, RecordBatchStream};
struct MockRecordBatchStream {
batch: Option<RecordBatch>,

View File

@@ -21,4 +21,5 @@ paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
arrow = "26.0"
arrow = { version = "26.0" }
arrow-schema = { version = "26.0", features = ["serde"] }