diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index d8f53b9d71..f6d8674d4c 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -20,13 +20,14 @@ use clap::Parser; use colored::Colorize; use datanode::config::RegionEngineConfig; use datanode::store; -use either::Either; +use futures::stream; use mito2::access_layer::{ AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType, }; use mito2::cache::{CacheManager, CacheManagerRef}; use mito2::config::{FulltextIndexConfig, MitoConfig, Mode}; -use mito2::read::Source; +use mito2::read::FlatSource; +use mito2::sst::FormatType; use mito2::sst::file::{FileHandle, FileMeta}; use mito2::sst::file_purger::{FilePurger, FilePurgerRef}; use mito2::sst::index::intermediate::IntermediateManager; @@ -210,6 +211,7 @@ impl ObjbenchCommand { object_store.clone(), ) .expected_metadata(Some(region_meta.clone())) + .flat_format(true) .build() .await .map_err(|e| { @@ -231,6 +233,10 @@ impl ObjbenchCommand { let reader_build_elapsed = reader_build_start.elapsed(); let total_rows = reader.parquet_metadata().file_metadata().num_rows(); println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed); + let reader_stream = Box::pin(stream::try_unfold(reader, |mut reader| async move { + let batch = reader.next_record_batch().await?; + Ok(batch.map(|batch| (batch, reader))) + })); // Build write request let fulltext_index_config = FulltextIndexConfig { @@ -241,10 +247,11 @@ impl ObjbenchCommand { let write_req = SstWriteRequest { op_type: OperationType::Flush, metadata: region_meta, - source: Either::Left(Source::Reader(Box::new(reader))), + source: FlatSource::Stream(reader_stream), cache_manager, storage: None, max_sequence: None, + sst_write_format: FormatType::PrimaryKey, index_options: Default::default(), index_config: mito_engine_config.index.clone(), inverted_index_config: MitoConfig::default().inverted_index, diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 92c8a3bc36..231285215e 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -17,7 +17,6 @@ use std::time::{Duration, Instant}; use async_stream::try_stream; use common_time::Timestamp; -use either::Either; use futures::{Stream, TryStreamExt}; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; @@ -37,7 +36,7 @@ use crate::error::{ CleanDirSnafu, DeleteIndexSnafu, DeleteIndexesSnafu, DeleteSstsSnafu, OpenDalSnafu, Result, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED}; -use crate::read::{FlatSource, Source}; +use crate::read::FlatSource; use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, RegionFileId, RegionIndexId}; use crate::sst::index::IndexerBuilderImpl; @@ -47,7 +46,7 @@ use crate::sst::location::{self, region_dir_from_table_dir}; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::parquet::{SstInfo, WriteOptions}; -use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY}; +use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FormatType}; pub type AccessLayerRef = Arc<AccessLayer>; /// SST write results.
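The `try_unfold` adapter above is the general pattern for turning a pull-based async reader into a `Stream` that `FlatSource::Stream` can wrap. A minimal, self-contained sketch of the same pattern, using illustrative stand-in types rather than the real mito2 reader and batch types:

```rust
use futures::stream::{self, Stream, TryStreamExt};

struct MockReader {
    remaining: usize,
}

impl MockReader {
    // Stand-in for `ParquetReader::next_record_batch`: `Ok(None)` means EOF.
    async fn next_record_batch(&mut self) -> Result<Option<usize>, String> {
        if self.remaining == 0 {
            return Ok(None);
        }
        self.remaining -= 1;
        Ok(Some(self.remaining))
    }
}

fn into_stream(reader: MockReader) -> impl Stream<Item = Result<usize, String>> {
    // `try_unfold` threads the reader through each step as stream state;
    // returning `Ok(None)` ends the stream, and any `Err` is yielded as an item.
    stream::try_unfold(reader, |mut reader| async move {
        let batch = reader.next_record_batch().await?;
        Ok(batch.map(|batch| (batch, reader)))
    })
}

fn main() {
    let items: Vec<usize> = futures::executor::block_on(async {
        into_stream(MockReader { remaining: 3 })
            .try_collect()
            .await
            .unwrap()
    });
    assert_eq!(items, vec![2, 1, 0]);
}
```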
@@ -391,15 +390,19 @@ impl AccessLayer { ) .await .with_file_cleaner(cleaner); - match request.source { - Either::Left(source) => { + match request.sst_write_format { + FormatType::PrimaryKey => { writer - .write_all(source, request.max_sequence, write_opts) + .write_all_flat_as_primary_key( + request.source, + request.max_sequence, + write_opts, + ) .await? } - Either::Right(flat_source) => { + FormatType::Flat => { writer - .write_all_flat(flat_source, request.max_sequence, write_opts) + .write_all_flat(request.source, request.max_sequence, write_opts) .await? } } @@ -520,11 +523,12 @@ pub enum OperationType { pub struct SstWriteRequest { pub op_type: OperationType, pub metadata: RegionMetadataRef, - pub source: Either<Source, FlatSource>, + pub source: FlatSource, pub cache_manager: CacheManagerRef, #[allow(dead_code)] pub storage: Option<String>, pub max_sequence: Option<SequenceNumber>, + pub sst_write_format: FormatType, /// Configs for index pub index_options: IndexOptions, diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index a28df3f54c..3d373efe91 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -244,15 +244,19 @@ impl WriteCache { .await .with_file_cleaner(cleaner); - let sst_info = match write_request.source { - either::Left(source) => { + let sst_info = match write_request.sst_write_format { + crate::sst::FormatType::PrimaryKey => { writer - .write_all(source, write_request.max_sequence, write_opts) + .write_all_flat_as_primary_key( + write_request.source, + write_request.max_sequence, + write_opts, + ) .await? } - either::Right(flat_source) => { + crate::sst::FormatType::Flat => { writer - .write_all_flat(flat_source, write_request.max_sequence, write_opts) + .write_all_flat(write_request.source, write_request.max_sequence, write_opts) .await? } }; @@ -509,12 +513,13 @@ mod tests { use crate::cache::test_util::{assert_parquet_metadata_equal, new_fs_store}; use crate::cache::{CacheManager, CacheStrategy}; use crate::error::InvalidBatchSnafu; - use crate::read::Source; + use crate::read::FlatSource; use crate::region::options::IndexOptions; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::test_util::TestEnv; use crate::test_util::sst_util::{ - new_batch_by_range, new_source, sst_file_handle_with_file_id, sst_region_metadata, + new_flat_source_from_record_batches, new_record_batch_by_range, + sst_file_handle_with_file_id, sst_region_metadata, }; #[tokio::test] @@ -532,21 +537,22 @@ mod tests { .create_write_cache(local_store.clone(), ReadableSize::mb(10)) .await; - // Create Source + // Create source.
let metadata = Arc::new(sst_region_metadata()); let region_id = metadata.region_id; - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); let write_request = SstWriteRequest { op_type: OperationType::Flush, metadata, - source: either::Left(source), + source, storage: None, max_sequence: None, + sst_write_format: Default::default(), cache_manager: Default::default(), index_options: IndexOptions::default(), index_config: Default::default(), @@ -636,19 +642,20 @@ mod tests { // Create source let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); // Write to local cache and upload sst to mock remote store let write_request = SstWriteRequest { op_type: OperationType::Flush, metadata, - source: either::Left(source), + source, storage: None, max_sequence: None, + sst_write_format: Default::default(), cache_manager: cache_manager.clone(), index_options: IndexOptions::default(), index_config: Default::default(), @@ -715,9 +722,9 @@ mod tests { let metadata = Arc::new(sst_region_metadata()); // Creates a source that can return an error to abort the writer. - let source = Source::Iter(Box::new( + let source = FlatSource::Iter(Box::new( [ - Ok(new_batch_by_range(&["a", "d"], 0, 60)), + Ok(new_record_batch_by_range(&["a", "d"], 0, 60)), InvalidBatchSnafu { reason: "Abort the writer", } .fail(), ] .into_iter(), )); @@ -730,9 +737,10 @@ mod tests { let write_request = SstWriteRequest { op_type: OperationType::Flush, metadata, - source: either::Left(source), + source, storage: None, max_sequence: None, + sst_write_format: Default::default(), cache_manager: cache_manager.clone(), index_options: IndexOptions::default(), index_config: Default::default(), diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 6d51d1dd59..ba6957fdae 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -58,10 +58,10 @@ use crate::error::{ TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; +use crate::read::BoxedRecordBatchStream; use crate::read::projection::ProjectionMapper; use crate::read::scan_region::{PredicateGroup, ScanInput}; use crate::read::seq_scan::SeqScan; -use crate::read::{BoxedBatchReader, BoxedRecordBatchStream}; use crate::region::options::{MergeMode, RegionOptions}; use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState}; @@ -828,7 +828,7 @@ pub struct SerializedCompactionOutput { output_time_range: Option<TimestampRange>, } -/// Builders to create [BoxedBatchReader] for compaction. +/// Builders to create [BoxedRecordBatchStream] for compaction.
struct CompactionSstReaderBuilder<'a> { metadata: RegionMetadataRef, sst_layer: AccessLayerRef, cache: CacheStrategy, inputs: &'a [FileHandle], append_mode: bool, filter_deleted: bool, time_range: Option<TimestampRange>, merge_mode: MergeMode, } impl CompactionSstReaderBuilder<'_> { - /// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. - async fn build_sst_reader(self) -> Result<BoxedBatchReader> { - let scan_input = self.build_scan_input(false)?.with_compaction(true); - - SeqScan::new(scan_input).build_reader_for_compaction().await - } - /// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction. async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> { - let scan_input = self.build_scan_input(true)?.with_compaction(true); + let scan_input = self.build_scan_input()?.with_compaction(true); SeqScan::new(scan_input) .build_flat_reader_for_compaction() .await } - fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> { - let mapper = ProjectionMapper::all(&self.metadata, flat_format)?; + fn build_scan_input(self) -> Result<ScanInput> { + let mapper = ProjectionMapper::all(&self.metadata, true)?; let mut scan_input = ScanInput::new(self.sst_layer, mapper) .with_files(self.inputs.to_vec()) .with_append_mode(self.append_mode) @@ -868,7 +861,7 @@ impl CompactionSstReaderBuilder<'_> { // We ignore file not found error during compaction. .with_ignore_file_not_found(true) .with_merge_mode(self.merge_mode) - .with_flat_format(flat_format); + .with_flat_format(true); // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944 // by converting time ranges into predicate. diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 1876972b0d..b03e6415e8 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -43,7 +43,7 @@ use crate::error::{ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::metrics; -use crate::read::{FlatSource, Source}; +use crate::read::FlatSource; use crate::region::options::RegionOptions; use crate::region::version::VersionRef; use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState}; @@ -356,13 +356,8 @@ impl DefaultCompactor { time_range: output.output_time_range, merge_mode, }; - let source = if flat_format { - let reader = builder.build_flat_sst_reader().await?; - Either::Right(FlatSource::Stream(reader)) - } else { - let reader = builder.build_sst_reader().await?; - Either::Left(Source::Reader(reader)) - }; + let reader = builder.build_flat_sst_reader().await?; + let source = FlatSource::Stream(reader); let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region @@ -375,6 +370,11 @@ impl DefaultCompactor { cache_manager: compaction_region.cache_manager.clone(), storage, max_sequence: max_sequence.map(NonZero::get), + sst_write_format: if flat_format { + FormatType::Flat + } else { + FormatType::PrimaryKey + }, index_options, index_config, inverted_index_config, diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 0c16544b6e..fedac95d27 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -22,7 +22,6 @@ use std::time::Instant; use common_telemetry::{debug, error, info}; use datatypes::arrow::datatypes::SchemaRef; -use either::Either; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; use snafu::ResultExt; @@ -41,18 +40,14
@@ use crate::error::{ }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::memtable::bulk::ENCODE_ROW_THRESHOLD; -use crate::memtable::{ - BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges, RangesOptions, -}; +use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, MemtableRanges, RangesOptions}; use crate::metrics::{ FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_FILE_TOTAL, FLUSH_REQUESTS_TOTAL, INFLIGHT_FLUSH_COUNT, }; -use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; +use crate::read::FlatSource; use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeIterator; -use crate::read::merge::MergeReaderBuilder; -use crate::read::{FlatSource, Source}; use crate::region::options::{IndexOptions, MergeMode, RegionOptions}; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState, parse_partition_expr}; @@ -62,8 +57,10 @@ use crate::request::{ }; use crate::schedule::scheduler::{Job, SchedulerRef}; use crate::sst::file::FileMeta; -use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions}; -use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; +use crate::sst::parquet::{ + DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions, flat_format, +}; +use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema}; use crate::worker::WorkerListener; /// Global write buffer (memtable) manager. @@ -480,78 +477,29 @@ impl RegionFlushTask { // the counter may have more series than the actual series count. series_count += memtable_series_count; - if mem_ranges.is_record_batch() { - let flush_start = Instant::now(); - let FlushFlatMemResult { - num_encoded, - num_sources, - results, - } = self - .flush_flat_mem_ranges(version, &write_opts, mem_ranges) - .await?; - encoded_part_count += num_encoded; - for (source_idx, result) in results.into_iter().enumerate() { - let (max_sequence, ssts_written, metrics) = result?; - if ssts_written.is_empty() { - // No data written. - continue; - } - - common_telemetry::debug!( - "Region {} flush one memtable {} {}/{}, metrics: {:?}", - self.region_id, - memtable_id, - source_idx, - num_sources, - metrics - ); - - flush_metrics = flush_metrics.merge(metrics); - - file_metas.extend(ssts_written.into_iter().map(|sst_info| { - flushed_bytes += sst_info.file_size; - Self::new_file_meta( - self.region_id, - max_sequence, - sst_info, - partition_expr.clone(), - ) - })); - } - - common_telemetry::debug!( - "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}", - self.region_id, - num_sources, - memtable_id, - num_mem_ranges, - num_encoded, - num_mem_rows, - flush_start.elapsed(), - compact_cost, - ); - } else { - let max_sequence = mem_ranges.max_sequence(); - let source = memtable_source(mem_ranges, &version.options).await?; - - // Flush to level 0. 
- let source = Either::Left(source); - let write_request = self.new_write_request(version, max_sequence, source); - - let mut metrics = Metrics::new(WriteType::Flush); - let ssts_written = self - .access_layer - .write_sst(write_request, &write_opts, &mut metrics) - .await?; - FLUSH_FILE_TOTAL.inc_by(ssts_written.len() as u64); + let flush_start = Instant::now(); + let FlushFlatMemResult { + num_encoded, + num_sources, + results, + } = self + .flush_flat_mem_ranges(version, &write_opts, mem_ranges) + .await?; + encoded_part_count += num_encoded; + for (source_idx, result) in results.into_iter().enumerate() { + let (max_sequence, ssts_written, metrics) = result?; if ssts_written.is_empty() { // No data written. continue; } - debug!( - "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}", - self.region_id, num_mem_ranges, num_mem_rows, metrics + common_telemetry::debug!( + "Region {} flush one memtable {} {}/{}, metrics: {:?}", + self.region_id, + memtable_id, + source_idx, + num_sources, + metrics ); flush_metrics = flush_metrics.merge(metrics); @@ -565,7 +513,19 @@ impl RegionFlushTask { partition_expr.clone(), ) })); - }; + } + + common_telemetry::debug!( + "Region {} flush {} memtables for {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}", + self.region_id, + num_sources, + memtable_id, + num_mem_ranges, + num_encoded, + num_mem_rows, + flush_start.elapsed(), + compact_cost, + ); } Ok(DoFlushMemtablesResult { @@ -587,16 +547,17 @@ impl RegionFlushTask { &version.metadata, &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding), ); + let field_column_start = + flat_format::field_column_start(&version.metadata, batch_schema.fields().len()); let flat_sources = memtable_flat_sources( batch_schema, mem_ranges, &version.options, - version.metadata.primary_key.len(), + field_column_start, )?; let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len()); let num_encoded = flat_sources.encoded.len(); for (source, max_sequence) in flat_sources.sources { - let source = Either::Right(source); let write_request = self.new_write_request(version, max_sequence, source); let access_layer = self.access_layer.clone(); let write_opts = write_opts.clone(); @@ -667,8 +628,13 @@ impl RegionFlushTask { &self, version: &VersionRef, max_sequence: u64, - source: Either<Source, FlatSource>, + source: FlatSource, ) -> SstWriteRequest { + let flat_format = version + .options + .sst_format + .map(|f| f == FormatType::Flat) + .unwrap_or(self.engine_config.default_experimental_flat_format); SstWriteRequest { op_type: OperationType::Flush, metadata: version.metadata.clone(), @@ -676,6 +642,11 @@ impl RegionFlushTask { cache_manager: self.cache_manager.clone(), storage: version.options.storage.clone(), max_sequence: Some(max_sequence), + sst_write_format: if flat_format { + FormatType::Flat + } else { + FormatType::PrimaryKey + }, index_options: self.index_options.clone(), index_config: self.engine_config.index.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), @@ -722,41 +693,6 @@ struct DoFlushMemtablesResult { flush_metrics: Metrics, } -/// Returns a [Source] for the given memtable.
-async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> { - let source = if mem_ranges.ranges.len() == 1 { - let only_range = mem_ranges.ranges.into_values().next().unwrap(); - let iter = only_range.build_iter()?; - Source::Iter(iter) - } else { - // todo(hl): a workaround since sync version of MergeReader is wip. - let sources = mem_ranges - .ranges - .into_values() - .map(|r| r.build_iter().map(Source::Iter)) - .collect::<Result<Vec<_>>>()?; - let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?; - let maybe_dedup = if options.append_mode { - // no dedup in append mode - Box::new(merge_reader) as _ - } else { - // dedup according to merge mode - match options.merge_mode.unwrap_or(MergeMode::LastRow) { - MergeMode::LastRow => { - Box::new(DedupReader::new(merge_reader, LastRow::new(false), None)) as _ - } - MergeMode::LastNonNull => Box::new(DedupReader::new( - merge_reader, - LastNonNull::new(false), - None, - )) as _, - } - }; - Source::Reader(maybe_dedup) - }; - Ok(source) -} - struct FlatSources { sources: SmallVec<[(FlatSource, SequenceNumber); 4]>, encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index cf2ced06fe..6056a42013 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -57,7 +57,7 @@ use crate::memtable::{ use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeIterator; use crate::region::options::MergeMode; -use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM; +use crate::sst::parquet::flat_format::field_column_start; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE}; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; @@ -1186,13 +1186,8 @@ impl MemtableCompactor { Box::new(dedup_iter) } MergeMode::LastNonNull => { - // Calculates field column start: total columns - fixed columns - field columns - // Field column count = total metadata columns - time index column - primary key columns - let field_column_count = - metadata.column_metadatas.len() - 1 - metadata.primary_key.len(); - let total_columns = arrow_schema.fields().len(); let field_column_start = - total_columns - FIXED_POS_COLUMN_NUM - field_column_count; + field_column_start(metadata, arrow_schema.fields().len()); let dedup_iter = FlatDedupIterator::new( merged_iter, diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 29ded3d49a..2f9fa002d4 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -80,11 +80,6 @@ impl PruneReader { } } - pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) { - self.source = source; - self.skip_fields = skip_fields; - } - /// Merge metrics with the inner reader and return the merged metrics. pub(crate) fn metrics(&self) -> ReaderMetrics { let mut metrics = self.metrics.clone(); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index c13b40d111..d2be17cc83 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -128,28 +128,6 @@ impl SeqScan { Ok(Box::pin(futures::stream::iter(streams).flatten())) } - /// Builds a [BoxedBatchReader] from sequential scan for compaction. - /// - /// # Panics - /// Panics if the compaction flag is not set.
- pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> { - assert!(self.stream_ctx.input.compaction); - - let metrics_set = ExecutionPlanMetricsSet::new(); - let part_metrics = self.new_partition_metrics(false, &metrics_set, 0); - debug_assert_eq!(1, self.properties.partitions.len()); - let partition_ranges = &self.properties.partitions[0]; - - let reader = Self::merge_all_ranges_for_compaction( - &self.stream_ctx, - partition_ranges, - &part_metrics, - self.pruner.clone(), - ) - .await?; - Ok(Box::new(reader)) - } - /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction. /// /// # Panics @@ -172,40 +150,6 @@ impl SeqScan { Ok(reader) } - /// Builds a merge reader that reads all ranges. - /// Callers MUST not split ranges before calling this method. - async fn merge_all_ranges_for_compaction( - stream_ctx: &Arc<StreamContext>, - partition_ranges: &[PartitionRange], - part_metrics: &PartitionMetrics, - pruner: Arc, - ) -> Result<BoxedBatchReader> { - pruner.add_partition_ranges(partition_ranges); - let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges)); - - let mut sources = Vec::new(); - for part_range in partition_ranges { - build_sources( - stream_ctx, - part_range, - true, - part_metrics, - partition_pruner.clone(), - &mut sources, - None, - ) - .await?; - } - - common_telemetry::debug!( - "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}", - stream_ctx.input.mapper.metadata().region_id, - partition_ranges.len(), - sources.len() - ); - Self::build_reader_from_sources(stream_ctx, sources, None, None).await - } - /// Builds a merge reader that reads all flat ranges. /// Callers MUST not split ranges before calling this method. async fn merge_all_flat_ranges_for_compaction( diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 78e4c563b1..94bc1feea8 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -31,7 +31,6 @@ use store_api::storage::consts::{ OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME, }; -use crate::read::Batch; use crate::sst::parquet::flat_format::time_index_column_index; pub mod file; @@ -260,33 +259,6 @@ pub(crate) struct SeriesEstimator { } impl SeriesEstimator { - /// Updates the estimator with a new Batch. - /// - /// Since each Batch contains only one series, this increments the series count - /// and updates the last timestamp. - pub(crate) fn update(&mut self, batch: &Batch) { - let Some(last_ts) = batch.last_timestamp() else { - return; - }; - - // Checks if there's a boundary between the last batch and this batch - if let Some(prev_last_ts) = self.last_timestamp { - // If the first timestamp of this batch is less than the last timestamp - // we've seen, it indicates a new series - if let Some(first_ts) = batch.first_timestamp() - && first_ts.value() <= prev_last_ts - { - self.series_count += 1; - } - } else { - // First batch, counts as first series - self.series_count = 1; - } - - // Updates the last timestamp - self.last_timestamp = Some(last_ts.value()); - } - /// Updates the estimator with a new record batch in flat format. /// /// This method examines the time index column to detect series boundaries.
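Both the removed per-`Batch` `update` and the surviving `update_flat` apply the same boundary rule: within one series the time index is strictly increasing, so a first timestamp that is less than or equal to the previous batch's last timestamp marks a new series. A simplified sketch of the rule over plain timestamp slices (the flat variant additionally has to detect boundaries inside a single record batch, which this sketch omits):

```rust
#[derive(Default)]
struct Estimator {
    series_count: usize,
    last_timestamp: Option<i64>,
}

impl Estimator {
    fn update(&mut self, timestamps: &[i64]) {
        let (Some(&first), Some(&last)) = (timestamps.first(), timestamps.last()) else {
            return; // empty batches don't change the estimate
        };
        match self.last_timestamp {
            // Time went backwards or stalled across the boundary: new series.
            Some(prev) if first <= prev => self.series_count += 1,
            // Still strictly increasing: the same series continues.
            Some(_) => {}
            // First non-empty batch counts as the first series.
            None => self.series_count = 1,
        }
        self.last_timestamp = Some(last);
    }
}

fn main() {
    let mut estimator = Estimator::default();
    estimator.update(&[1, 2, 5]);
    // Equal timestamps across the boundary indicate a new series, matching
    // `test_series_estimator_equal_timestamp_boundary` in the removed tests.
    estimator.update(&[5, 6, 7]);
    assert_eq!(2, estimator.series_count);
}
```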
@@ -340,43 +312,14 @@ impl SeriesEstimator { mod tests { use std::sync::Arc; - use api::v1::OpType; use datatypes::arrow::array::{ - BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt8Builder, - UInt32Array, UInt64Array, + BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, + UInt64Array, }; use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit}; use datatypes::arrow::record_batch::RecordBatch; use super::*; - use crate::read::{Batch, BatchBuilder}; - - fn new_batch( - primary_key: &[u8], - timestamps: &[i64], - sequences: &[u64], - op_types: &[OpType], - ) -> Batch { - let timestamps = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())); - let sequences = Arc::new(UInt64Array::from(sequences.to_vec())); - let mut op_type_builder = UInt8Builder::with_capacity(op_types.len()); - for op_type in op_types { - op_type_builder.append_value(*op_type as u8); - } - let op_types = Arc::new(UInt8Array::from( - op_types.iter().map(|op| *op as u8).collect::>(), - )); - - let mut builder = BatchBuilder::new(primary_key.to_vec()); - builder - .timestamps_array(timestamps) - .unwrap() - .sequences_array(sequences) - .unwrap() - .op_types_array(op_types) - .unwrap(); - builder.build().unwrap() - } fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch { // Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type] @@ -411,128 +354,6 @@ mod tests { RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap() } - #[test] - fn test_series_estimator_empty_batch() { - let mut estimator = SeriesEstimator::default(); - let batch = new_batch(b"test", &[], &[], &[]); - estimator.update(&batch); - assert_eq!(0, estimator.finish()); - } - - #[test] - fn test_series_estimator_single_batch() { - let mut estimator = SeriesEstimator::default(); - let batch = new_batch( - b"test", - &[1, 2, 3], - &[1, 2, 3], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch); - assert_eq!(1, estimator.finish()); - } - - #[test] - fn test_series_estimator_multiple_batches_same_series() { - let mut estimator = SeriesEstimator::default(); - - // First batch with timestamps 1, 2, 3 - let batch1 = new_batch( - b"test", - &[1, 2, 3], - &[1, 2, 3], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch1); - - // Second batch with timestamps 4, 5, 6 (continuation) - let batch2 = new_batch( - b"test", - &[4, 5, 6], - &[4, 5, 6], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch2); - - assert_eq!(1, estimator.finish()); - } - - #[test] - fn test_series_estimator_new_series_detected() { - let mut estimator = SeriesEstimator::default(); - - // First batch with timestamps 1, 2, 3 - let batch1 = new_batch( - b"pk0", - &[1, 2, 3], - &[1, 2, 3], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch1); - - // Second batch with timestamps 2, 3, 4 (timestamp goes back, new series) - let batch2 = new_batch( - b"pk1", - &[2, 3, 4], - &[4, 5, 6], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch2); - - assert_eq!(2, estimator.finish()); - } - - #[test] - fn test_series_estimator_equal_timestamp_boundary() { - let mut estimator = SeriesEstimator::default(); - - // First batch ending at timestamp 5 - let batch1 = new_batch( - b"test", - &[1, 2, 5], - &[1, 2, 3], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch1); - - // Second batch starting at timestamp 5 (equal, indicates new 
series) - let batch2 = new_batch( - b"test", - &[5, 6, 7], - &[4, 5, 6], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch2); - - assert_eq!(2, estimator.finish()); - } - - #[test] - fn test_series_estimator_finish_resets_state() { - let mut estimator = SeriesEstimator::default(); - - let batch1 = new_batch( - b"test", - &[1, 2, 3], - &[1, 2, 3], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch1); - - assert_eq!(1, estimator.finish()); - - // After finish, state should be reset - let batch2 = new_batch( - b"test", - &[4, 5, 6], - &[4, 5, 6], - &[OpType::Put, OpType::Put, OpType::Put], - ); - estimator.update(&batch2); - - assert_eq!(1, estimator.finish()); - } - #[test] fn test_series_estimator_flat_empty_batch() { let mut estimator = SeriesEstimator::default(); diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 0df3229e9c..88aebfc001 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -58,7 +58,7 @@ use crate::error::{ }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; -use crate::read::{Batch, BatchReader}; +use crate::read::Batch; use crate::region::options::IndexOptions; use crate::region::version::VersionControlRef; use crate::region::{ManifestContextRef, RegionLeaderState}; @@ -802,9 +802,9 @@ impl IndexBuildTask { if let Some(mut parquet_reader) = parquet_reader { // TODO(SNC123): optimize index batch loop { - match parquet_reader.next_batch().await { - Ok(Some(mut batch)) => { - indexer.update(&mut batch).await; + match parquet_reader.next_record_batch().await { + Ok(Some(batch)) => { + indexer.update_flat(&batch).await; } Ok(None) => break, Err(e) => { @@ -1227,7 +1227,9 @@ mod tests { use crate::sst::parquet::WriteOptions; use crate::test_util::memtable_util::EmptyMemtableBuilder; use crate::test_util::scheduler_util::SchedulerEnv; - use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata}; + use crate::test_util::sst_util::{ + new_flat_source_from_record_batches, new_record_batch_by_range, sst_region_metadata, + }; struct MetaConfig { with_inverted: bool, @@ -1358,19 +1360,20 @@ mod tests { env: &SchedulerEnv, build_mode: IndexBuildMode, ) -> SstInfo { - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); let mut index_config = MitoConfig::default().index; index_config.build_mode = build_mode; let write_request = SstWriteRequest { op_type: OperationType::Flush, metadata: metadata.clone(), - source: either::Left(source), + source, storage: None, max_sequence: None, + sst_write_format: Default::default(), cache_manager: Default::default(), index_options: IndexOptions::default(), index_config, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index aa98b69176..1c5bfd9db0 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -110,6 +110,7 @@ mod tests { TimestampMillisecondArray, UInt8Array, UInt64Array, }; use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type}; + use datatypes::arrow::util::pretty::pretty_format_batches; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{FulltextAnalyzer, 
FulltextBackend, FulltextOptions}; use object_store::ObjectStore; @@ -129,7 +130,7 @@ mod tests { use crate::cache::test_util::assert_parquet_metadata_equal; use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::config::IndexConfig; - use crate::read::{BatchBuilder, BatchReader, FlatSource}; + use crate::read::FlatSource; use crate::region::options::{IndexOptions, InvertedIndexOptions}; use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_purger::NoopFilePurger; @@ -137,19 +138,19 @@ mod tests { use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl}; - use crate::sst::parquet::format::PrimaryKeyWriteFormat; + use crate::sst::parquet::flat_format::FlatWriteFormat; use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics}; use crate::sst::parquet::writer::ParquetWriter; use crate::sst::{ DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, location, to_flat_sst_arrow_schema, }; + use crate::test_util::TestEnv; use crate::test_util::sst_util::{ - build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary, - new_batch_with_custom_sequence, new_primary_key, new_source, new_sparse_primary_key, - sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata, + build_test_binary_test_region_metadata, new_flat_source_from_record_batches, + new_primary_key, new_record_batch_by_range, new_record_batch_with_custom_sequence, + new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata, sst_region_metadata_with_encoding, }; - use crate::test_util::{TestEnv, check_reader_result}; const FILE_DIR: &str = "/"; const REGION_ID: RegionId = RegionId::new(0, 0); @@ -191,10 +192,10 @@ mod tests { region_file_id: handle.file_id(), }; let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); // Use a small row group size for test. 
let write_opts = WriteOptions { @@ -214,7 +215,7 @@ mod tests { .await; let info = writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -235,14 +236,14 @@ mod tests { object_store, ); let mut reader = builder.build().await.unwrap().unwrap(); - check_reader_result( + check_record_batch_reader_result( &mut reader, &[ - new_batch_by_range(&["a", "d"], 0, 50), - new_batch_by_range(&["a", "d"], 50, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 150), - new_batch_by_range(&["b", "h"], 150, 200), + new_record_batch_by_range(&["a", "d"], 0, 50), + new_record_batch_by_range(&["a", "d"], 50, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 150), + new_record_batch_by_range(&["b", "h"], 150, 200), ], ) .await; @@ -254,10 +255,10 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); // Use a small row group size for test. let write_opts = WriteOptions { @@ -279,7 +280,7 @@ mod tests { .await; let sst_info = writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -299,14 +300,14 @@ mod tests { .cache(cache.clone()); for _ in 0..3 { let mut reader = builder.build().await.unwrap().unwrap(); - check_reader_result( + check_record_batch_reader_result( &mut reader, &[ - new_batch_by_range(&["a", "d"], 0, 50), - new_batch_by_range(&["a", "d"], 50, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 150), - new_batch_by_range(&["b", "h"], 150, 200), + new_record_batch_by_range(&["a", "d"], 0, 50), + new_record_batch_by_range(&["a", "d"], 50, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 150), + new_record_batch_by_range(&["b", "h"], 150, 200), ], ) .await; @@ -340,10 +341,10 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); let write_opts = WriteOptions { row_group_size: 50, @@ -366,7 +367,7 @@ mod tests { .await; let sst_info = writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -392,10 +393,10 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + 
new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); // Use a small row group size for test. let write_opts = WriteOptions { @@ -416,7 +417,7 @@ mod tests { ) .await; writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -436,11 +437,11 @@ mod tests { ) .predicate(predicate); let mut reader = builder.build().await.unwrap().unwrap(); - check_reader_result( + check_record_batch_reader_result( &mut reader, &[ - new_batch_by_range(&["a", "d"], 0, 50), - new_batch_by_range(&["a", "d"], 50, 60), + new_record_batch_by_range(&["a", "d"], 0, 50), + new_record_batch_by_range(&["a", "d"], 50, 60), ], ) .await; @@ -452,10 +453,10 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "z"], 0, 0), - new_batch_by_range(&["a", "z"], 100, 100), - new_batch_by_range(&["a", "z"], 200, 230), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "z"], 0, 0), + new_record_batch_by_range(&["a", "z"], 100, 100), + new_record_batch_by_range(&["a", "z"], 200, 230), ]); // Use a small row group size for test. let write_opts = WriteOptions { @@ -476,7 +477,7 @@ mod tests { ) .await; writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -488,7 +489,11 @@ mod tests { object_store, ); let mut reader = builder.build().await.unwrap().unwrap(); - check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await; + check_record_batch_reader_result( + &mut reader, + &[new_record_batch_by_range(&["a", "z"], 200, 230)], + ) + .await; } #[tokio::test] @@ -497,10 +502,10 @@ mod tests { let object_store = env.init_object_store_manager(); let handle = sst_file_handle(0, 1000); let metadata = Arc::new(sst_region_metadata()); - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 60), - new_batch_by_range(&["b", "f"], 0, 40), - new_batch_by_range(&["b", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + new_record_batch_by_range(&["a", "d"], 0, 60), + new_record_batch_by_range(&["b", "f"], 0, 40), + new_record_batch_by_range(&["b", "h"], 100, 200), ]); // Use a small row group size for test. 
let write_opts = WriteOptions { @@ -522,7 +527,7 @@ mod tests { .await; writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -542,7 +547,11 @@ mod tests { ) .predicate(predicate); let mut reader = builder.build().await.unwrap().unwrap(); - check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await; + check_record_batch_reader_result( + &mut reader, + &[new_record_batch_by_range(&["b", "h"], 150, 200)], + ) + .await; } #[tokio::test] @@ -569,7 +578,7 @@ mod tests { let writer_props = props_builder.build(); - let write_format = PrimaryKeyWriteFormat::new(metadata); + let write_format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default()); let fields: Vec<_> = write_format .arrow_schema() .fields() @@ -603,9 +612,8 @@ mod tests { ) .unwrap(); - let batch = new_batch_with_binary(&["a"], 0, 60); - let arrow_batch = write_format.convert_batch(&batch).unwrap(); - let arrays: Vec<_> = arrow_batch + let batch = new_record_batch_with_binary(&["a"], 0, 60); + let arrays: Vec<_> = batch .columns() .iter() .map(|array| { @@ -629,11 +637,11 @@ mod tests { object_store, ); let mut reader = builder.build().await.unwrap().unwrap(); - check_reader_result( + check_record_batch_reader_result( &mut reader, &[ - new_batch_with_binary(&["a"], 0, 50), - new_batch_with_binary(&["a"], 50, 60), + new_record_batch_with_binary(&["a"], 0, 50), + new_record_batch_with_binary(&["a"], 50, 60), ], ) .await; @@ -646,17 +654,17 @@ mod tests { let mut env = TestEnv::new().await; let object_store = env.init_object_store_manager(); let metadata = Arc::new(sst_region_metadata()); - let batches = &[ - new_batch_by_range(&["a", "d"], 0, 1000), - new_batch_by_range(&["b", "f"], 0, 1000), - new_batch_by_range(&["c", "g"], 0, 1000), - new_batch_by_range(&["b", "h"], 100, 200), - new_batch_by_range(&["b", "h"], 200, 300), - new_batch_by_range(&["b", "h"], 300, 1000), + let batches = vec![ + new_record_batch_by_range(&["a", "d"], 0, 1000), + new_record_batch_by_range(&["b", "f"], 0, 1000), + new_record_batch_by_range(&["c", "g"], 0, 1000), + new_record_batch_by_range(&["b", "h"], 100, 200), + new_record_batch_by_range(&["b", "h"], 200, 300), + new_record_batch_by_range(&["b", "h"], 300, 1000), ]; let total_rows: usize = batches.iter().map(|batch| batch.num_rows()).sum(); - let source = new_source(batches); + let source = new_flat_source_from_record_batches(batches); let write_opts = WriteOptions { row_group_size: 50, max_file_size: Some(1024 * 16), @@ -678,7 +686,10 @@ mod tests { ) .await; - let files = writer.write_all(source, None, &write_opts).await.unwrap(); + let files = writer + .write_all_flat_as_primary_key(source, None, &write_opts) + .await + .unwrap(); assert_eq!(2, files.len()); let mut rows_read = 0; @@ -695,7 +706,7 @@ mod tests { object_store.clone(), ); let mut reader = builder.build().await.unwrap().unwrap(); - while let Some(batch) = reader.next_batch().await.unwrap() { + while let Some(batch) = reader.next_record_batch().await.unwrap() { rows_read += batch.num_rows(); } } @@ -710,12 +721,12 @@ mod tests { let metadata = Arc::new(sst_region_metadata()); let row_group_size = 50; - let source = new_source(&[ - new_batch_by_range(&["a", "d"], 0, 20), - new_batch_by_range(&["b", "d"], 0, 20), - new_batch_by_range(&["c", "d"], 0, 20), - new_batch_by_range(&["c", "f"], 0, 40), - new_batch_by_range(&["c", "h"], 100, 200), + let source = new_flat_source_from_record_batches(vec![ + 
new_record_batch_by_range(&["a", "d"], 0, 20), + new_record_batch_by_range(&["b", "d"], 0, 20), + new_record_batch_by_range(&["c", "d"], 0, 20), + new_record_batch_by_range(&["c", "f"], 0, 40), + new_record_batch_by_range(&["c", "h"], 100, 200), ]); // Use a small row group size for test. let write_opts = WriteOptions { @@ -760,7 +771,7 @@ mod tests { .await; let info = writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -877,6 +888,7 @@ mod tests { handle.clone(), object_store.clone(), ) + .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) @@ -891,7 +903,11 @@ mod tests { let mut reader = ParquetReader::new(Arc::new(context), selection) .await .unwrap(); - check_reader_result(&mut reader, &[new_batch_by_range(&["b", "d"], 0, 20)]).await; + check_record_batch_reader_result( + &mut reader, + &[new_record_batch_by_range(&["b", "d"], 0, 20)], + ) + .await; assert_eq!(metrics.filter_metrics.rg_total, 4); assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 3); @@ -937,6 +953,7 @@ mod tests { handle.clone(), object_store.clone(), ) + .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) @@ -991,6 +1008,7 @@ mod tests { handle.clone(), object_store.clone(), ) + .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) @@ -1005,13 +1023,13 @@ mod tests { let mut reader = ParquetReader::new(Arc::new(context), selection) .await .unwrap(); - check_reader_result( + check_record_batch_reader_result( &mut reader, &[ - new_batch_by_range(&["a", "d"], 0, 20), - new_batch_by_range(&["b", "d"], 0, 20), - new_batch_by_range(&["c", "d"], 0, 10), - new_batch_by_range(&["c", "d"], 10, 20), + new_record_batch_by_range(&["a", "d"], 0, 20), + new_record_batch_by_range(&["b", "d"], 0, 20), + new_record_batch_by_range(&["c", "d"], 0, 10), + new_record_batch_by_range(&["c", "d"], 10, 20), ], ) .await; @@ -1032,37 +1050,32 @@ mod tests { assert!(cached.contains_row_group(3)); } - /// Creates a flat format RecordBatch for testing. - /// Similar to `new_batch_by_range` but returns a RecordBatch in flat format. 
- fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch { + fn new_record_batch_with_binary(tags: &[&str], start: usize, end: usize) -> RecordBatch { assert!(end >= start); - let metadata = Arc::new(sst_region_metadata()); + let metadata = build_test_binary_test_region_metadata(); let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); let num_rows = end - start; let mut columns = Vec::new(); - // Add primary key columns (tag_0, tag_1) as dictionary arrays let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new(); - let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new(); - for _ in 0..num_rows { tag_0_builder.append_value(tags[0]); - tag_1_builder.append_value(tags[1]); } - columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef); - columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef); - // Add field column (field_0) - let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect(); - columns.push(Arc::new(UInt64Array::from(field_values))); + let values = (0..num_rows) + .map(|_| "some data".as_bytes()) + .collect::<Vec<_>>(); + columns.push( + Arc::new(datatypes::arrow::array::BinaryArray::from_iter_values( + values, + )) as ArrayRef, + ); - // Add time index column (ts) let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect(); columns.push(Arc::new(TimestampMillisecondArray::from(timestamps))); - // Add encoded primary key column let pk = new_primary_key(tags); let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new(); for _ in 0..num_rows { @@ -1070,10 +1083,7 @@ mod tests { } columns.push(Arc::new(pk_builder.finish())); - // Add sequence column columns.push(Arc::new(UInt64Array::from_value(1000, num_rows))); - - // Add op_type column columns.push(Arc::new(UInt8Array::from_value( OpType::Put as u8, num_rows, @@ -1082,9 +1092,19 @@ mod tests { RecordBatch::try_new(flat_schema, columns).unwrap() } - /// Creates a FlatSource from flat format RecordBatches. - fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource { - FlatSource::Iter(Box::new(batches.into_iter().map(Ok))) + async fn check_record_batch_reader_result( + reader: &mut ParquetReader, + expected: &[RecordBatch], + ) { + let mut actual = Vec::new(); + while let Some(batch) = reader.next_record_batch().await.unwrap() { + actual.push(batch); + } + assert_eq!( + pretty_format_batches(expected).unwrap().to_string(), + pretty_format_batches(&actual).unwrap().to_string() + ); + assert!(reader.next_record_batch().await.unwrap().is_none()); } /// Creates a flat format RecordBatch for testing with sparse primary key encoding. @@ -1333,10 +1353,11 @@ mod tests { }; let metadata = Arc::new(sst_region_metadata()); - // Create batches with sequence 0 to trigger override functionality - let batch1 = new_batch_with_custom_sequence(&["a", "d"], 0, 60, 0); - let batch2 = new_batch_with_custom_sequence(&["b", "f"], 0, 40, 0); - let source = new_source(&[batch1, batch2]); + // Create batches with sequence 0 to trigger override functionality.
+ let source = new_flat_source_from_record_batches(vec![ + new_record_batch_with_custom_sequence(&["a", "d"], 0, 60, 0), + new_record_batch_with_custom_sequence(&["b", "f"], 0, 40, 0), + ]); let write_opts = WriteOptions { row_group_size: 50, @@ -1355,7 +1376,7 @@ mod tests { .await; writer - .write_all(source, None, &write_opts) + .write_all_flat_as_primary_key(source, None, &write_opts) .await .unwrap() .remove(0); @@ -1369,7 +1390,7 @@ mod tests { ); let mut reader = builder.build().await.unwrap().unwrap(); let mut normal_batches = Vec::new(); - while let Some(batch) = reader.next_batch().await.unwrap() { + while let Some(batch) = reader.next_record_batch().await.unwrap() { normal_batches.push(batch); } @@ -1391,22 +1412,19 @@ mod tests { ); let mut reader = builder.build().await.unwrap().unwrap(); let mut override_batches = Vec::new(); - while let Some(batch) = reader.next_batch().await.unwrap() { + while let Some(batch) = reader.next_record_batch().await.unwrap() { override_batches.push(batch); } // Compare the results assert_eq!(normal_batches.len(), override_batches.len()); for (normal, override_batch) in normal_batches.into_iter().zip(override_batches.iter()) { - // Create expected batch with override sequence let expected_batch = { - let num_rows = normal.num_rows(); - let mut builder = BatchBuilder::from(normal); - builder - .sequences_array(Arc::new(UInt64Array::from_value(custom_sequence, num_rows))) - .unwrap(); - - builder.build().unwrap() + let mut columns = normal.columns().to_vec(); + let num_cols = columns.len(); + columns[num_cols - 2] = + Arc::new(UInt64Array::from_value(custom_sequence, normal.num_rows())); + RecordBatch::try_new(normal.schema(), columns).unwrap() }; // Override batch should match expected batch diff --git a/src/mito2/src/sst/parquet/flat_format.rs b/src/mito2/src/sst/parquet/flat_format.rs index d6b061e468..8a59e9a97d 100644 --- a/src/mito2/src/sst/parquet/flat_format.rs +++ b/src/mito2/src/sst/parquet/flat_format.rs @@ -52,8 +52,8 @@ use crate::error::{ NewRecordBatchSnafu, Result, }; use crate::sst::parquet::format::{ - FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat, - StatValues, + FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, + PrimaryKeyReadFormat, ReadFormat, StatValues, }; use crate::sst::{ FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field, @@ -127,6 +127,21 @@ pub(crate) fn op_type_column_index(num_columns: usize) -> usize { num_columns - 1 } +/// Returns the start index of field columns in a flat batch. +/// +/// `num_columns` is the total number of columns in the flat batch schema, +/// including tag columns (if present), field columns, and fixed position columns +/// (time index, primary key, sequence, op type). 
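+/// +/// For example, with `FIXED_POS_COLUMN_NUM == 4` (time index, primary key, sequence, op type): a region with 2 tags and 2 fields has `num_columns = 2 + 2 + 4 = 8` under Dense encoding, so `field_column_start = 8 - 4 - 2 = 2`; under Sparse encoding the raw tag columns are absent, so `num_columns = 6` and `field_column_start = 6 - 4 - 2 = 0` (see `test_field_column_start` below).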
+/// +/// For Dense encoding (raw PK columns included): field_column_start = primary_key.len() +/// For Sparse encoding (no raw PK columns): field_column_start = 0 +pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize { + // Calculates field column start: total columns - fixed columns - field columns + // Field column count = total metadata columns - time index column - primary key columns + let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len(); + num_columns - FIXED_POS_COLUMN_NUM - field_column_count +} + // TODO(yingwen): Add an option to skip reading internal columns if the region is // append only and doesn't use sparse encoding (We need to check the table id under // sparse encoding). @@ -765,3 +780,89 @@ impl FlatReadFormat { .unwrap() } } + +#[cfg(test)] +mod tests { + use api::v1::SemanticType; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::codec::PrimaryKeyEncoding; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::field_column_start; + use crate::sst::{FlatSchemaOptions, flat_sst_arrow_schema_column_num}; + + /// Builds a `RegionMetadata` with the given number of tags and fields. + fn build_metadata( + num_tags: usize, + num_fields: usize, + encoding: PrimaryKeyEncoding, + ) -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0)); + let mut col_id = 0u32; + + for i in 0..num_tags { + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + format!("tag_{i}"), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: col_id, + }); + col_id += 1; + } + + for i in 0..num_fields { + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + format!("field_{i}"), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: col_id, + }); + col_id += 1; + } + + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: col_id, + }); + + let primary_key: Vec<u32> = (0..num_tags as u32).collect(); + builder.primary_key(primary_key); + builder.primary_key_encoding(encoding); + builder.build().unwrap() + } + + #[test] + fn test_field_column_start() { + // (num_tags, num_fields, encoding, expected) + let cases = [ + (1, 1, PrimaryKeyEncoding::Dense, 1), + (2, 2, PrimaryKeyEncoding::Dense, 2), + (0, 2, PrimaryKeyEncoding::Dense, 0), + (2, 2, PrimaryKeyEncoding::Sparse, 0), + ]; + + for (num_tags, num_fields, encoding, expected) in cases { + let metadata = build_metadata(num_tags, num_fields, encoding); + let options = FlatSchemaOptions::from_encoding(encoding); + let num_columns = flat_sst_arrow_schema_column_num(&metadata, &options); + let result = field_column_start(&metadata, num_columns); + assert_eq!( + result, expected, + "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}" + ); + } + } +} diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs index 70d026e6db..ba64eac78b 100644 --- a/src/mito2/src/sst/parquet/format.rs +++ b/src/mito2/src/sst/parquet/format.rs @@ -34,12 +34,12 @@ use api::v1::SemanticType; use common_time::Timestamp; use datafusion_common::ScalarValue; use datatypes::arrow::array::{ -
ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array, + ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array, }; use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::DataType; -use datatypes::vectors::{Helper, Vector}; +use datatypes::vectors::Helper; use mito_codec::row_converter::{ CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec, build_primary_key_codec_with_fields, }; @@ -51,8 +51,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ - ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, - NewRecordBatchSnafu, Result, + ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result, }; use crate::read::{Batch, BatchBuilder, BatchColumn}; use crate::sst::file::{FileMeta, FileTimeRange}; @@ -73,7 +72,6 @@ pub(crate) const INTERNAL_COLUMN_NUM: usize = 3; /// Helper for writing the SST format with primary key. pub(crate) struct PrimaryKeyWriteFormat { - metadata: RegionMetadataRef, /// SST file schema. arrow_schema: SchemaRef, override_sequence: Option<SequenceNumber>, } impl PrimaryKeyWriteFormat { pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat { let arrow_schema = to_sst_arrow_schema(&metadata); PrimaryKeyWriteFormat { - metadata, arrow_schema, override_sequence: None, } @@ -104,40 +101,25 @@ impl PrimaryKeyWriteFormat { &self.arrow_schema } - /// Convert `batch` to a arrow record batch to store in parquet. - pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> { - debug_assert_eq!( - batch.fields().len() + FIXED_POS_COLUMN_NUM, - self.arrow_schema.fields().len() - ); - let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM); - // Store all fields first. - for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) { - ensure!( - column.column_id == column_metadata.column_id, - InvalidBatchSnafu { - reason: format!( - "Batch has column {} but metadata has column {}", - column.column_id, column_metadata.column_id - ), - } - ); - - columns.push(column.data.to_arrow_array()); - } - // Add time index column. - columns.push(batch.timestamps().to_arrow_array()); - // Add internal columns: primary key, sequences, op types. - columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows())); + /// Convert a flat `RecordBatch` to primary-key format, retaining only + /// field columns, time index, and internal columns. + /// + /// `num_fields` is the number of field columns. The method strips + /// leading tag columns: `num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM`.
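+ /// + /// For example, in `test_convert_flat_batch` below: a flat batch `[tag_0, tag_1, field_1, field_0, ts, __primary_key, __sequence, __op_type]` with `num_fields = 2` gives `num_tag_columns = 8 - 2 - 4 = 2`, so the two leading tag columns are dropped and the remaining six columns are reused as-is.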
@@ -104,40 +101,25 @@ impl PrimaryKeyWriteFormat {
         &self.arrow_schema
     }
 
-    /// Convert `batch` to a arrow record batch to store in parquet.
-    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
-        debug_assert_eq!(
-            batch.fields().len() + FIXED_POS_COLUMN_NUM,
-            self.arrow_schema.fields().len()
-        );
-        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
-        // Store all fields first.
-        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
-            ensure!(
-                column.column_id == column_metadata.column_id,
-                InvalidBatchSnafu {
-                    reason: format!(
-                        "Batch has column {} but metadata has column {}",
-                        column.column_id, column_metadata.column_id
-                    ),
-                }
-            );
-
-            columns.push(column.data.to_arrow_array());
-        }
-        // Add time index column.
-        columns.push(batch.timestamps().to_arrow_array());
-        // Add internal columns: primary key, sequences, op types.
-        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));
+    /// Converts a flat `RecordBatch` to primary-key format, retaining only the
+    /// field columns, the time index, and the internal columns.
+    ///
+    /// `num_fields` is the number of field columns. The method strips the
+    /// leading tag columns: `num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM`.
+    pub(crate) fn convert_flat_batch(
+        &self,
+        batch: &RecordBatch,
+        num_fields: usize,
+    ) -> Result<RecordBatch> {
+        let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
+        let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();
 
         if let Some(override_sequence) = self.override_sequence {
-            let sequence_array =
+            let num_cols = columns.len();
+            // The sequence column sits at num_cols - 2, just before op_type.
+            columns[num_cols - 2] =
                 Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
-            columns.push(sequence_array);
-        } else {
-            columns.push(batch.sequences().to_arrow_array());
         }
-        columns.push(batch.op_types().to_arrow_array());
 
         RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
     }
@@ -926,15 +908,6 @@ pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
 
-fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
-    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
-    let keys = UInt32Array::from_value(0, num_rows);
-
-    // Safety: The key index is valid.
-    Arc::new(DictionaryArray::new(keys, values))
-}
-
 /// Gets the min/max time index of the row group from the parquet meta.
 /// It assumes the parquet is created by the mito engine.
 pub(crate) fn parquet_row_group_time_range(
@@ -1017,7 +990,7 @@ mod tests {
     use api::v1::OpType;
     use datatypes::arrow::array::{
-        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
+        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
     };
     use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
     use datatypes::prelude::ConcreteDataType;
@@ -1145,13 +1118,6 @@ mod tests {
         assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
     }
 
-    #[test]
-    fn test_new_primary_key_array() {
-        let array = new_primary_key_array(b"test", 3);
-        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
-        assert_eq!(&expect, &array);
-    }
-
     fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
         let values = Arc::new(BinaryArray::from_iter_values(
             pk_row_nums.iter().map(|v| &v.0),
@@ -1164,49 +1130,6 @@ mod tests {
         Arc::new(DictionaryArray::new(keys, values))
     }
 
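`new_primary_key_array` disappears along with `convert_batch`, but the dictionary encoding it used is still what the tests build by hand through `build_test_pk_array`: one binary value and N keys that all point at entry 0. The same construction in isolation, using only `arrow` APIs:

use std::sync::Arc;
use arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};

// A constant __primary_key column: every row references dictionary entry 0,
// so the key buffer is N small integers instead of N copies of the key bytes.
fn constant_pk_array(pk: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([pk]));
    let keys = UInt32Array::from_value(0, num_rows);
    Arc::new(DictionaryArray::new(keys, values))
}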
-    #[test]
-    fn test_convert_batch() {
-        let metadata = build_test_region_metadata();
-        let write_format = PrimaryKeyWriteFormat::new(metadata);
-
-        let num_rows = 4;
-        let batch = new_batch(b"test", 1, 2, num_rows);
-        let columns: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
-            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
-            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
-            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
-            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
-            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
-        ];
-        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
-
-        let actual = write_format.convert_batch(&batch).unwrap();
-        assert_eq!(expect_record, actual);
-    }
-
-    #[test]
-    fn test_convert_batch_with_override_sequence() {
-        let metadata = build_test_region_metadata();
-        let write_format =
-            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));
-
-        let num_rows = 4;
-        let batch = new_batch(b"test", 1, 2, num_rows);
-        let columns: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
-            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
-            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
-            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
-            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
-            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
-        ];
-        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
-
-        let actual = write_format.convert_batch(&batch).unwrap();
-        assert_eq!(expect_record, actual);
-    }
-
     #[test]
     fn test_projection_indices() {
         let metadata = build_test_region_metadata();
@@ -1867,4 +1790,100 @@ mod tests {
         let result = format.convert_batch(record_batch.clone(), None).unwrap();
         assert_eq!(record_batch, result);
     }
+
+    #[test]
+    fn test_convert_flat_batch() {
+        let metadata = build_test_region_metadata();
+        let write_format = PrimaryKeyWriteFormat::new(metadata);
+
+        let num_rows = 4;
+        // Build a flat record batch: tag0, tag1, field1, field0, ts, __primary_key, __sequence, __op_type
+        let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
+        let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
+
+        // num_fields = 2 (field1, field0)
+        let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
+
+        // Expected: tag columns stripped, only field1, field0, ts, __primary_key, __sequence, __op_type
+        let expected_columns: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
+            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
+            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
+            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
+            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
+            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
+        ];
+        let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
+
+        assert_eq!(expected, result);
+    }
+
+    #[test]
+    fn test_convert_flat_batch_with_override_sequence() {
+        let metadata = build_test_region_metadata();
+        let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));
+
+        let num_rows = 4;
+        let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
+        let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
+
+        let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
+
+        let expected_columns: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
+            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
+            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
+            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
+            Arc::new(UInt64Array::from(vec![999; num_rows])), // overridden __sequence
+            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
+        ];
+        let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
+
+        assert_eq!(expected, result);
+    }
+
+    #[test]
+    fn test_convert_flat_batch_no_tags() {
+        // Test with a region that has no primary key columns (no tags to strip).
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "field0",
+                    ConcreteDataType::int64_datatype(),
+                    true,
+                ),
+                semantic_type: SemanticType::Field,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            });
+        let metadata = Arc::new(builder.build().unwrap());
+        let write_format = PrimaryKeyWriteFormat::new(metadata);
+
+        let num_rows = 3;
+        // No tag columns, so flat batch is: field0, ts, __primary_key, __sequence, __op_type
+        let sst_schema = write_format.arrow_schema().clone();
+        let columns: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![10; num_rows])), // field0
+            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), // ts
+            build_test_pk_array(&[(b"".to_vec(), num_rows)]), // __primary_key
+            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
+            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
+        ];
+        let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();
+
+        // num_fields = 1, num_tag_columns = 5 - 1 - 4 = 0, so nothing is stripped
+        let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
+        let expected = RecordBatch::try_new(sst_schema, columns).unwrap();
+
+        assert_eq!(expected, result);
+    }
 }
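One detail worth spelling out from test_convert_flat_batch_no_tags: with no tag columns, the strip count must come out to exactly zero, otherwise the slice would drop a field column. The arithmetic on its own:

fn main() {
    // 5 columns total: field0, ts, __primary_key, __sequence, __op_type.
    let (num_columns, num_fields, fixed_tail) = (5usize, 1usize, 4usize);
    let num_tag_columns = num_columns - num_fields - fixed_tail;
    assert_eq!(num_tag_columns, 0); // nothing is stripped
}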
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 500f32ae91..4d7122ccc6 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -21,9 +21,8 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use api::v1::SemanticType;
-use async_trait::async_trait;
 use common_recordbatch::filter::SimpleFilterEvaluator;
-use common_telemetry::{debug, tracing, warn};
+use common_telemetry::{tracing, warn};
 use datafusion_expr::Expr;
 use datatypes::arrow::array::ArrayRef;
 use datatypes::arrow::datatypes::Field;
@@ -57,7 +56,7 @@ use crate::metrics::{
     READ_ROWS_TOTAL, READ_STAGE_ELAPSED,
 };
 use crate::read::flat_projection::CompactionProjectionMapper;
-use crate::read::prune::{PruneReader, Source};
+use crate::read::prune::FlatPruneReader;
 use crate::read::{Batch, BatchReader};
 use crate::sst::file::FileHandle;
 use crate::sst::index::bloom_filter::applier::{
@@ -303,7 +302,8 @@ impl ParquetReaderBuilder {
     pub async fn build(&self) -> Result<Option<ParquetReader>> {
         let mut metrics = ReaderMetrics::default();
 
-        let Some((context, selection)) = self.build_reader_input(&mut metrics).await? else {
+        let Some((context, selection)) = self.build_reader_input_inner(&mut metrics, true).await?
+        else {
             return Ok(None);
         };
         ParquetReader::new(Arc::new(context), selection)
@@ -325,12 +325,14 @@ impl ParquetReaderBuilder {
         &self,
         metrics: &mut ReaderMetrics,
     ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
-        self.build_reader_input_inner(metrics).await
+        self.build_reader_input_inner(metrics, self.flat_format)
+            .await
     }
 
     async fn build_reader_input_inner(
         &self,
         metrics: &mut ReaderMetrics,
+        flat_format: bool,
     ) -> Result<Option<(FileRangeContext, RowGroupSelection)>> {
         let start = Instant::now();
@@ -373,7 +375,7 @@ impl ParquetReaderBuilder {
         // before compat handling.
         let compaction_projection_mapper = if self.compaction
             && !is_same_region_partition
-            && self.flat_format
+            && flat_format
             && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse
         {
             Some(CompactionProjectionMapper::try_new(&region_meta)?)
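The `flat_format` threading above keeps the two entry points distinct: `build` always requests the flat read path, while `build_reader_input` still honors the builder's own flag. The shape of the delegation, with illustrative names rather than the real signatures:

struct Builder {
    flat_format: bool,
}

impl Builder {
    // Public reader construction: always flat.
    fn build(&self) -> bool {
        self.inner(true)
    }
    // Range-based scans keep whatever the caller configured.
    fn build_input(&self) -> bool {
        self.inner(self.flat_format)
    }
    fn inner(&self, flat: bool) -> bool {
        flat
    }
}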
@@ -385,7 +387,7 @@ impl ParquetReaderBuilder {
             ReadFormat::new(
                 region_meta.clone(),
                 Some(column_ids),
-                self.flat_format,
+                flat_format,
                 Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                 &file_path,
                 skip_auto_convert,
@@ -401,7 +403,7 @@ impl ParquetReaderBuilder {
             ReadFormat::new(
                 region_meta.clone(),
                 Some(&column_ids),
-                self.flat_format,
+                flat_format,
                 Some(parquet_meta.file_metadata().schema_descr().num_columns()),
                 &file_path,
                 skip_auto_convert,
@@ -1751,24 +1753,6 @@ impl RowGroupReaderBuilder {
     }
 }
 
-/// The state of a [ParquetReader].
-enum ReaderState {
-    /// The reader is reading a row group.
-    Readable(PruneReader),
-    /// The reader is exhausted.
-    Exhausted(ReaderMetrics),
-}
-
-impl ReaderState {
-    /// Returns the metrics of the reader.
-    fn metrics(&self) -> ReaderMetrics {
-        match self {
-            ReaderState::Readable(reader) => reader.metrics(),
-            ReaderState::Exhausted(m) => m.clone(),
-        }
-    }
-}
-
 /// The filter to evaluate or the prune result of the default value.
 pub(crate) enum MaybeFilter {
     /// The filter to evaluate.
@@ -1879,13 +1863,12 @@ pub struct ParquetReader {
     /// Row group selection to read.
     selection: RowGroupSelection,
     /// Reader of current row group.
-    reader_state: ReaderState,
+    reader: Option<FlatPruneReader>,
     /// Metrics for tracking row group fetch operations.
     fetch_metrics: ParquetFetchMetrics,
 }
 
-#[async_trait]
-impl BatchReader for ParquetReader {
+impl ParquetReader {
     #[tracing::instrument(
         skip_all,
         fields(
            region_id = %self.context.reader_builder().file_handle.region_id(),
            file_id = %self.context.reader_builder().file_handle.file_id()
        )
    )]
@@ -1893,18 +1876,20 @@ impl BatchReader for ParquetReader {
-    async fn next_batch(&mut self) -> Result<Option<Batch>> {
-        let ReaderState::Readable(reader) = &mut self.reader_state else {
-            return Ok(None);
-        };
+    pub async fn next_record_batch(&mut self) -> Result<Option<RecordBatch>> {
+        loop {
+            if let Some(reader) = &mut self.reader {
+                if let Some(batch) = reader.next_batch()? {
+                    return Ok(Some(batch));
+                }
+                self.reader = None;
+                continue;
+            }
 
-        // We don't collect the elapsed time if the reader returns an error.
-        if let Some(batch) = reader.next_batch().await? {
-            return Ok(Some(batch));
-        }
+            let Some((row_group_idx, row_selection)) = self.selection.pop_first() else {
+                return Ok(None);
+            };
 
-        // No more items in current row group, reads next row group.
-        while let Some((row_group_idx, row_selection)) = self.selection.pop_first() {
             let parquet_reader = self
                 .context
                 .reader_builder()
                 .build(
                     row_group_idx,
                     Some(row_selection),
                     Some(&self.fetch_metrics),
                 )
                 .await?;
 
-            // Resets the parquet reader.
-            // Compute skip_fields for this row group
             let skip_fields = self.context.should_skip_fields(row_group_idx);
-            reader.reset_source(
-                Source::RowGroup(RowGroupReader::new(self.context.clone(), parquet_reader)),
+            self.reader = Some(FlatPruneReader::new_with_row_group_reader(
+                self.context.clone(),
+                FlatRowGroupReader::new(self.context.clone(), parquet_reader),
                 skip_fields,
-            );
-            if let Some(batch) = reader.next_batch().await? {
-                return Ok(Some(batch));
-            }
+            ));
         }
-
-        // The reader is exhausted.
-        self.reader_state = ReaderState::Exhausted(reader.metrics().clone());
-        Ok(None)
     }
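The rewritten `next_record_batch` is a two-level pull loop with no terminal state to track: drain the current row-group reader, then pop the next row group from the selection, and return `Ok(None)` exactly once when the selection is empty. That shape is what lets callers drive it with `stream::try_unfold`. The control flow, stripped of I/O and with stand-in types rather than the real reader:

struct TwoLevel<I: Iterator<Item = u32>> {
    current: Option<I>,
    pending: Vec<I>,
}

impl<I: Iterator<Item = u32>> TwoLevel<I> {
    fn next_item(&mut self) -> Option<u32> {
        loop {
            if let Some(inner) = &mut self.current {
                if let Some(v) = inner.next() {
                    return Some(v);
                }
                self.current = None; // current "row group" exhausted
                continue;
            }
            // No active reader: open the next one, or finish.
            self.current = Some(self.pending.pop()?);
        }
    }
}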
-}
-
-impl Drop for ParquetReader {
-    fn drop(&mut self) {
-        let metrics = self.reader_state.metrics();
-        debug!(
-            "Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
-            self.context.reader_builder().file_handle.region_id(),
-            self.context.reader_builder().file_handle.file_id(),
-            self.context.reader_builder().file_handle.time_range(),
-            metrics.filter_metrics.rg_total
-                - metrics.filter_metrics.rg_inverted_filtered
-                - metrics.filter_metrics.rg_minmax_filtered
-                - metrics.filter_metrics.rg_fulltext_filtered
-                - metrics.filter_metrics.rg_bloom_filtered,
-            metrics.filter_metrics.rg_total,
-            metrics
-        );
-
-        // Report metrics.
-        READ_STAGE_ELAPSED
-            .with_label_values(&["build_parquet_reader"])
-            .observe(metrics.build_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["scan_row_groups"])
-            .observe(metrics.scan_cost.as_secs_f64());
-        metrics.observe_rows("parquet_reader");
-        metrics.filter_metrics.observe();
-    }
-}
-
-impl ParquetReader {
     /// Creates a new reader.
     #[tracing::instrument(
         skip_all,
         fields(
@@ -1975,28 +1920,27 @@ impl ParquetReader {
         context: FileRangeContextRef,
         mut selection: RowGroupSelection,
     ) -> Result<ParquetReader> {
+        debug_assert!(context.read_format().as_flat().is_some());
         let fetch_metrics = ParquetFetchMetrics::default();
-        // No more items in current row group, reads next row group.
-        let reader_state = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
+        let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
             let parquet_reader = context
                 .reader_builder()
                 .build(row_group_idx, Some(row_selection), Some(&fetch_metrics))
                 .await?;
-            // Compute skip_fields once for this row group
             let skip_fields = context.should_skip_fields(row_group_idx);
-            ReaderState::Readable(PruneReader::new_with_row_group_reader(
+            Some(FlatPruneReader::new_with_row_group_reader(
                 context.clone(),
-                RowGroupReader::new(context.clone(), parquet_reader),
+                FlatRowGroupReader::new(context.clone(), parquet_reader),
                 skip_fields,
             ))
         } else {
-            ReaderState::Exhausted(ReaderMetrics::default())
+            None
        };
 
        Ok(ParquetReader {
            context,
            selection,
-            reader_state,
+            reader,
            fetch_metrics,
        })
    }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index b207f11ef8..4e75073e26 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -50,7 +50,7 @@ use crate::config::{IndexBuildMode, IndexConfig};
 use crate::error::{
     InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
 };
-use crate::read::{Batch, FlatSource, Source};
+use crate::read::FlatSource;
 use crate::sst::file::RegionFileId;
 use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
 use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
@@ -60,6 +60,35 @@ use crate::sst::{
     DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
 };
 
+/// Converts a flat RecordBatch for writing to parquet.
+enum FlatBatchConverter {
+    /// Write as-is in flat format.
+    Flat(FlatWriteFormat),
+    /// Convert flat batch to primary-key format by stripping tag columns.
+    PrimaryKey {
+        format: PrimaryKeyWriteFormat,
+        num_fields: usize,
+    },
+}
+
+impl FlatBatchConverter {
+    fn arrow_schema(&self) -> &SchemaRef {
+        match self {
+            FlatBatchConverter::Flat(f) => f.arrow_schema(),
+            FlatBatchConverter::PrimaryKey { format, .. } => format.arrow_schema(),
+        }
+    }
+
+    fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
+        match self {
+            FlatBatchConverter::Flat(f) => f.convert_batch(batch),
+            FlatBatchConverter::PrimaryKey { format, num_fields } => {
+                format.convert_flat_batch(batch, *num_fields)
+            }
+        }
+    }
+}
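`FlatBatchConverter` lets both write paths share a single loop and differ only in the per-batch conversion. Condensed from the two public methods that follow, the converter selection looks like this (a sketch; `as_primary_key` is an illustrative flag, not a real parameter):

let converter = if as_primary_key {
    FlatBatchConverter::PrimaryKey {
        format: PrimaryKeyWriteFormat::new(metadata.clone())
            .with_override_sequence(override_sequence),
        num_fields: metadata.field_columns().count(),
    }
} else {
    FlatBatchConverter::Flat(
        FlatWriteFormat::new(
            metadata.clone(),
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        )
        .with_override_sequence(override_sequence),
    )
};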
 /// Parquet SST writer.
 pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
     /// Path provider that creates SST and index file paths according to file id.
@@ -240,81 +269,6 @@ where
         Ok(())
     }
 
-    /// Iterates source and writes all rows to Parquet file.
-    ///
-    /// Returns the [SstInfo] if the SST is written.
-    pub async fn write_all(
-        &mut self,
-        source: Source,
-        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
-        opts: &WriteOptions,
-    ) -> Result<SstInfoArray> {
-        let res = self
-            .write_all_without_cleaning(source, override_sequence, opts)
-            .await;
-        if res.is_err() {
-            // Clean tmp files explicitly on failure.
-            let file_id = self.current_file;
-            if let Some(cleaner) = &self.file_cleaner {
-                cleaner.clean_by_file_id(file_id).await;
-            }
-        }
-        res
-    }
-
-    async fn write_all_without_cleaning(
-        &mut self,
-        mut source: Source,
-        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
-        opts: &WriteOptions,
-    ) -> Result<SstInfoArray> {
-        let mut results = smallvec![];
-        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
-            .with_override_sequence(override_sequence);
-        let mut stats = SourceStats::default();
-
-        while let Some(res) = self
-            .write_next_batch(&mut source, &write_format, opts)
-            .await
-            .transpose()
-        {
-            match res {
-                Ok(mut batch) => {
-                    stats.update(&batch);
-                    let start = Instant::now();
-                    // safety: self.current_indexer must be set when first batch has been written.
-                    match self.index_config.build_mode {
-                        IndexBuildMode::Sync => {
-                            self.current_indexer
-                                .as_mut()
-                                .unwrap()
-                                .update(&mut batch)
-                                .await;
-                        }
-                        IndexBuildMode::Async => {}
-                    }
-                    self.metrics.update_index += start.elapsed();
-                    if let Some(max_file_size) = opts.max_file_size
-                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
-                    {
-                        self.finish_current_file(&mut results, &mut stats).await?;
-                    }
-                }
-                Err(e) => {
-                    if let Some(indexer) = &mut self.current_indexer {
-                        indexer.abort().await;
-                    }
-                    return Err(e);
-                }
-            }
-        }
-
-        self.finish_current_file(&mut results, &mut stats).await?;
-
-        // object_store.write will make sure all bytes are written or an error is raised.
-        Ok(results)
-    }
-
     /// Iterates FlatSource and writes all RecordBatch in flat format to Parquet file.
     ///
     /// Returns the [SstInfo] if the SST is written.
     pub async fn write_all_flat(
         &mut self,
         source: FlatSource,
         override_sequence: Option<SequenceNumber>,
         opts: &WriteOptions,
     ) -> Result<SstInfoArray> {
-        let res = self
-            .write_all_flat_without_cleaning(source, override_sequence, opts)
-            .await;
+        let converter = FlatBatchConverter::Flat(
+            FlatWriteFormat::new(
+                self.metadata.clone(),
+                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
+            )
+            .with_override_sequence(override_sequence),
+        );
+        let res = self.write_all_flat_inner(source, &converter, opts).await;
         if res.is_err() {
-            // Clean tmp files explicitly on failure.
             let file_id = self.current_file;
             if let Some(cleaner) = &self.file_cleaner {
                 cleaner.clean_by_file_id(file_id).await;
@@ -337,36 +295,58 @@ where
-    async fn write_all_flat_without_cleaning(
+    /// Iterates the `FlatSource` and writes all record batches in primary-key format to the Parquet file.
+    ///
+    /// Returns the [SstInfo] if the SST is written.
+    pub async fn write_all_flat_as_primary_key(
         &mut self,
-        mut source: FlatSource,
+        source: FlatSource,
         override_sequence: Option<SequenceNumber>,
         opts: &WriteOptions,
+    ) -> Result<SstInfoArray> {
+        let num_fields = self.metadata.field_columns().count();
+        let converter = FlatBatchConverter::PrimaryKey {
+            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
+                .with_override_sequence(override_sequence),
+            num_fields,
+        };
+        let res = self.write_all_flat_inner(source, &converter, opts).await;
+        if res.is_err() {
+            let file_id = self.current_file;
+            if let Some(cleaner) = &self.file_cleaner {
+                cleaner.clean_by_file_id(file_id).await;
+            }
+        }
+        res
+    }
+
+    async fn write_all_flat_inner(
+        &mut self,
+        mut source: FlatSource,
+        converter: &FlatBatchConverter,
+        opts: &WriteOptions,
     ) -> Result<SstInfoArray> {
         let mut results = smallvec![];
-        let flat_format = FlatWriteFormat::new(
-            self.metadata.clone(),
-            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
-        )
-        .with_override_sequence(override_sequence);
         let mut stats = SourceStats::default();
 
         while let Some(record_batch) = self
-            .write_next_flat_batch(&mut source, &flat_format, opts)
+            .write_next_flat_batch(&mut source, converter, opts)
             .await
             .transpose()
         {
             match record_batch {
                 Ok(batch) => {
                     stats.update_flat(&batch)?;
-                    let start = Instant::now();
-                    // safety: self.current_indexer must be set when first batch has been written.
-                    self.current_indexer
-                        .as_mut()
-                        .unwrap()
-                        .update_flat(&batch)
-                        .await;
-                    self.metrics.update_index += start.elapsed();
+                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
+                        let start = Instant::now();
+                        // safety: self.current_indexer must be set when first batch has been written.
+                        self.current_indexer
+                            .as_mut()
+                            .unwrap()
+                            .update_flat(&batch)
+                            .await;
+                        self.metrics.update_index += start.elapsed();
+                    }
                     if let Some(max_file_size) = opts.max_file_size
                         && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                     {
@@ -411,34 +391,10 @@ where
             .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
     }
 
-    async fn write_next_batch(
-        &mut self,
-        source: &mut Source,
-        write_format: &PrimaryKeyWriteFormat,
-        opts: &WriteOptions,
-    ) -> Result<Option<Batch>> {
-        let start = Instant::now();
-        let Some(batch) = source.next_batch().await? else {
-            return Ok(None);
-        };
-        self.metrics.iter_source += start.elapsed();
-
-        let arrow_batch = write_format.convert_batch(&batch)?;
-
-        let start = Instant::now();
-        self.maybe_init_writer(write_format.arrow_schema(), opts)
-            .await?
-            .write(&arrow_batch)
-            .await
-            .context(WriteParquetSnafu)?;
-        self.metrics.write_batch += start.elapsed();
-        Ok(Some(batch))
-    }
-
     async fn write_next_flat_batch(
         &mut self,
         source: &mut FlatSource,
-        flat_format: &FlatWriteFormat,
+        converter: &FlatBatchConverter,
         opts: &WriteOptions,
     ) -> Result<Option<RecordBatch>> {
         let start = Instant::now();
@@ -447,15 +403,16 @@ where
         };
         self.metrics.iter_source += start.elapsed();
 
-        let arrow_batch = flat_format.convert_batch(&record_batch)?;
+        let arrow_batch = converter.convert_batch(&record_batch)?;
 
         let start = Instant::now();
-        self.maybe_init_writer(flat_format.arrow_schema(), opts)
+        self.maybe_init_writer(converter.arrow_schema(), opts)
             .await?
             .write(&arrow_batch)
             .await
             .context(WriteParquetSnafu)?;
         self.metrics.write_batch += start.elapsed();
+        // Return the original flat batch; stats and the indexer consume the flat layout.
         Ok(Some(record_batch))
     }
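`write_all_flat_inner` also aligns the two paths on index building: the old flat path updated the indexer unconditionally, while the removed primary-key path already honored `IndexBuildMode`. The gating pattern in isolation:

enum IndexBuildMode {
    Sync,
    Async,
}

// Only update the index inline for Sync builds; Async defers index building
// to a separate task, so the write loop skips it entirely.
fn maybe_update_index(mode: &IndexBuildMode, update: impl FnOnce()) {
    if matches!(mode, IndexBuildMode::Sync) {
        update();
    }
}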
@@ -515,26 +472,6 @@ struct SourceStats {
 }
 
 impl SourceStats {
-    fn update(&mut self, batch: &Batch) {
-        if batch.is_empty() {
-            return;
-        }
-
-        self.num_rows += batch.num_rows();
-        self.series_estimator.update(batch);
-        // Safety: batch is not empty.
-        let (min_in_batch, max_in_batch) = (
-            batch.first_timestamp().unwrap(),
-            batch.last_timestamp().unwrap(),
-        );
-        if let Some(time_range) = &mut self.time_range {
-            time_range.0 = time_range.0.min(min_in_batch);
-            time_range.1 = time_range.1.max(max_in_batch);
-        } else {
-            self.time_range = Some((min_in_batch, max_in_batch));
-        }
-    }
-
     fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
         if record_batch.num_rows() == 0 {
             return Ok(());
diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs
index 389d9bf107..e9515030c0 100644
--- a/src/mito2/src/test_util/sst_util.rs
+++ b/src/mito2/src/test_util/sst_util.rs
@@ -18,7 +18,11 @@ use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
 use common_time::Timestamp;
-use datatypes::arrow::array::{BinaryArray, TimestampMillisecondArray, UInt8Array, UInt64Array};
+use datatypes::arrow::array::{
+    ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
+    TimestampMillisecondArray, UInt8Array, UInt64Array,
+};
+use datatypes::arrow::datatypes::UInt32Type;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
 use datatypes::value::ValueRef;
@@ -32,8 +36,9 @@ use store_api::metric_engine_consts::{
 use store_api::storage::consts::ReservedColumnId;
 use store_api::storage::{FileId, RegionId};
 
-use crate::read::{Batch, BatchBuilder, Source};
+use crate::read::{Batch, FlatSource, Source};
 use crate::sst::file::{FileHandle, FileMeta};
+use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
 use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger};
 
 /// Test region id.
@@ -246,34 +251,68 @@ pub fn new_batch_by_range(tags: &[&str], start: usize, end: usize) -> Batch {
     new_batch_with_custom_sequence(tags, start, end, 1000)
 }
 
-pub fn new_batch_with_binary(tags: &[&str], start: usize, end: usize) -> Batch {
+/// Creates a flat format RecordBatch for testing.
+/// Similar to `new_batch_by_range`, but returns a `RecordBatch` in flat format.
+pub fn new_record_batch_by_range(tags: &[&str], start: usize, end: usize) -> RecordBatch {
+    new_record_batch_with_custom_sequence(tags, start, end, 1000)
+}
+
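For reference, the column order the helper below must produce; `RecordBatch::try_new` rejects the batch if any column is out of place. The layout for the test region (two string tags, one u64 field), written as a comment sketch:

// Flat SST layout produced for the test region:
//   0: tag_0            Dictionary<UInt32, Utf8>
//   1: tag_1            Dictionary<UInt32, Utf8>
//   2: field_0          UInt64
//   3: ts               Timestamp(Millisecond)
//   4: __primary_key    Dictionary<UInt32, Binary>
//   5: __sequence       UInt64
//   6: __op_type        UInt8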
+/// Creates a flat format RecordBatch for testing with a custom sequence.
+pub fn new_record_batch_with_custom_sequence(
+    tags: &[&str],
+    start: usize,
+    end: usize,
+    sequence: u64,
+) -> RecordBatch {
     assert!(end >= start);
+    let metadata = Arc::new(sst_region_metadata());
+    let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
+
+    let num_rows = end - start;
+    let mut columns = Vec::new();
+
+    // Add primary key columns (tag_0, tag_1) as dictionary arrays.
+    let mut tag_0_builder = StringDictionaryBuilder::<UInt32Type>::new();
+    let mut tag_1_builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+    for _ in 0..num_rows {
+        tag_0_builder.append_value(tags[0]);
+        tag_1_builder.append_value(tags[1]);
+    }
+
+    columns.push(Arc::new(tag_0_builder.finish()) as ArrayRef);
+    columns.push(Arc::new(tag_1_builder.finish()) as ArrayRef);
+
+    // Add field column (field_0).
+    let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
+    columns.push(Arc::new(UInt64Array::from(field_values)));
+
+    // Add time index column (ts).
+    let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
+    columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
+
+    // Add encoded primary key column.
     let pk = new_primary_key(tags);
-    let timestamps: Vec<_> = (start..end).map(|v| v as i64).collect();
-    let sequences = vec![1000; end - start];
-    let op_types = vec![OpType::Put; end - start];
+    let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
+    for _ in 0..num_rows {
+        pk_builder.append(&pk).unwrap();
+    }
+    columns.push(Arc::new(pk_builder.finish()));
 
-    let field: Vec<_> = (start..end)
-        .map(|_v| "some data".as_bytes().to_vec())
-        .collect();
+    // Add sequence column.
+    columns.push(Arc::new(UInt64Array::from_value(sequence, num_rows)));
 
-    let mut builder = BatchBuilder::new(pk);
-    builder
-        .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
-            timestamps.iter().copied(),
-        )))
-        .unwrap()
-        .sequences_array(Arc::new(UInt64Array::from_iter_values(
-            sequences.iter().copied(),
-        )))
-        .unwrap()
-        .op_types_array(Arc::new(UInt8Array::from_iter_values(
-            op_types.iter().map(|v| *v as u8),
-        )))
-        .unwrap()
-        .push_field_array(1, Arc::new(BinaryArray::from_iter_values(field)))
-        .unwrap();
-    builder.build().unwrap()
+    // Add op_type column.
+    columns.push(Arc::new(UInt8Array::from_value(
+        OpType::Put as u8,
+        num_rows,
+    )));
+
+    RecordBatch::try_new(flat_schema, columns).unwrap()
+}
+
+/// Creates a FlatSource from flat format RecordBatches.
+pub fn new_flat_source_from_record_batches(batches: Vec<RecordBatch>) -> FlatSource {
+    FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
+}
 
 /// Creates a new region metadata for testing SSTs with binary datatype.
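A hypothetical use of the two new helpers together, e.g. to feed a writer test; the test name is illustrative, and sequences default to 1000 via `new_record_batch_by_range`:

#[test]
fn flat_source_from_two_batches() {
    // Two disjoint series; each tag slice supplies tag_0 and tag_1.
    let batches = vec![
        new_record_batch_by_range(&["a", "d"], 0, 60),
        new_record_batch_by_range(&["b", "f"], 0, 40),
    ];
    let _source = new_flat_source_from_record_batches(batches);
}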