From 39e2f122eb5aeed3db8ea0a0e28a3f76115700f0 Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Wed, 6 Aug 2025 14:50:01 +0800
Subject: [PATCH] feat: EncodedBulkPartIter iters flat format and returns
 RecordBatch (#6655)

* feat: implements iter to read bulk part

Signed-off-by: evenyag

* feat: BulkPartEncoder encodes BulkPart instead of mutation

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/mito2/benches/memtable_bench.rs        |   2 -
 src/mito2/src/memtable.rs                  |   3 +
 src/mito2/src/memtable/bulk/context.rs     |   3 +-
 src/mito2/src/memtable/bulk/part.rs        |  91 +++---
 src/mito2/src/memtable/bulk/part_reader.rs | 290 ++++++++----------
 .../src/memtable/bulk/row_group_reader.rs  |  15 +-
 6 files changed, 172 insertions(+), 232 deletions(-)

diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index bcc2352222..db6f50418e 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -458,7 +458,6 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
             metadata.clone(),
             &None,                   // No projection
             Some(predicate.clone()), // With hostname filter
-            true,
         ));
 
         // Create and iterate over BulkPartRecordBatchIter with filter
@@ -480,7 +479,6 @@ fn bulk_part_record_batch_iter_filter(c: &mut Criterion) {
             metadata.clone(),
             &None, // No projection
             None,  // No predicate
-            true,
         ));
 
         // Create and iterate over BulkPartRecordBatchIter
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index b5d4000d9e..5269d0eabd 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -22,6 +22,7 @@ use std::time::Duration;
 
 pub use bulk::part::EncodedBulkPart;
 use common_time::Timestamp;
+use datatypes::arrow::record_batch::RecordBatch;
 use mito_codec::key_values::KeyValue;
 pub use mito_codec::key_values::KeyValues;
 use serde::{Deserialize, Serialize};
@@ -137,6 +138,8 @@ impl MemtableStats {
 
 pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
 
+pub type BoxedRecordBatchIterator = Box<dyn Iterator<Item = Result<RecordBatch>> + Send>;
+
 /// Ranges in a memtable.
 #[derive(Default)]
 pub struct MemtableRanges {
diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs
index 3d85694fae..a88bd31d1b 100644
--- a/src/mito2/src/memtable/bulk/context.rs
+++ b/src/mito2/src/memtable/bulk/context.rs
@@ -41,7 +41,6 @@ impl BulkIterContext {
         region_metadata: RegionMetadataRef,
         projection: &Option<&[ColumnId]>,
         predicate: Option<Predicate>,
-        flat_format: bool,
     ) -> Self {
         let codec = build_primary_key_codec(&region_metadata);
 
@@ -56,7 +55,7 @@ impl BulkIterContext {
             })
             .collect();
 
-        let read_format = build_read_format(region_metadata, projection, flat_format);
+        let read_format = build_read_format(region_metadata, projection, true);
 
         Self {
             base: RangeBase {
diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 7e025aef50..5be155eedb 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -60,7 +60,7 @@ use crate::error::{
 use crate::memtable::bulk::context::BulkIterContextRef;
 use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
 use crate::memtable::time_series::{ValueBuilder, Values};
-use crate::memtable::BoxedBatchIterator;
+use crate::memtable::BoxedRecordBatchIterator;
 use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::to_sst_arrow_schema;
@@ -511,7 +511,7 @@ impl EncodedBulkPart {
         &self,
         context: BulkIterContextRef,
         sequence: Option<SequenceNumber>,
-    ) -> Result<Option<BoxedBatchIterator>> {
+    ) -> Result<Option<BoxedRecordBatchIterator>> {
         // use predicate to find row groups to read.
         let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);
 
@@ -527,7 +527,7 @@ impl EncodedBulkPart {
             self.data.clone(),
             sequence,
         )?;
-        Ok(Some(Box::new(iter) as BoxedBatchIterator))
+        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
     }
 }
 
@@ -547,19 +547,12 @@ pub struct BulkPartMeta {
 
 pub struct BulkPartEncoder {
     metadata: RegionMetadataRef,
-    pk_encoder: DensePrimaryKeyCodec,
     row_group_size: usize,
-    dedup: bool,
     writer_props: Option<WriterProperties>,
 }
 
 impl BulkPartEncoder {
-    pub(crate) fn new(
-        metadata: RegionMetadataRef,
-        dedup: bool,
-        row_group_size: usize,
-    ) -> BulkPartEncoder {
-        let codec = DensePrimaryKeyCodec::new(&metadata);
+    pub(crate) fn new(metadata: RegionMetadataRef, row_group_size: usize) -> BulkPartEncoder {
         let writer_props = Some(
             WriterProperties::builder()
                 .set_write_batch_size(row_group_size)
@@ -568,33 +561,27 @@ impl BulkPartEncoder {
         );
         Self {
             metadata,
-            pk_encoder: codec,
             row_group_size,
-            dedup,
             writer_props,
         }
     }
 }
 
 impl BulkPartEncoder {
-    /// Encodes mutations to a [EncodedBulkPart], returns true if encoded data has been written to `dest`.
-    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
-        let Some((arrow_record_batch, min_ts, max_ts)) =
-            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
-        else {
+    /// Encodes a bulk part into an [EncodedBulkPart], returning the encoded data,
+    /// or `None` if the part is empty.
+    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
+        if part.batch.num_rows() == 0 {
             return Ok(None);
-        };
+        }
 
         let mut buf = Vec::with_capacity(4096);
-        let arrow_schema = arrow_record_batch.schema();
+        let arrow_schema = part.batch.schema();
 
         let file_metadata = {
             let mut writer =
                 ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                     .context(EncodeMemtableSnafu)?;
-            writer
-                .write(&arrow_record_batch)
-                .context(EncodeMemtableSnafu)?;
+            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
             writer.finish().context(EncodeMemtableSnafu)?
         };
 
@@ -604,9 +591,9 @@ impl BulkPartEncoder {
         Ok(Some(EncodedBulkPart {
             data: buf,
             metadata: BulkPartMeta {
-                num_rows: arrow_record_batch.num_rows(),
-                max_timestamp: max_ts,
-                min_timestamp: min_ts,
+                num_rows: part.batch.num_rows(),
+                max_timestamp: part.max_ts,
+                min_timestamp: part.min_ts,
                 parquet_metadata,
                 region_metadata: self.metadata.clone(),
             },
@@ -881,6 +868,7 @@ mod tests {
 
     use api::v1::{Row, WriteHint};
     use datafusion_common::ScalarValue;
+    use datatypes::arrow::array::Float64Array;
    use datatypes::prelude::{ConcreteDataType, ScalarVector, Value};
     use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};
     use store_api::storage::consts::ReservedColumnId;
@@ -890,8 +878,7 @@ mod tests {
     use crate::sst::parquet::format::{PrimaryKeyReadFormat, ReadFormat};
     use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
     use crate::test_util::memtable_util::{
-        build_key_values_with_ts_seq_values, metadata_for_test, metadata_with_primary_key,
-        region_metadata_to_row_schema,
+        build_key_values_with_ts_seq_values, metadata_for_test, region_metadata_to_row_schema,
     };
 
     fn check_binary_array_to_dictionary(
@@ -1191,7 +1178,7 @@ mod tests {
     fn encode(input: &[MutationInput]) -> EncodedBulkPart {
         let metadata = metadata_for_test();
-        let mutations = input
+        let kvs = input
             .iter()
             .map(|m| {
                 build_key_values_with_ts_seq_values(
@@ -1202,11 +1189,17 @@ mod tests {
                     m.v1.iter().copied(),
                     m.sequence,
                 )
-                .mutation
             })
             .collect::<Vec<_>>();
-        let encoder = BulkPartEncoder::new(metadata, true, 1024);
-        encoder.encode_mutations(&mutations).unwrap().unwrap()
+        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+        let primary_key_codec = build_primary_key_codec(&metadata);
+        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
+        for kv in kvs {
+            converter.append_key_values(&kv).unwrap();
+        }
+        let part = converter.convert().unwrap();
+        let encoder = BulkPartEncoder::new(metadata, 1024);
+        encoder.encode_part(&part).unwrap().unwrap()
     }
 
     #[test]
@@ -1236,14 +1229,12 @@ mod tests {
         ]);
 
         let projection = &[4u32];
-
         let mut reader = part
             .read(
                 Arc::new(BulkIterContext::new(
                     part.metadata.region_metadata.clone(),
                     &Some(projection.as_slice()),
                     None,
-                    false,
                 )),
                 None,
             )
@@ -1251,19 +1242,17 @@ mod tests {
             .unwrap()
             .expect("expect at least one row group");
 
         let mut total_rows_read = 0;
-        let mut field = vec![];
+        let mut field: Vec<f64> = vec![];
         for res in reader {
             let batch = res.unwrap();
-            assert_eq!(1, batch.fields().len());
-            assert_eq!(4, batch.fields()[0].column_id);
-            field.extend(
-                batch.fields()[0]
-                    .data
+            assert_eq!(5, batch.num_columns());
+            field.extend_from_slice(
+                batch
+                    .column(0)
                     .as_any()
-                    .downcast_ref::<Float64Vector>()
+                    .downcast_ref::<Float64Array>()
                     .unwrap()
-                    .iter_data()
-                    .map(|v| v.unwrap()),
+                    .values(),
             );
             total_rows_read += batch.num_rows();
         }
@@ -1273,17 +1262,23 @@ mod tests {
     fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
         let metadata = metadata_for_test();
-        let mutations = key_values
+        let kvs = key_values
             .into_iter()
             .map(|(k0, k1, (start, end), sequence)| {
                 let ts = (start..end);
                 let v1 = (start..end).map(|_| None);
                 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
-                    .mutation
             })
             .collect::<Vec<_>>();
-        let encoder = BulkPartEncoder::new(metadata, true, 100);
-        encoder.encode_mutations(&mutations).unwrap().unwrap()
+        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+        let primary_key_codec = build_primary_key_codec(&metadata);
+        let mut converter =
+            BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
+        for kv in kvs {
+            converter.append_key_values(&kv).unwrap();
+        }
+        let part = converter.convert().unwrap();
+        let encoder = BulkPartEncoder::new(metadata, 1024);
+        encoder.encode_part(&part).unwrap().unwrap()
     }
 
     fn check_prune_row_group(
@@ -1295,7 +1290,6 @@ mod tests {
                 part.metadata.region_metadata.clone(),
                 &None,
                 predicate,
-                false,
             ));
         let mut reader = part
             .read(context, None)
@@ -1326,7 +1320,6 @@ mod tests {
             Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
             )])),
-            false,
         ));
         assert!(part.read(context, None).unwrap().is_none());
 
diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs
index f750054027..e51f8caf01 100644
--- a/src/mito2/src/memtable/bulk/part_reader.rs
+++ b/src/mito2/src/memtable/bulk/part_reader.rs
@@ -20,26 +20,26 @@ use bytes::Bytes;
 use datatypes::arrow::array::{BooleanArray, Scalar, UInt64Array};
 use datatypes::arrow::buffer::BooleanBuffer;
 use datatypes::arrow::record_batch::RecordBatch;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
 use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
 use snafu::ResultExt;
 use store_api::storage::SequenceNumber;
 
-use crate::error::{self, ComputeArrowSnafu};
-use crate::memtable::bulk::context::BulkIterContextRef;
-use crate::memtable::bulk::row_group_reader::{
-    MemtableRowGroupReader, MemtableRowGroupReaderBuilder,
-};
-use crate::read::Batch;
+use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
+use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
+use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
 use crate::sst::parquet::flat_format::sequence_column_index;
-use crate::sst::parquet::reader::MaybeFilter;
+use crate::sst::parquet::reader::{MaybeFilter, RowGroupReaderContext};
 
 /// Iterator for reading data inside a bulk part.
 pub struct EncodedBulkPartIter {
+    context: BulkIterContextRef,
     row_groups_to_read: VecDeque<usize>,
-    current_reader: Option<PruneReader>,
+    current_reader: Option<ParquetRecordBatchReader>,
     builder: MemtableRowGroupReaderBuilder,
-    sequence: Option<SequenceNumber>,
+    /// Sequence number filter.
+    sequence: Option<Scalar<UInt64Array>>,
 }
 
 impl EncodedBulkPartIter {
@@ -51,24 +51,23 @@ impl EncodedBulkPartIter {
         data: Bytes,
         sequence: Option<SequenceNumber>,
     ) -> error::Result<Self> {
+        assert!(context.read_format().as_flat().is_some());
+
+        let sequence = sequence.map(UInt64Array::new_scalar);
+
         let projection_mask = ProjectionMask::roots(
             parquet_meta.file_metadata().schema_descr(),
             context.read_format().projection_indices().iter().copied(),
         );
-
-        let builder = MemtableRowGroupReaderBuilder::try_new(
-            context.clone(),
-            projection_mask,
-            parquet_meta,
-            data,
-        )?;
+        let builder =
+            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
 
         let init_reader = row_groups_to_read
             .pop_front()
             .map(|first_row_group| builder.build_row_group_reader(first_row_group, None))
-            .transpose()?
-            .map(|r| PruneReader::new(context, r));
+            .transpose()?;
         Ok(Self {
+            context,
             row_groups_to_read,
             current_reader: init_reader,
             builder,
@@ -76,88 +75,42 @@ impl EncodedBulkPartIter {
             sequence,
         })
     }
 
-    pub(crate) fn next_batch(&mut self) -> error::Result<Option<Batch>> {
+    /// Fetches the next non-empty record batch.
+    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
         let Some(current) = &mut self.current_reader else {
             // All row group exhausted.
             return Ok(None);
         };
 
-        if let Some(mut batch) = current.next_batch()? {
-            batch.filter_by_sequence(self.sequence)?;
-            return Ok(Some(batch));
+        for batch in current {
+            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
+            if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
+                return Ok(Some(batch));
+            }
         }
 
         // Previous row group exhausted, read next row group
         while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
-            current.reset(self.builder.build_row_group_reader(next_row_group, None)?);
-            if let Some(mut next_batch) = current.next_batch()? {
-                next_batch.filter_by_sequence(self.sequence)?;
-                return Ok(Some(next_batch));
+            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
+            let current = self.current_reader.insert(next_reader);
+
+            for batch in current {
+                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
+                if let Some(batch) = apply_combined_filters(&self.context, &self.sequence, batch)? {
+                    return Ok(Some(batch));
+                }
             }
         }
+
         Ok(None)
     }
 }
 
 impl Iterator for EncodedBulkPartIter {
-    type Item = error::Result<Batch>;
+    type Item = error::Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.next_batch().transpose()
-    }
-}
-
-struct PruneReader {
-    context: BulkIterContextRef,
-    row_group_reader: MemtableRowGroupReader,
-}
-
-//todo(hl): maybe we also need to support lastrow mode here.
-impl PruneReader {
-    fn new(context: BulkIterContextRef, reader: MemtableRowGroupReader) -> Self {
-        Self {
-            context,
-            row_group_reader: reader,
-        }
-    }
-
-    /// Iterates current inner reader until exhausted.
-    fn next_batch(&mut self) -> error::Result<Option<Batch>> {
-        while let Some(b) = self.row_group_reader.next_inner()? {
-            match self.prune(b)? {
-                Some(b) => {
-                    return Ok(Some(b));
-                }
-                None => {
-                    continue;
-                }
-            }
-        }
-        Ok(None)
-    }
-
-    /// Prunes batch according to filters.
-    fn prune(&mut self, batch: Batch) -> error::Result<Option<Batch>> {
-        //todo(hl): add metrics.
-
-        // fast path
-        if self.context.base.filters.is_empty() {
-            return Ok(Some(batch));
-        }
-
-        let Some(batch_filtered) = self.context.base.precise_filter(batch)? else {
-            // the entire batch is filtered out
-            return Ok(None);
-        };
-        if !batch_filtered.is_empty() {
-            Ok(Some(batch_filtered))
-        } else {
-            Ok(None)
-        }
-    }
-
-    fn reset(&mut self, reader: MemtableRowGroupReader) {
-        self.row_group_reader = reader;
+        self.next_record_batch().transpose()
     }
 }
 
@@ -201,95 +154,13 @@ impl BulkPartRecordBatchIter {
             .context(ComputeArrowSnafu)
     }
 
-    // TODO(yingwen): Supports sparse encoding which doesn't have decoded primary key columns.
-    /// Applies both predicate filtering and sequence filtering in a single pass.
-    /// Returns None if the filtered batch is empty.
-    fn apply_combined_filters(
-        &self,
-        record_batch: RecordBatch,
-    ) -> error::Result<Option<RecordBatch>> {
-        let num_rows = record_batch.num_rows();
-        let mut combined_filter = None;
-
-        // First, apply predicate filters.
-        if !self.context.base.filters.is_empty() {
-            let num_rows = record_batch.num_rows();
-            let mut mask = BooleanBuffer::new_set(num_rows);
-
-            // Run filter one by one and combine them result, similar to RangeBase::precise_filter
-            for filter_ctx in &self.context.base.filters {
-                let filter = match filter_ctx.filter() {
-                    MaybeFilter::Filter(f) => f,
-                    // Column matches.
-                    MaybeFilter::Matched => continue,
-                    // Column doesn't match, filter the entire batch.
-                    MaybeFilter::Pruned => return Ok(None),
-                };
-
-                // Safety: We checked the format type in new().
-                let Some(column_index) = self
-                    .context
-                    .read_format()
-                    .as_flat()
-                    .unwrap()
-                    .projected_index_by_id(filter_ctx.column_id())
-                else {
-                    continue;
-                };
-                let array = record_batch.column(column_index);
-                let result = filter
-                    .evaluate_array(array)
-                    .context(crate::error::RecordBatchSnafu)?;
-
-                mask = mask.bitand(&result);
-            }
-            // Convert the mask to BooleanArray
-            combined_filter = Some(BooleanArray::from(mask));
-        }
-
-        // Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
-        if let Some(sequence) = &self.sequence {
-            let sequence_column =
-                record_batch.column(sequence_column_index(record_batch.num_columns()));
-            let sequence_filter =
-                datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
-                    .context(ComputeArrowSnafu)?;
-            // Combine with existing filter using AND operation
-            combined_filter = match combined_filter {
-                None => Some(sequence_filter),
-                Some(existing_filter) => {
-                    let and_result =
-                        datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
-                            .context(ComputeArrowSnafu)?;
-                    Some(and_result)
-                }
-            };
-        }
-
-        // Apply the combined filter if any filters were applied
-        let Some(filter_array) = combined_filter else {
-            // No filters applied, return original batch
-            return Ok(Some(record_batch));
-        };
-        let select_count = filter_array.true_count();
-        if select_count == 0 {
-            return Ok(None);
-        }
-        if select_count == num_rows {
-            return Ok(Some(record_batch));
-        }
-        let filtered_batch =
-            datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
-                .context(ComputeArrowSnafu)?;
-
-        Ok(Some(filtered_batch))
-    }
-
     fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
         // Apply projection first.
         let projected_batch = self.apply_projection(record_batch)?;
 
         // Apply combined filtering (both predicate and sequence filters)
-        let Some(filtered_batch) = self.apply_combined_filters(projected_batch)? else {
+        let Some(filtered_batch) =
+            apply_combined_filters(&self.context, &self.sequence, projected_batch)?
+        else {
             return Ok(None);
         };
 
@@ -307,6 +178,89 @@ impl Iterator for BulkPartRecordBatchIter {
     }
 }
 
+// TODO(yingwen): Support sparse encoding, which doesn't have decoded primary key columns.
+/// Applies both predicate filtering and sequence filtering in a single pass.
+/// Returns `None` if the filtered batch is empty.
+fn apply_combined_filters(
+    context: &BulkIterContext,
+    sequence: &Option<Scalar<UInt64Array>>,
+    record_batch: RecordBatch,
+) -> error::Result<Option<RecordBatch>> {
+    let num_rows = record_batch.num_rows();
+    let mut combined_filter = None;
+
+    // First, apply predicate filters.
+    if !context.base.filters.is_empty() {
+        let num_rows = record_batch.num_rows();
+        let mut mask = BooleanBuffer::new_set(num_rows);
+
+        // Run filters one by one and combine their results, similar to RangeBase::precise_filter
+        for filter_ctx in &context.base.filters {
+            let filter = match filter_ctx.filter() {
+                MaybeFilter::Filter(f) => f,
+                // Column matches.
+                MaybeFilter::Matched => continue,
+                // Column doesn't match, filter the entire batch.
+                MaybeFilter::Pruned => return Ok(None),
+            };
+
+            // Safety: We checked the format type in new().
+            let Some(column_index) = context
+                .read_format()
+                .as_flat()
+                .unwrap()
+                .projected_index_by_id(filter_ctx.column_id())
+            else {
+                continue;
+            };
+            let array = record_batch.column(column_index);
+            let result = filter
+                .evaluate_array(array)
+                .context(crate::error::RecordBatchSnafu)?;
+
+            mask = mask.bitand(&result);
+        }
+        // Convert the mask to BooleanArray
+        combined_filter = Some(BooleanArray::from(mask));
+    }
+
+    // Filters rows by the given `sequence`. Only preserves rows with sequence less than or equal to `sequence`.
+    if let Some(sequence) = sequence {
+        let sequence_column =
+            record_batch.column(sequence_column_index(record_batch.num_columns()));
+        let sequence_filter =
+            datatypes::arrow::compute::kernels::cmp::lt_eq(sequence_column, sequence)
+                .context(ComputeArrowSnafu)?;
+        // Combine with existing filter using AND operation
+        combined_filter = match combined_filter {
+            None => Some(sequence_filter),
+            Some(existing_filter) => {
+                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
+                    .context(ComputeArrowSnafu)?;
+                Some(and_result)
+            }
+        };
+    }
+
+    // Apply the combined filter if any filters were applied
+    let Some(filter_array) = combined_filter else {
+        // No filters applied, return original batch
+        return Ok(Some(record_batch));
+    };
+    let select_count = filter_array.true_count();
+    if select_count == 0 {
+        return Ok(None);
+    }
+    if select_count == num_rows {
+        return Ok(Some(record_batch));
+    }
+    let filtered_batch =
+        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
+            .context(ComputeArrowSnafu)?;
+
+    Ok(Some(filtered_batch))
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -412,7 +366,6 @@ mod tests {
             Arc::new(region_metadata.clone()),
             &None, // No projection
             None,  // No predicate
-            true,
         ));
         // Iterates all rows.
         let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
@@ -436,7 +389,6 @@ mod tests {
             Arc::new(region_metadata),
             &Some(&[0, 2]),
             Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
-            true,
         ));
         // Creates iter with projection and predicate.
         let iter = BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None);
diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs
index 3c1fd3e1a1..660ed15ec4 100644
--- a/src/mito2/src/memtable/bulk/row_group_reader.rs
+++ b/src/mito2/src/memtable/bulk/row_group_reader.rs
@@ -28,7 +28,7 @@ use crate::error;
 use crate::error::ReadDataPartSnafu;
 use crate::memtable::bulk::context::BulkIterContextRef;
 use crate::sst::parquet::format::ReadFormat;
-use crate::sst::parquet::reader::{RowGroupReaderBase, RowGroupReaderContext};
+use crate::sst::parquet::reader::RowGroupReaderContext;
 use crate::sst::parquet::row_group::{ColumnChunkIterator, RowGroupBase};
 use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
 
@@ -118,10 +118,7 @@ impl RowGroupReaderContext for BulkIterContextRef {
     }
 }
 
-pub(crate) type MemtableRowGroupReader = RowGroupReaderBase<BulkIterContextRef>;
-
 pub(crate) struct MemtableRowGroupReaderBuilder {
-    context: BulkIterContextRef,
     projection: ProjectionMask,
     parquet_metadata: Arc<ParquetMetaData>,
     field_levels: FieldLevels,
@@ -130,7 +127,7 @@ pub(crate) struct MemtableRowGroupReaderBuilder {
 
 impl MemtableRowGroupReaderBuilder {
     pub(crate) fn try_new(
-        context: BulkIterContextRef,
+        context: &BulkIterContextRef,
         projection: ProjectionMask,
         parquet_metadata: Arc<ParquetMetaData>,
         data: Bytes,
@@ -141,7 +138,6 @@ impl MemtableRowGroupReaderBuilder {
             parquet_to_arrow_field_levels(parquet_schema_desc, projection.clone(), hint)
                 .context(ReadDataPartSnafu)?;
         Ok(Self {
-            context,
             projection,
             parquet_metadata,
             field_levels,
@@ -154,7 +150,7 @@ impl MemtableRowGroupReaderBuilder {
         &self,
         row_group_idx: usize,
         row_selection: Option<RowSelection>,
-    ) -> error::Result<MemtableRowGroupReader> {
+    ) -> error::Result<ParquetRecordBatchReader> {
         let mut row_group = MemtableRowGroupPageFetcher::create(
             row_group_idx,
             &self.parquet_metadata,
@@ -165,13 +161,12 @@ impl MemtableRowGroupReaderBuilder {
 
         // Builds the parquet reader.
         // Now the row selection is None.
-        let reader = ParquetRecordBatchReader::try_new_with_row_groups(
+        ParquetRecordBatchReader::try_new_with_row_groups(
            &self.field_levels,
             &row_group,
             DEFAULT_READ_BATCH_SIZE,
             row_selection,
         )
-        .context(ReadDataPartSnafu)?;
-        Ok(MemtableRowGroupReader::create(self.context.clone(), reader))
+        .context(ReadDataPartSnafu)
     }
 }
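
Usage sketch for the new read path: after this patch, `EncodedBulkPart::read` yields flat-format arrow `RecordBatch`es instead of `Batch`es, with predicate and sequence filtering applied inside the iterator by `apply_combined_filters`. A minimal sketch, assuming crate-internal access to `part.metadata` (as the tests above use) and a part produced by `BulkPartConverter` plus `BulkPartEncoder::encode_part`; `count_visible_rows` is a hypothetical helper, not part of the patch:

    use std::sync::Arc;

    use crate::error::Result;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::memtable::bulk::part::EncodedBulkPart;

    /// Counts the rows visible at sequence `seq` in an encoded bulk part
    /// (hypothetical helper for illustration).
    fn count_visible_rows(part: &EncodedBulkPart, seq: u64) -> Result<usize> {
        // No projection and no predicate; note that BulkIterContext::new no
        // longer takes a `flat_format` flag because the format is always flat.
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            None,
        ));
        let mut rows = 0;
        // `read` returns Ok(None) when the predicate prunes every row group.
        if let Some(iter) = part.read(context, Some(seq))? {
            for batch in iter {
                // Each item is a RecordBatch; rows with sequence > `seq`
                // were already filtered out by the iterator.
                rows += batch?.num_rows();
            }
        }
        Ok(rows)
    }

The design point this illustrates: callers no longer deal with per-primary-key `Batch`es or a `PruneReader`; one iterator type serves both the encoded-part and in-memory (`BulkPartRecordBatchIter`) paths through the shared `apply_combined_filters` function.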