Files
greptimedb/src/common/recordbatch/src/lib.rs
LFC ea9af42091 chore: upgrade Rust to nightly 2022-12-20 (#772)
* chore: upgrade Rust to nightly 2022-12-20

* chore: upgrade Rust to nightly 2022-12-20

Co-authored-by: luofucong <luofucong@greptime.com>
2022-12-21 19:32:30 +08:00

275 lines
8.7 KiB
Rust

// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod adapter;
pub mod error;
mod recordbatch;
pub mod util;
use std::pin::Pin;
use std::sync::Arc;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::util::pretty;
use datatypes::prelude::VectorRef;
use datatypes::schema::{Schema, SchemaRef};
use error::Result;
use futures::task::{Context, Poll};
use futures::{Stream, TryStreamExt};
pub use recordbatch::RecordBatch;
use snafu::{ensure, ResultExt};
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {
/// Schema wrapped by Arc
schema: SchemaRef,
}
impl EmptyRecordBatchStream {
/// Create an empty RecordBatchStream
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}
impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for EmptyRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
#[derive(Debug, PartialEq)]
pub struct RecordBatches {
schema: SchemaRef,
batches: Vec<RecordBatch>,
}
impl RecordBatches {
pub fn try_from_columns<I: IntoIterator<Item = VectorRef>>(
schema: SchemaRef,
columns: I,
) -> Result<Self> {
let batches = vec![RecordBatch::new(schema.clone(), columns)?];
Ok(Self { schema, batches })
}
pub async fn try_collect(stream: SendableRecordBatchStream) -> Result<Self> {
let schema = stream.schema();
let batches = stream.try_collect::<Vec<_>>().await?;
Ok(Self { schema, batches })
}
#[inline]
pub fn empty() -> Self {
Self {
schema: Arc::new(Schema::new(vec![])),
batches: vec![],
}
}
pub fn iter(&self) -> impl Iterator<Item = &RecordBatch> {
self.batches.iter()
}
pub fn pretty_print(&self) -> Result<String> {
let df_batches = &self
.iter()
.map(|x| x.df_record_batch().clone())
.collect::<Vec<_>>();
let result = pretty::pretty_format_batches(df_batches).context(error::FormatSnafu)?;
Ok(result.to_string())
}
pub fn try_new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Result<Self> {
for batch in &batches {
ensure!(
batch.schema == schema,
error::CreateRecordBatchesSnafu {
reason: format!(
"expect RecordBatch schema equals {:?}, actual: {:?}",
schema, batch.schema
)
}
)
}
Ok(Self { schema, batches })
}
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
pub fn take(self) -> Vec<RecordBatch> {
self.batches
}
pub fn as_stream(&self) -> SendableRecordBatchStream {
Box::pin(SimpleRecordBatchStream {
inner: RecordBatches {
schema: self.schema(),
batches: self.batches.clone(),
},
index: 0,
})
}
pub fn into_df_stream(self) -> DfSendableRecordBatchStream {
let df_record_batches = self
.batches
.into_iter()
.map(|batch| batch.into_df_record_batch())
.collect();
// unwrap safety: `MemoryStream::try_new` won't fail
Box::pin(
MemoryStream::try_new(df_record_batches, self.schema.arrow_schema().clone(), None)
.unwrap(),
)
}
}
pub struct SimpleRecordBatchStream {
inner: RecordBatches,
index: usize,
}
impl RecordBatchStream for SimpleRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.inner.schema()
}
}
impl Stream for SimpleRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(if self.index < self.inner.batches.len() {
let batch = self.inner.batches[self.index].clone();
self.index += 1;
Some(Ok(batch))
} else {
None
})
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::{BooleanVector, Int32Vector, StringVector};
use super::*;
#[test]
fn test_recordbatches_try_from_columns() {
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
false,
)]));
let result = RecordBatches::try_from_columns(
schema.clone(),
vec![Arc::new(StringVector::from(vec!["hello", "world"])) as _],
);
assert!(result.is_err());
let v: VectorRef = Arc::new(Int32Vector::from_slice(&[1, 2]));
let expected = vec![RecordBatch::new(schema.clone(), vec![v.clone()]).unwrap()];
let r = RecordBatches::try_from_columns(schema, vec![v]).unwrap();
assert_eq!(r.take(), expected);
}
#[test]
fn test_recordbatches_try_new() {
let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
let column_c = ColumnSchema::new("c", ConcreteDataType::boolean_datatype(), false);
let va: VectorRef = Arc::new(Int32Vector::from_slice(&[1, 2]));
let vb: VectorRef = Arc::new(StringVector::from(vec!["hello", "world"]));
let vc: VectorRef = Arc::new(BooleanVector::from(vec![true, false]));
let schema1 = Arc::new(Schema::new(vec![column_a.clone(), column_b]));
let batch1 = RecordBatch::new(schema1.clone(), vec![va.clone(), vb]).unwrap();
let schema2 = Arc::new(Schema::new(vec![column_a, column_c]));
let batch2 = RecordBatch::new(schema2.clone(), vec![va, vc]).unwrap();
let result = RecordBatches::try_new(schema1.clone(), vec![batch1.clone(), batch2]);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
format!(
"Failed to create RecordBatches, reason: expect RecordBatch schema equals {schema1:?}, actual: {schema2:?}",
)
);
let batches = RecordBatches::try_new(schema1.clone(), vec![batch1.clone()]).unwrap();
let expected = "\
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+";
assert_eq!(batches.pretty_print().unwrap(), expected);
assert_eq!(schema1, batches.schema());
assert_eq!(vec![batch1], batches.take());
}
#[tokio::test]
async fn test_simple_recordbatch_stream() {
let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false);
let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false);
let schema = Arc::new(Schema::new(vec![column_a, column_b]));
let va1: VectorRef = Arc::new(Int32Vector::from_slice(&[1, 2]));
let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"]));
let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap();
let va2: VectorRef = Arc::new(Int32Vector::from_slice(&[3, 4, 5]));
let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"]));
let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap();
let recordbatches =
RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
let stream = recordbatches.as_stream();
let collected = util::collect(stream).await.unwrap();
assert_eq!(collected.len(), 2);
assert_eq!(collected[0], batch1);
assert_eq!(collected[1], batch2);
}
}