diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index afa3d1f860..5d5c1c9ef3 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -16,6 +16,7 @@ use datatypes::arrow::io::parquet::read::{ use datatypes::arrow::io::parquet::write::{ Compression, Encoding, FileSink, Version, WriteOptions, }; +use futures::AsyncWriteExt; use futures_util::sink::SinkExt; use futures_util::{Stream, TryStreamExt}; use object_store::{ObjectStore, SeekableReader}; @@ -64,13 +65,13 @@ impl<'a> ParquetWriter<'a> { // FIXME(hl): writer size is not used in fs backend so just leave it to 0, // but in s3/azblob backend the Content-Length field of HTTP request is set // to this value. - let writer = object.writer(0).await.context(error::FlushIoSnafu)?; + let mut writer = object.writer(0).await.context(error::FlushIoSnafu)?; // now all physical types use plain encoding, maybe let caller to choose encoding for each type. let encodings = get_encoding_for_schema(schema, |_| Encoding::Plain); let mut sink = FileSink::try_new( - writer, + &mut writer, // The file sink needs the `Schema` instead of a reference. (**schema).clone(), encodings, @@ -95,9 +96,11 @@ impl<'a> ParquetWriter<'a> { } } sink.close().await.context(error::WriteParquetSnafu)?; - // FIXME(yingwen): Hack to workaround an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162), - // upgrading to latest arrow2 can fixed this, but now datafusion is still using an old arrow2 version. - sink.flush().await.context(error::WriteParquetSnafu) + + drop(sink); + writer.flush().await.context(error::WriteObjectSnafu { + path: self.file_path, + }) } } diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index 69069cb83b..9fcab030f3 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -68,9 +68,10 @@ mod tests { use datatypes::arrow::io::parquet::write::{ Compression, Encoding, FileSink, Version, WriteOptions, }; + use futures::AsyncWriteExt; use futures::SinkExt; use tempdir::TempDir; - use tokio_util::compat::TokioAsyncReadCompatExt; + use tokio_util::compat::TokioAsyncWriteCompatExt; use super::*; @@ -89,16 +90,16 @@ mod tests { // now all physical types use plain encoding, maybe let caller to choose encoding for each type. let encodings = vec![Encoding::Plain].repeat(schema.fields.len()); - let writer = tokio::fs::OpenOptions::new() + let mut writer = tokio::fs::OpenOptions::new() .write(true) .create(true) .open(&path) .await .unwrap() - .compat(); + .compat_write(); let mut sink = FileSink::try_new( - writer, + &mut writer, schema.clone(), encodings, WriteOptions { @@ -129,6 +130,10 @@ mod tests { .unwrap(); } sink.close().await.unwrap(); + + drop(sink); + writer.flush().await.unwrap(); + (path, Arc::new(schema)) }