fix: support native row group pruning in parquet reader
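
Convert the boolean mask returned by Predicate::prune_row_groups into
row-group indices and hand them to the parquet reader builder via
with_row_groups, so pruned row groups are skipped at read time instead
of being decoded and filtered out of the batch stream afterwards. Also
adds test_parquet_read_large_batch, which writes 4096 * 4 rows and
verifies that a full scan returns all of them.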

commit a640872cda
parent 7e3c59fb51
Author: Lei, HUANG
Date:   2023-01-07 18:01:35 +08:00

@@ -157,30 +157,34 @@ impl<'a> ParquetReader<'a> {
         let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?;
-        let pruned_row_groups = self.predicate.prune_row_groups(
-            store_schema.schema().clone(),
-            builder.metadata().row_groups(),
-        );
+        let pruned_row_groups = self
+            .predicate
+            .prune_row_groups(
+                store_schema.schema().clone(),
+                builder.metadata().row_groups(),
+            )
+            .into_iter()
+            .enumerate()
+            .flat_map(|(idx, valid)| if valid { Some(idx) } else { None })
+            .collect::<Vec<_>>();
 
         let projection = ProjectionMask::roots(
             builder.metadata().file_metadata().schema_descr(),
             adapter.fields_to_read(),
         );
 
-        let mut masked_stream = builder
+        let mut stream = builder
             .with_projection(projection)
+            .with_row_groups(pruned_row_groups)
             .build()
             .context(ReadParquetSnafu {
                 file: self.file_path,
-            })?
-            .zip(futures_util::stream::iter(pruned_row_groups.into_iter()));
+            })?;
 
         let file_name = self.file_path.to_string();
         let chunk_stream = try_stream!({
-            while let Some((record_batch, valid)) = masked_stream.next().await {
-                if valid {
-                    yield record_batch.context(ReadParquetSnafu { file: &file_name })?
-                }
+            while let Some(res) = stream.next().await {
+                yield res.context(ReadParquetSnafu { file: &file_name })?
             }
         });
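
For reference, here is a minimal standalone sketch of the same technique
against the parquet crate's async Arrow reader; the function name, file
handle, and keep-mask below are hypothetical stand-ins, not part of this
commit:

    use futures_util::TryStreamExt;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    // Hypothetical sketch: read only the row groups a predicate kept.
    async fn read_pruned(file: tokio::fs::File) -> parquet::errors::Result<()> {
        let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
        // Assume the predicate produced one bool per row group; turn the
        // keep-mask into row-group indices, as the patch above does.
        let mask = vec![true, false, true];
        let keep: Vec<usize> = mask
            .into_iter()
            .enumerate()
            .filter_map(|(idx, valid)| valid.then_some(idx))
            .collect();
        // with_row_groups restricts the reader to those indices, so pruned
        // row groups are never fetched or decoded.
        let stream = builder.with_row_groups(keep).build()?;
        let batches: Vec<_> = stream.try_collect().await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }

This is the gain over the previous zip-based masking: the old code still
read and decoded every row group and only discarded the masked-out
batches afterwards.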
@@ -330,6 +334,65 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_parquet_read_large_batch() {
+        common_telemetry::init_default_ut_logging();
+        let schema = memtable_tests::schema_for_test();
+        let memtable = DefaultMemtableBuilder::default().build(schema.clone());
+
+        let rows_total = 4096 * 4;
+        let mut keys_vec = Vec::with_capacity(rows_total);
+        let mut values_vec = Vec::with_capacity(rows_total);
+        for i in 0..rows_total {
+            keys_vec.push((i as i64, i as u64));
+            values_vec.push((Some(i as u64), Some(i as u64)));
+        }
+
+        memtable_tests::write_kvs(
+            &*memtable,
+            10, // sequence
+            OpType::Put,
+            &keys_vec,   // keys
+            &values_vec, // values
+        );
+
+        let dir = TempDir::new("write_parquet").unwrap();
+        let path = dir.path().to_str().unwrap();
+        let backend = Builder::default().root(path).build().unwrap();
+        let object_store = ObjectStore::new(backend);
+        let sst_file_name = "test-read-large.parquet";
+        let iter = memtable.iter(&IterContext::default()).unwrap();
+        let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
+
+        writer
+            .write_sst(&sst::WriteOptions::default())
+            .await
+            .unwrap();
+
+        let operator = ObjectStore::new(
+            object_store::backend::fs::Builder::default()
+                .root(dir.path().to_str().unwrap())
+                .build()
+                .unwrap(),
+        );
+
+        let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
+        let reader = ParquetReader::new(
+            "test-read-large.parquet",
+            operator,
+            projected_schema,
+            Predicate::empty(),
+        );
+
+        let mut rows_fetched = 0;
+        let mut stream = reader.chunk_stream().await.unwrap();
+        while let Some(res) = stream.next_batch().await.unwrap() {
+            rows_fetched += res.num_rows();
+        }
+        assert_eq!(rows_total, rows_fetched);
+    }
+
     #[tokio::test]
     async fn test_parquet_reader() {
         common_telemetry::init_default_ut_logging();