1use std::collections::VecDeque;
16use std::time::Instant;
17
18use datatypes::arrow::array::BooleanArray;
19use datatypes::arrow::record_batch::RecordBatch;
20use parquet::arrow::ProjectionMask;
21use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
22use snafu::ResultExt;
23use store_api::storage::SequenceRange;
24
25use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
26use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
27use crate::memtable::bulk::part::EncodedBulkPart;
28use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
29use crate::memtable::{MemScanMetrics, MemScanMetricsData};
30use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
31use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
32use crate::sst::parquet::flat_format::sequence_column_index;
33
34pub struct EncodedBulkPartIter {
36 context: BulkIterContextRef,
37 row_groups_to_read: VecDeque<usize>,
38 current_reader: Option<ParquetRecordBatchReader>,
39 builder: MemtableRowGroupReaderBuilder,
40 sequence: Option<SequenceRange>,
42 current_skip_fields: bool,
44 metrics: MemScanMetricsData,
46 mem_scan_metrics: Option<MemScanMetrics>,
48}
49
50impl EncodedBulkPartIter {
51 pub fn try_new(
53 encoded_part: &EncodedBulkPart,
54 context: BulkIterContextRef,
55 mut row_groups_to_read: VecDeque<usize>,
56 sequence: Option<SequenceRange>,
57 mem_scan_metrics: Option<MemScanMetrics>,
58 ) -> error::Result<Self> {
59 assert!(context.read_format().as_flat().is_some());
60
61 let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
62 let data = encoded_part.data().clone();
63 let series_count = encoded_part.metadata().num_series as usize;
64
65 let projection_mask = ProjectionMask::roots(
66 parquet_meta.file_metadata().schema_descr(),
67 context.read_format().projection_indices().iter().copied(),
68 );
69 let builder =
70 MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
71
72 let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
73 Some(first_row_group) => {
74 let skip_fields = builder.compute_skip_fields(&context, first_row_group);
75 let reader = builder.build_row_group_reader(first_row_group, None)?;
76 (Some(reader), skip_fields)
77 }
78 None => (None, false),
79 };
80
81 Ok(Self {
82 context,
83 row_groups_to_read,
84 current_reader: init_reader,
85 builder,
86 sequence,
87 current_skip_fields,
88 metrics: MemScanMetricsData {
89 total_series: series_count,
90 ..Default::default()
91 },
92 mem_scan_metrics,
93 })
94 }
95
96 fn report_mem_scan_metrics(&mut self) {
97 if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
98 mem_scan_metrics.merge_inner(&self.metrics);
99 }
100 }
101
102 pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
104 let start = Instant::now();
105
106 let Some(current) = &mut self.current_reader else {
107 self.metrics.scan_cost += start.elapsed();
109 return Ok(None);
110 };
111
112 for batch in current {
113 let batch = batch.context(DecodeArrowRowGroupSnafu)?;
114 if let Some(batch) = apply_combined_filters(
115 &self.context,
116 &self.sequence,
117 batch,
118 self.current_skip_fields,
119 )? {
120 self.metrics.num_batches += 1;
122 self.metrics.num_rows += batch.num_rows();
123 self.metrics.scan_cost += start.elapsed();
124 return Ok(Some(batch));
125 }
126 }
127
128 while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
130 self.current_skip_fields = self
132 .builder
133 .compute_skip_fields(&self.context, next_row_group);
134
135 let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
136 let current = self.current_reader.insert(next_reader);
137
138 for batch in current {
139 let batch = batch.context(DecodeArrowRowGroupSnafu)?;
140 if let Some(batch) = apply_combined_filters(
141 &self.context,
142 &self.sequence,
143 batch,
144 self.current_skip_fields,
145 )? {
146 self.metrics.num_batches += 1;
148 self.metrics.num_rows += batch.num_rows();
149 self.metrics.scan_cost += start.elapsed();
150 return Ok(Some(batch));
151 }
152 }
153 }
154
155 self.metrics.scan_cost += start.elapsed();
156 Ok(None)
157 }
158}
159
160impl Iterator for EncodedBulkPartIter {
161 type Item = error::Result<RecordBatch>;
162
163 fn next(&mut self) -> Option<Self::Item> {
164 let result = self.next_record_batch().transpose();
165
166 if result.is_none() {
168 self.report_mem_scan_metrics();
169 }
170
171 result
172 }
173}
174
175impl Drop for EncodedBulkPartIter {
176 fn drop(&mut self) {
177 common_telemetry::debug!(
178 "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
179 self.context.region_id(),
180 self.metrics.total_series,
181 self.metrics.num_rows,
182 self.metrics.num_batches,
183 self.metrics.scan_cost
184 );
185
186 self.report_mem_scan_metrics();
188
189 READ_ROWS_TOTAL
190 .with_label_values(&["bulk_memtable"])
191 .inc_by(self.metrics.num_rows as u64);
192 READ_STAGE_ELAPSED
193 .with_label_values(&["scan_memtable"])
194 .observe(self.metrics.scan_cost.as_secs_f64());
195 }
196}
197
198pub struct BulkPartBatchIter {
202 batches: VecDeque<RecordBatch>,
204 context: BulkIterContextRef,
206 sequence: Option<SequenceRange>,
208 metrics: MemScanMetricsData,
210 mem_scan_metrics: Option<MemScanMetrics>,
212}
213
214impl BulkPartBatchIter {
215 pub fn new(
217 batches: Vec<RecordBatch>,
218 context: BulkIterContextRef,
219 sequence: Option<SequenceRange>,
220 series_count: usize,
221 mem_scan_metrics: Option<MemScanMetrics>,
222 ) -> Self {
223 assert!(context.read_format().as_flat().is_some());
224
225 Self {
226 batches: VecDeque::from(batches),
227 context,
228 sequence,
229 metrics: MemScanMetricsData {
230 total_series: series_count,
231 ..Default::default()
232 },
233 mem_scan_metrics,
234 }
235 }
236
237 pub fn from_single(
239 record_batch: RecordBatch,
240 context: BulkIterContextRef,
241 sequence: Option<SequenceRange>,
242 series_count: usize,
243 mem_scan_metrics: Option<MemScanMetrics>,
244 ) -> Self {
245 Self::new(
246 vec![record_batch],
247 context,
248 sequence,
249 series_count,
250 mem_scan_metrics,
251 )
252 }
253
254 fn report_mem_scan_metrics(&mut self) {
255 if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
256 mem_scan_metrics.merge_inner(&self.metrics);
257 }
258 }
259
260 fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
262 let projection_indices = self.context.read_format().projection_indices();
263 if projection_indices.len() == record_batch.num_columns() {
264 return Ok(record_batch);
265 }
266
267 record_batch
268 .project(projection_indices)
269 .context(ComputeArrowSnafu)
270 }
271
272 fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
273 let start = Instant::now();
274
275 let projected_batch = self.apply_projection(record_batch)?;
277
278 let skip_fields = match self.context.pre_filter_mode() {
280 PreFilterMode::All => false,
281 PreFilterMode::SkipFields => true,
282 PreFilterMode::SkipFieldsOnDelete => true,
283 };
284
285 let Some(filtered_batch) =
286 apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
287 else {
288 self.metrics.scan_cost += start.elapsed();
289 return Ok(None);
290 };
291
292 self.metrics.num_batches += 1;
294 self.metrics.num_rows += filtered_batch.num_rows();
295 self.metrics.scan_cost += start.elapsed();
296
297 Ok(Some(filtered_batch))
298 }
299}
300
301impl Iterator for BulkPartBatchIter {
302 type Item = error::Result<RecordBatch>;
303
304 fn next(&mut self) -> Option<Self::Item> {
305 while let Some(batch) = self.batches.pop_front() {
307 match self.process_batch(batch) {
308 Ok(Some(result)) => return Some(Ok(result)),
309 Ok(None) => continue, Err(e) => {
311 self.report_mem_scan_metrics();
312 return Some(Err(e));
313 }
314 }
315 }
316
317 self.report_mem_scan_metrics();
319 None
320 }
321}
322
323impl Drop for BulkPartBatchIter {
324 fn drop(&mut self) {
325 common_telemetry::debug!(
326 "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
327 self.context.region_id(),
328 self.metrics.total_series,
329 self.metrics.num_rows,
330 self.metrics.num_batches,
331 self.metrics.scan_cost
332 );
333
334 self.report_mem_scan_metrics();
336
337 READ_ROWS_TOTAL
338 .with_label_values(&["bulk_memtable"])
339 .inc_by(self.metrics.num_rows as u64);
340 READ_STAGE_ELAPSED
341 .with_label_values(&["scan_memtable"])
342 .observe(self.metrics.scan_cost.as_secs_f64());
343 }
344}
345
346fn apply_combined_filters(
352 context: &BulkIterContext,
353 sequence: &Option<SequenceRange>,
354 record_batch: RecordBatch,
355 skip_fields: bool,
356) -> error::Result<Option<RecordBatch>> {
357 let format = context.read_format().as_flat().unwrap();
359 let record_batch = format.convert_batch(record_batch, None)?;
360
361 let num_rows = record_batch.num_rows();
362 let mut combined_filter = None;
363 let mut tag_decode_state = TagDecodeState::new();
364
365 if !context.base.filters.is_empty() {
367 let predicate_mask = context.base.compute_filter_mask_flat(
368 &record_batch,
369 skip_fields,
370 false,
371 &mut tag_decode_state,
372 )?;
373 let Some(mask) = predicate_mask else {
375 return Ok(None);
376 };
377 combined_filter = Some(BooleanArray::from(mask));
378 }
379
380 if let Some(sequence) = sequence {
382 let sequence_column =
383 record_batch.column(sequence_column_index(record_batch.num_columns()));
384 let sequence_filter = sequence
385 .filter(&sequence_column)
386 .context(ComputeArrowSnafu)?;
387 combined_filter = match combined_filter {
389 None => Some(sequence_filter),
390 Some(existing_filter) => {
391 let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
392 .context(ComputeArrowSnafu)?;
393 Some(and_result)
394 }
395 };
396 }
397
398 let Some(filter_array) = combined_filter else {
400 return Ok(Some(record_batch));
402 };
403 let select_count = filter_array.true_count();
404 if select_count == 0 {
405 return Ok(None);
406 }
407 if select_count == num_rows {
408 return Ok(Some(record_batch));
409 }
410 let filtered_batch =
411 datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
412 .context(ComputeArrowSnafu)?;
413
414 Ok(Some(filtered_batch))
415}
416
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray,
        TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    /// Arrow schema of the flat batches used by these tests: the user columns `key1`,
    /// `field1`, `timestamp`, followed by `__primary_key`, `__sequence` and `__op_type`.
    fn test_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]))
    }

    /// Builds a batch with `test_schema()`. Each row's primary key is the bytes of its `key1`
    /// value (one dictionary entry per row), and every op type is `1`.
    fn new_batch(
        keys: &[&str],
        fields: &[i64],
        timestamps: &[i64],
        sequences: &[u64],
    ) -> RecordBatch {
        let pk_values: ArrayRef = Arc::new(BinaryArray::from_iter_values(
            keys.iter().map(|k| k.as_bytes()),
        ));
        let pk_keys = UInt32Array::from_iter_values(0..keys.len() as u32);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from_iter_values(keys.iter().copied())),
            Arc::new(Int64Array::from(fields.to_vec())),
            Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())),
            Arc::new(DictionaryArray::new(pk_keys, pk_values)),
            Arc::new(UInt64Array::from(sequences.to_vec())),
            Arc::new(UInt8Array::from(vec![1; keys.len()])),
        ];
        RecordBatch::try_new(test_schema(), columns).unwrap()
    }

    /// Region metadata with tag `key1` (column id 0, the primary key), field `field1`
    /// (column id 1) and timestamp column `timestamp` (column id 2).
    fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);
        builder.build().unwrap()
    }

    #[test]
    fn test_bulk_part_batch_iter() {
        let record_batch = new_batch(
            &["key1", "key2", "key3"],
            &[11, 12, 13],
            &[1000, 2000, 3000],
            &[1, 2, 3],
        );
        let region_metadata = Arc::new(test_region_metadata());

        // No projection and no filters: every row and column is returned.
        let context = Arc::new(
            BulkIterContext::new(region_metadata.clone(), None, None, false).unwrap(),
        );
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // A sequence range keeps only rows whose sequence is <= 2.
        let iter = BulkPartBatchIter::from_single(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        // Projecting column ids 0 and 2 with predicate `key1 = 'key2'` keeps a single row.
        let context = Arc::new(
            BulkIterContext::new(
                region_metadata,
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        let iter = BulkPartBatchIter::from_single(record_batch, context, None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }

    #[test]
    fn test_bulk_part_batch_iter_multiple_batches() {
        let batch1 = new_batch(&["key1", "key2"], &[11, 12], &[1000, 2000], &[1, 2]);
        let batch2 = new_batch(
            &["key3", "key4", "key5"],
            &[13, 14, 15],
            &[3000, 4000, 5000],
            &[3, 4, 5],
        );

        let context = Arc::new(
            BulkIterContext::new(Arc::new(test_region_metadata()), None, None, false).unwrap(),
        );

        // Without projection or filters, every batch is yielded unchanged and in order.
        let expect_batches = vec![batch1, batch2];
        let iter = BulkPartBatchIter::new(expect_batches.clone(), context, None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(expect_batches, result);
    }
}