From a640872cdaa165cf8738c1d014de6379eac5795c Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Sat, 7 Jan 2023 18:01:35 +0800
Subject: [PATCH] fix: parquet native row group pruning support

---
 src/storage/src/sst/parquet.rs | 85 +++++++++++++++++++++++++++++-----
 1 file changed, 74 insertions(+), 11 deletions(-)

diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs
index 5bde4ac4e4..970b31621e 100644
--- a/src/storage/src/sst/parquet.rs
+++ b/src/storage/src/sst/parquet.rs
@@ -157,30 +157,34 @@ impl<'a> ParquetReader<'a> {
 
         let adapter = ReadAdapter::new(store_schema.clone(), self.projected_schema.clone())?;
 
-        let pruned_row_groups = self.predicate.prune_row_groups(
-            store_schema.schema().clone(),
-            builder.metadata().row_groups(),
-        );
+        let pruned_row_groups = self
+            .predicate
+            .prune_row_groups(
+                store_schema.schema().clone(),
+                builder.metadata().row_groups(),
+            )
+            .into_iter()
+            .enumerate()
+            .flat_map(|(idx, valid)| if valid { Some(idx) } else { None })
+            .collect::<Vec<_>>();
 
         let projection = ProjectionMask::roots(
             builder.metadata().file_metadata().schema_descr(),
             adapter.fields_to_read(),
         );
 
-        let mut masked_stream = builder
+        let mut stream = builder
             .with_projection(projection)
+            .with_row_groups(pruned_row_groups)
             .build()
             .context(ReadParquetSnafu {
                 file: self.file_path,
-            })?
-            .zip(futures_util::stream::iter(pruned_row_groups.into_iter()));
+            })?;
 
         let file_name = self.file_path.to_string();
         let chunk_stream = try_stream!({
-            while let Some((record_batch, valid)) = masked_stream.next().await {
-                if valid {
-                    yield record_batch.context(ReadParquetSnafu { file: &file_name })?
-                }
+            while let Some(res) = stream.next().await {
+                yield res.context(ReadParquetSnafu { file: &file_name })?
             }
         });
 
@@ -330,6 +334,65 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_parquet_read_large_batch() {
+        common_telemetry::init_default_ut_logging();
+        let schema = memtable_tests::schema_for_test();
+        let memtable = DefaultMemtableBuilder::default().build(schema.clone());
+
+        let rows_total = 4096 * 4;
+        let mut keys_vec = Vec::with_capacity(rows_total);
+        let mut values_vec = Vec::with_capacity(rows_total);
+
+        for i in 0..rows_total {
+            keys_vec.push((i as i64, i as u64));
+            values_vec.push((Some(i as u64), Some(i as u64)));
+        }
+
+        memtable_tests::write_kvs(
+            &*memtable,
+            10, // sequence
+            OpType::Put,
+            &keys_vec,   // keys
+            &values_vec, // values
+        );
+
+        let dir = TempDir::new("write_parquet").unwrap();
+        let path = dir.path().to_str().unwrap();
+        let backend = Builder::default().root(path).build().unwrap();
+        let object_store = ObjectStore::new(backend);
+        let sst_file_name = "test-read-large.parquet";
+        let iter = memtable.iter(&IterContext::default()).unwrap();
+        let writer = ParquetWriter::new(sst_file_name, iter, object_store.clone());
+
+        writer
+            .write_sst(&sst::WriteOptions::default())
+            .await
+            .unwrap();
+
+        let operator = ObjectStore::new(
+            object_store::backend::fs::Builder::default()
+                .root(dir.path().to_str().unwrap())
+                .build()
+                .unwrap(),
+        );
+
+        let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
+        let reader = ParquetReader::new(
+            "test-read-large.parquet",
+            operator,
+            projected_schema,
+            Predicate::empty(),
+        );
+
+        let mut rows_fetched = 0;
+        let mut stream = reader.chunk_stream().await.unwrap();
+        while let Some(res) = stream.next_batch().await.unwrap() {
+            rows_fetched += res.num_rows();
+        }
+        assert_eq!(rows_total, rows_fetched);
+    }
+
     #[tokio::test]
     async fn test_parquet_reader() {
         common_telemetry::init_default_ut_logging();
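
Note on the core of the fix: Predicate::prune_row_groups returns one bool per row group, while the parquet crate's builder method with_row_groups expects the indices of the groups to keep, so the patch converts the mask to indices and lets the reader skip pruned groups before they are fetched or decoded. The old code instead zipped the full record batch stream with the validity mask and discarded pruned batches only after decoding them. A minimal stand-alone sketch of that mask-to-indices step (mask_to_indices is a hypothetical helper for illustration, not code from this patch):

    // Convert a per-row-group keep/skip mask into the Vec<usize> of
    // row group indices that with_row_groups expects.
    fn mask_to_indices(mask: &[bool]) -> Vec<usize> {
        mask.iter()
            .enumerate()
            .filter_map(|(idx, &keep)| keep.then_some(idx))
            .collect()
    }

    fn main() {
        // Row groups 0 and 2 pass the predicate; group 1 is skipped
        // by the reader before any I/O or decoding happens.
        assert_eq!(mask_to_indices(&[true, false, true]), vec![0, 2]);
    }

The patch expresses the same step inline with enumerate plus flat_map; since an Option iterates as zero or one items, flat_map there behaves exactly like the filter_map above.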