fix: flaky parquet predicate suites (#307)

* fix: flaky parquet predicate suites

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: change ParquetWriter::write_rows as well

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2022-10-13 14:00:42 +08:00
committed by GitHub
parent a8a6426abf
commit b61d5989b7
2 changed files with 17 additions and 9 deletions

View File

@@ -16,6 +16,7 @@ use datatypes::arrow::io::parquet::read::{
use datatypes::arrow::io::parquet::write::{
Compression, Encoding, FileSink, Version, WriteOptions,
};
use futures::AsyncWriteExt;
use futures_util::sink::SinkExt;
use futures_util::{Stream, TryStreamExt};
use object_store::{ObjectStore, SeekableReader};
@@ -64,13 +65,13 @@ impl<'a> ParquetWriter<'a> {
// FIXME(hl): writer size is not used in fs backend so just leave it to 0,
// but in s3/azblob backend the Content-Length field of HTTP request is set
// to this value.
let writer = object.writer(0).await.context(error::FlushIoSnafu)?;
let mut writer = object.writer(0).await.context(error::FlushIoSnafu)?;
// now all physical types use plain encoding, maybe let caller to choose encoding for each type.
let encodings = get_encoding_for_schema(schema, |_| Encoding::Plain);
let mut sink = FileSink::try_new(
writer,
&mut writer,
// The file sink needs the `Schema` instead of a reference.
(**schema).clone(),
encodings,
@@ -95,9 +96,11 @@ impl<'a> ParquetWriter<'a> {
}
}
sink.close().await.context(error::WriteParquetSnafu)?;
// FIXME(yingwen): Hack to work around an [arrow2 BUG](https://github.com/jorgecarleitao/parquet2/issues/162).
// Upgrading to the latest arrow2 can fix this, but datafusion is still using an old arrow2 version for now.
sink.flush().await.context(error::WriteParquetSnafu)
drop(sink);
writer.flush().await.context(error::WriteObjectSnafu {
path: self.file_path,
})
}
}

View File

@@ -68,9 +68,10 @@ mod tests {
use datatypes::arrow::io::parquet::write::{
Compression, Encoding, FileSink, Version, WriteOptions,
};
use futures::AsyncWriteExt;
use futures::SinkExt;
use tempdir::TempDir;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use super::*;
@@ -89,16 +90,16 @@ mod tests {
// now all physical types use plain encoding, maybe let caller to choose encoding for each type.
let encodings = vec![Encoding::Plain].repeat(schema.fields.len());
let writer = tokio::fs::OpenOptions::new()
let mut writer = tokio::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&path)
.await
.unwrap()
.compat();
.compat_write();
let mut sink = FileSink::try_new(
writer,
&mut writer,
schema.clone(),
encodings,
WriteOptions {
@@ -129,6 +130,10 @@ mod tests {
.unwrap();
}
sink.close().await.unwrap();
drop(sink);
writer.flush().await.unwrap();
(path, Arc::new(schema))
}