fix: cast orc data against output schema (#1922)

fix: cast data against output schema
This commit is contained in:
Weny Xu
2023-07-10 17:53:38 +09:00
committed by GitHub
parent 207d3d23a1
commit c8ed1bbfae
2 changed files with 27 additions and 4 deletions

View File

@@ -15,6 +15,7 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use arrow::compute::cast;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
@@ -60,12 +61,16 @@ pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>
}
pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
output_schema: SchemaRef,
stream: ArrowStreamReader<T>,
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
pub fn new(stream: ArrowStreamReader<T>) -> Self {
Self { stream }
pub fn new(output_schema: SchemaRef, stream: ArrowStreamReader<T>) -> Self {
Self {
stream,
output_schema,
}
}
}
@@ -73,7 +78,7 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> RecordBatchStream
for OrcArrowStreamReaderAdapter<T>
{
fn schema(&self) -> SchemaRef {
self.stream.schema()
self.output_schema.clone()
}
}
@@ -83,6 +88,24 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStrea
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
.map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
let batch = batch.map(|b| {
b.and_then(|b| {
let mut columns = Vec::with_capacity(b.num_columns());
for (idx, column) in b.columns().iter().enumerate() {
if column.data_type() != self.output_schema.field(idx).data_type() {
let output = cast(&column, self.output_schema.field(idx).data_type())?;
columns.push(output)
} else {
columns.push(column.clone())
}
}
let record_batch = DfRecordBatch::try_new(self.output_schema.clone(), columns)?;
Ok(record_batch)
})
});
Poll::Ready(batch)
}
}

View File

@@ -224,7 +224,7 @@ impl StatementExecutor {
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;
let stream = OrcArrowStreamReaderAdapter::new(stream);
let stream = OrcArrowStreamReaderAdapter::new(schema, stream);
Ok(Box::pin(stream))
}