diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs index f6d8674d4c..1f3591635f 100644 --- a/src/cmd/src/datanode/objbench.rs +++ b/src/cmd/src/datanode/objbench.rs @@ -211,7 +211,6 @@ impl ObjbenchCommand { object_store.clone(), ) .expected_metadata(Some(region_meta.clone())) - .flat_format(true) .build() .await .map_err(|e| { diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs index 51064126fe..a93aca430a 100644 --- a/src/cmd/src/datanode/scanbench.rs +++ b/src/cmd/src/datanode/scanbench.rs @@ -102,10 +102,6 @@ pub struct ScanbenchCommand { #[clap(long, value_name = "FILE")] pprof_file: Option, - /// Force reading the region in flat format. - #[clap(long, default_value_t = false)] - force_flat_format: bool, - /// Enable WAL replay when opening the region. #[clap(long, default_value_t = false)] enable_wal: bool, @@ -580,12 +576,11 @@ impl ScanbenchCommand { }; println!( - "{} Scanner: {}, Parallelism: {}, Iterations: {}, Force flat format: {}", + "{} Scanner: {}, Parallelism: {}, Iterations: {}", "ℹ".blue(), self.scanner, self.parallelism, self.iterations, - self.force_flat_format, ); // Start profiling if pprof_file is specified (unless pprof_after_warmup is set) @@ -626,7 +621,6 @@ impl ScanbenchCommand { filters: filters.clone(), series_row_selector, distribution, - force_flat_format: self.force_flat_format, ..Default::default() }; diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index a43fa8a0a6..944c51ebd6 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -848,7 +848,7 @@ impl CompactionSstReaderBuilder<'_> { } fn build_scan_input(self) -> Result { - let mapper = ProjectionMapper::all(&self.metadata, true)?; + let mapper = ProjectionMapper::all(&self.metadata)?; let mut scan_input = ScanInput::new(self.sst_layer, mapper) .with_files(self.inputs.to_vec()) .with_append_mode(self.append_mode) @@ -857,8 +857,7 @@ impl CompactionSstReaderBuilder<'_> { .with_filter_deleted(self.filter_deleted) // We ignore file not found error during compaction. .with_ignore_file_not_found(true) - .with_merge_mode(self.merge_mode) - .with_flat_format(true); + .with_merge_mode(self.merge_mode); // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944 // by converting time ranges into predicate. diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index 46f4cc6cf2..6357f01775 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
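Note: with the format flag gone, the compaction read path above reduces to a single builder chain. A condensed sketch (field names taken from this diff; the builder calls between the two `compaction.rs` hunks are elided):

```rust
// Flat is now the only read format: no second argument to ProjectionMapper::all
// and no .with_flat_format(true) on the ScanInput builder.
let mapper = ProjectionMapper::all(&self.metadata)?;
let scan_input = ScanInput::new(self.sst_layer, mapper)
    .with_files(self.inputs.to_vec())
    .with_append_mode(self.append_mode)
    // ...builder calls between the two hunks elided...
    .with_filter_deleted(self.filter_deleted)
    // We ignore file not found error during compaction.
    .with_ignore_file_not_found(true)
    .with_merge_mode(self.merge_mode);
```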
+use std::collections::BTreeMap; + use api::v1::Rows; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datatypes::arrow::array::AsArray; +use datatypes::arrow::datatypes::{Float64Type, TimestampMillisecondType}; use futures::TryStreamExt; use store_api::region_engine::{PrepareRequest, RegionEngine, RegionScanner}; use store_api::region_request::RegionRequest; @@ -222,11 +226,16 @@ async fn test_max_concurrent_scan_files_with_format(flat_format: bool) { } #[tokio::test] -async fn test_series_scan_primarykey() { +async fn test_series_scan() { + test_series_scan_with_format(false).await; + test_series_scan_with_format(true).await; +} + +async fn test_series_scan_with_format(flat_format: bool) { let mut env = TestEnv::with_prefix("test_series_scan").await; let engine = env .create_engine(MitoConfig { - default_experimental_flat_format: false, + default_experimental_flat_format: flat_format, ..Default::default() }) .await; @@ -295,10 +304,27 @@ async fn test_series_scan_primarykey() { }) .unwrap(); + let actual_rows = collect_partition_rows_round_robin(&scanner, 3).await; + + let mut expected_rows = Vec::new(); + for value in [0_i64, 1, 2, 3, 4, 5, 3600, 3601, 3602, 7200, 7201, 7202] { + expected_rows.push((value.to_string(), value as f64, value * 1000)); + } + expected_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2))); + + assert_eq!(expected_rows, actual_rows); +} + +/// Scans all partitions in round-robin fashion and returns rows sorted by (tag, ts). +/// Also asserts that each series appears in only one partition. +async fn collect_partition_rows_round_robin( + scanner: &dyn RegionScanner, + num_partitions: usize, +) -> Vec<(String, f64, i64)> { let metrics_set = ExecutionPlanMetricsSet::default(); - let mut partition_batches = vec![vec![]; 3]; - let mut streams: Vec<_> = (0..3) + let mut partition_batches = vec![vec![]; num_partitions]; + let mut streams: Vec<_> = (0..num_partitions) .map(|partition| { let stream = scanner .scan_partition(&Default::default(), &metrics_set, partition) @@ -309,11 +335,11 @@ async fn test_series_scan_primarykey() { let mut num_done = 0; let mut schema = None; // Pull streams in round-robin fashion to get the consistent output from the sender. 
- while num_done < 3 { + while num_done < num_partitions { if schema.is_none() { schema = Some(streams[0].as_ref().unwrap().schema().clone()); } - for i in 0..3 { + for i in 0..num_partitions { let Some(mut stream) = streams[i].take() else { continue; }; @@ -326,189 +352,54 @@ async fn test_series_scan_primarykey() { } } - let mut check_result = |expected| { - let batches = - RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap(); - assert_eq!(expected, batches.pretty_print().unwrap()); - }; - - // Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202 - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 0 | 0.0 | 1970-01-01T00:00:00 | -| 3 | 3.0 | 1970-01-01T00:00:03 | -| 3602 | 3602.0 | 1970-01-01T01:00:02 | -| 7200 | 7200.0 | 1970-01-01T02:00:00 | -+-------+---------+---------------------+"; - check_result(expected); - - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 1 | 1.0 | 1970-01-01T00:00:01 | -| 3600 | 3600.0 | 1970-01-01T01:00:00 | -| 4 | 4.0 | 1970-01-01T00:00:04 | -| 7201 | 7201.0 | 1970-01-01T02:00:01 | -+-------+---------+---------------------+"; - check_result(expected); - - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 2 | 2.0 | 1970-01-01T00:00:02 | -| 3601 | 3601.0 | 1970-01-01T01:00:01 | -| 5 | 5.0 | 1970-01-01T00:00:05 | -| 7202 | 7202.0 | 1970-01-01T02:00:02 | -+-------+---------+---------------------+"; - check_result(expected); + let schema = schema.unwrap(); + collect_and_assert_partition_rows(schema, partition_batches) } -#[tokio::test] -async fn test_series_scan_flat() { - let mut env = TestEnv::with_prefix("test_series_scan").await; - let engine = env - .create_engine(MitoConfig { - default_experimental_flat_format: true, - ..Default::default() - }) - .await; +/// Collects rows sorted by (tag, ts) from partition batches. +/// Also asserts that each series appears in only one partition. 
+fn collect_and_assert_partition_rows(
+    schema: datatypes::schema::SchemaRef,
+    partition_batches: Vec<Vec<common_recordbatch::RecordBatch>>,
+) -> Vec<(String, f64, i64)> {
+    let mut series_to_partition = BTreeMap::new();
+    let mut actual_rows = Vec::new();
-    let region_id = RegionId::new(1, 1);
-    let request = CreateRequestBuilder::new()
-        .insert_option("compaction.type", "twcs")
-        .insert_option("compaction.twcs.time_window", "1h")
-        .build();
-    let column_schemas = test_util::rows_schema(&request);
+    for (partition, batches) in partition_batches.into_iter().enumerate() {
+        let batches = RecordBatches::try_new(schema.clone(), batches).unwrap();
+        let mut partition_series = Vec::new();
-    engine
-        .handle_request(region_id, RegionRequest::Create(request))
-        .await
-        .unwrap();
+        for batch in batches.iter() {
+            let tags = batch.column_by_name("tag_0").unwrap().as_string::<i32>();
+            let fields = batch
+                .column_by_name("field_0")
+                .unwrap()
+                .as_primitive::<Float64Type>();
+            let ts = batch
+                .column_by_name("ts")
+                .unwrap()
+                .as_primitive::<TimestampMillisecondType>();
-    let put_flush_rows = async |start, end| {
-        let rows = Rows {
-            schema: column_schemas.clone(),
-            rows: test_util::build_rows(start, end),
-        };
-        test_util::put_rows(&engine, region_id, rows).await;
-        test_util::flush_region(&engine, region_id, None).await;
-    };
-    // generates 3 SST files
-    put_flush_rows(0, 3).await;
-    put_flush_rows(2, 6).await;
-    put_flush_rows(3600, 3603).await;
-    // Put to memtable.
-    let rows = Rows {
-        schema: column_schemas.clone(),
-        rows: test_util::build_rows(7200, 7203),
-    };
-    test_util::put_rows(&engine, region_id, rows).await;
-
-    let request = ScanRequest {
-        distribution: Some(TimeSeriesDistribution::PerSeries),
-        ..Default::default()
-    };
-    let scanner = engine.scanner(region_id, request).await.unwrap();
-    let Scanner::Series(mut scanner) = scanner else {
-        panic!("Scanner should be series scan");
-    };
-    // 3 partition ranges for 3 time window.
-    assert_eq!(
-        3,
-        scanner.properties().partitions[0].len(),
-        "unexpected ranges: {:?}",
-        scanner.properties().partitions
-    );
-    let raw_ranges: Vec<_> = scanner
-        .properties()
-        .partitions
-        .iter()
-        .flatten()
-        .cloned()
-        .collect();
-    let mut new_ranges = Vec::with_capacity(3);
-    for range in raw_ranges {
-        new_ranges.push(vec![range]);
-    }
-    scanner
-        .prepare(PrepareRequest {
-            ranges: Some(new_ranges),
-            ..Default::default()
-        })
-        .unwrap();
-
-    let metrics_set = ExecutionPlanMetricsSet::default();
-
-    let mut partition_batches = vec![vec![]; 3];
-    let mut streams: Vec<_> = (0..3)
-        .map(|partition| {
-            let stream = scanner
-                .scan_partition(&Default::default(), &metrics_set, partition)
-                .unwrap();
-            Some(stream)
-        })
-        .collect();
-    let mut num_done = 0;
-    let mut schema = None;
-    // Pull streams in round-robin fashion to get the consistent output from the sender.
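The helper compares sorted `(tag, field, ts)` tuples rather than pretty-printed per-partition tables because the series-to-partition assignment, and therefore the per-partition output, legitimately differs between the primary-key and flat formats (compare the removed expected tables above and below). The format-independent invariants it checks are:

```rust
// 1. Each series appears in exactly one partition (tracked in series_to_partition).
// 2. The overall row multiset matches, compared after sorting by (tag, ts).
expected_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
assert_eq!(expected_rows, actual_rows);
```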
- while num_done < 3 { - if schema.is_none() { - schema = Some(streams[0].as_ref().unwrap().schema().clone()); + for row in 0..batch.num_rows() { + let tag = tags.value(row).to_string(); + let field = fields.value(row); + let ts = ts.value(row); + partition_series.push(tag.clone()); + actual_rows.push((tag, field, ts)); + } } - for i in 0..3 { - let Some(mut stream) = streams[i].take() else { - continue; - }; - let Some(rb) = stream.try_next().await.unwrap() else { - num_done += 1; - continue; - }; - partition_batches[i].push(rb); - streams[i] = Some(stream); + + partition_series.sort(); + partition_series.dedup(); + for tag in partition_series { + let prev = series_to_partition.insert(tag.clone(), partition); + assert_eq!( + None, prev, + "series {tag} appears in multiple partitions: {prev:?} and {partition}" + ); } } - let mut check_result = |expected| { - let batches = - RecordBatches::try_new(schema.clone().unwrap(), partition_batches.remove(0)).unwrap(); - assert_eq!(expected, batches.pretty_print().unwrap()); - }; - - // Output series order is 0, 1, 2, 3, 3600, 3601, 3602, 4, 5, 7200, 7201, 7202 - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 0 | 0.0 | 1970-01-01T00:00:00 | -| 1 | 1.0 | 1970-01-01T00:00:01 | -| 2 | 2.0 | 1970-01-01T00:00:02 | -| 3 | 3.0 | 1970-01-01T00:00:03 | -| 7200 | 7200.0 | 1970-01-01T02:00:00 | -| 7201 | 7201.0 | 1970-01-01T02:00:01 | -| 7202 | 7202.0 | 1970-01-01T02:00:02 | -+-------+---------+---------------------+"; - check_result(expected); - - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 3600 | 3600.0 | 1970-01-01T01:00:00 | -| 3601 | 3601.0 | 1970-01-01T01:00:01 | -| 3602 | 3602.0 | 1970-01-01T01:00:02 | -+-------+---------+---------------------+"; - check_result(expected); - - let expected = "\ -+-------+---------+---------------------+ -| tag_0 | field_0 | ts | -+-------+---------+---------------------+ -| 4 | 4.0 | 1970-01-01T00:00:04 | -| 5 | 5.0 | 1970-01-01T00:00:05 | -+-------+---------+---------------------+"; - check_result(expected); + actual_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2))); + actual_rows } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 84931b9f37..db7dfd1958 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -175,6 +175,7 @@ impl Batch { } /// Create an empty [`Batch`]. + #[allow(dead_code)] pub(crate) fn empty() -> Self { Self { primary_key: vec![], @@ -677,6 +678,7 @@ impl Batch { /// Checks the batch is monotonic by timestamps. #[cfg(debug_assertions)] + #[allow(dead_code)] pub(crate) fn check_monotonic(&self) -> Result<(), String> { use std::cmp::Ordering; if self.timestamps_native().is_none() { @@ -719,6 +721,7 @@ impl Batch { /// Returns Ok if the given batch is behind the current batch. #[cfg(debug_assertions)] + #[allow(dead_code)] pub(crate) fn check_next_batch(&self, other: &Batch) -> Result<(), String> { // Checks the primary key if self.primary_key() < other.primary_key() { @@ -798,6 +801,7 @@ impl Batch { /// A struct to check the batch is monotonic. #[cfg(debug_assertions)] #[derive(Default)] +#[allow(dead_code)] pub(crate) struct BatchChecker { last_batch: Option, start: Option, @@ -805,6 +809,7 @@ pub(crate) struct BatchChecker { } #[cfg(debug_assertions)] +#[allow(dead_code)] impl BatchChecker { /// Attaches the given start timestamp to the checker. 
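`Batch::check_monotonic`, `check_next_batch`, and `BatchChecker` are kept under `#[allow(dead_code)]` even though their only caller, the primary-key `SeqScan` path, is deleted later in this diff. For reference, the deleted code used them like this (copied from the removed `scan_batch_in_partition` further down):

```rust
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
    .with_start(Some(part_range.start))
    .with_end(Some(part_range.end));

// Invoked for every batch pulled from the merge reader:
#[cfg(debug_assertions)]
checker.ensure_part_range_batch(
    "SeqScan",
    mapper.metadata().region_id,
    partition,
    part_range,
    &batch,
);
```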
pub(crate) fn with_start(mut self, start: Option) -> Self { diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index fd88749827..90d664a4bd 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -98,6 +98,7 @@ pub(crate) enum CompatBatch { impl CompatBatch { /// Returns the inner primary key batch adapter if this is a PrimaryKey format. + #[allow(dead_code)] pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> { match self { CompatBatch::PrimaryKey(batch) => Some(batch), @@ -980,7 +981,6 @@ mod tests { use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::ValueRef; - use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector}; use mito_codec::row_converter::{ DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec, }; @@ -992,7 +992,6 @@ mod tests { use crate::read::flat_projection::FlatProjectionMapper; use crate::sst::parquet::flat_format::FlatReadFormat; use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; - use crate::test_util::{VecBatchReader, check_reader_result}; /// Creates a new [RegionMetadata]. fn new_metadata( @@ -1053,44 +1052,6 @@ mod tests { buffer } - /// Creates a batch for specific primary `key`. - /// - /// `fields`: [(column_id of the field, is null)] - fn new_batch( - primary_key: &[u8], - fields: &[(ColumnId, bool)], - start_ts: i64, - num_rows: usize, - ) -> Batch { - let timestamps = Arc::new(TimestampMillisecondVector::from_values( - start_ts..start_ts + num_rows as i64, - )); - let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64)); - let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows])); - let field_columns = fields - .iter() - .map(|(id, is_null)| { - let data = if *is_null { - Arc::new(Int64Vector::from(vec![None; num_rows])) - } else { - Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows])) - }; - BatchColumn { - column_id: *id, - data, - } - }) - .collect(); - Batch::new( - primary_key.to_vec(), - timestamps, - sequences, - op_types, - field_columns, - ) - .unwrap() - } - #[test] fn test_invalid_pk_len() { let reader_meta = new_metadata( @@ -1213,311 +1174,6 @@ mod tests { assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none()) } - #[tokio::test] - async fn test_compat_reader() { - let reader_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let expect_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - (3, SemanticType::Tag, ConcreteDataType::string_datatype()), - (4, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1, 3], - )); - let mapper = ProjectionMapper::all(&expect_meta, false).unwrap(); - let k1 = encode_key(&[Some("a")]); - let k2 = encode_key(&[Some("b")]); - let source_reader = VecBatchReader::new(&[ - new_batch(&k1, &[(2, false)], 1000, 3), - new_batch(&k2, &[(2, false)], 1000, 3), - ]); - - let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); - let k1 = encode_key(&[Some("a"), None]); - let k2 = encode_key(&[Some("b"), None]); - 
check_reader_result( - &mut compat_reader, - &[ - new_batch(&k1, &[(2, false), (4, true)], 1000, 3), - new_batch(&k2, &[(2, false), (4, true)], 1000, 3), - ], - ) - .await; - } - - #[tokio::test] - async fn test_compat_reader_different_order() { - let reader_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let expect_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (3, SemanticType::Field, ConcreteDataType::int64_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - (4, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let mapper = ProjectionMapper::all(&expect_meta, false).unwrap(); - let k1 = encode_key(&[Some("a")]); - let k2 = encode_key(&[Some("b")]); - let source_reader = VecBatchReader::new(&[ - new_batch(&k1, &[(2, false)], 1000, 3), - new_batch(&k2, &[(2, false)], 1000, 3), - ]); - - let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); - check_reader_result( - &mut compat_reader, - &[ - new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3), - new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3), - ], - ) - .await; - } - - #[tokio::test] - async fn test_compat_reader_different_types() { - let actual_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let expect_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::string_datatype()), - ], - &[1], - )); - let mapper = ProjectionMapper::all(&expect_meta, false).unwrap(); - let k1 = encode_key(&[Some("a")]); - let k2 = encode_key(&[Some("b")]); - let source_reader = VecBatchReader::new(&[ - new_batch(&k1, &[(2, false)], 1000, 3), - new_batch(&k2, &[(2, false)], 1000, 3), - ]); - - let fn_batch_cast = |batch: Batch| { - let mut new_fields = batch.fields.clone(); - new_fields[0].data = new_fields[0] - .data - .cast(&ConcreteDataType::string_datatype()) - .unwrap(); - - batch.with_fields(new_fields).unwrap() - }; - let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap(); - check_reader_result( - &mut compat_reader, - &[ - fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)), - fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)), - ], - ) - .await; - } - - #[tokio::test] - async fn test_compat_reader_projection() { - let reader_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let expect_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (3, SemanticType::Field, 
ConcreteDataType::int64_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - (4, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - // tag_1, field_2, field_3 - let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap(); - let k1 = encode_key(&[Some("a")]); - let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]); - - let mut compat_reader = - CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap(); - check_reader_result( - &mut compat_reader, - &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)], - ) - .await; - - // tag_1, field_4, field_3 - let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap(); - let k1 = encode_key(&[Some("a")]); - let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]); - - let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); - check_reader_result( - &mut compat_reader, - &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)], - ) - .await; - } - - #[tokio::test] - async fn test_compat_reader_projection_read_superset() { - let reader_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - let expect_meta = Arc::new(new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (3, SemanticType::Field, ConcreteDataType::int64_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - (4, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - )); - // Output: tag_1, field_3, field_2. Read also includes field_4. 
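The read-superset behavior checked here survives the removal; only the boolean format flag goes away. After this change the same projection is expressed as below (the flat-format variant of this test, updated later in the diff, uses the identical shape):

```rust
// Output columns: tag_1, field_3, field_2; the read set additionally
// includes field_4, e.g. for a filter column that is not part of the output.
let mapper = ProjectionMapper::new_with_read_columns(
    &expect_meta,
    [1, 3, 2].into_iter(),
    vec![1, 3, 2, 4],
)?;
```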
- let mapper = ProjectionMapper::new_with_read_columns( - &expect_meta, - [1, 3, 2].into_iter(), - false, - vec![1, 3, 2, 4], - ) - .unwrap(); - let k1 = encode_key(&[Some("a")]); - let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]); - - let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); - check_reader_result( - &mut compat_reader, - &[new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3)], - ) - .await; - } - - #[tokio::test] - async fn test_compat_reader_different_pk_encoding() { - let mut reader_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1], - ); - reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense; - let reader_meta = Arc::new(reader_meta); - let mut expect_meta = new_metadata( - &[ - ( - 0, - SemanticType::Timestamp, - ConcreteDataType::timestamp_millisecond_datatype(), - ), - (1, SemanticType::Tag, ConcreteDataType::string_datatype()), - (2, SemanticType::Field, ConcreteDataType::int64_datatype()), - (3, SemanticType::Tag, ConcreteDataType::string_datatype()), - (4, SemanticType::Field, ConcreteDataType::int64_datatype()), - ], - &[1, 3], - ); - expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse; - let expect_meta = Arc::new(expect_meta); - - let mapper = ProjectionMapper::all(&expect_meta, false).unwrap(); - let k1 = encode_key(&[Some("a")]); - let k2 = encode_key(&[Some("b")]); - let source_reader = VecBatchReader::new(&[ - new_batch(&k1, &[(2, false)], 1000, 3), - new_batch(&k2, &[(2, false)], 1000, 3), - ]); - - let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap(); - let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]); - let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]); - check_reader_result( - &mut compat_reader, - &[ - new_batch(&k1, &[(2, false), (4, true)], 1000, 3), - new_batch(&k2, &[(2, false), (4, true)], 1000, 3), - ], - ) - .await; - } - /// Creates a primary key array for flat format testing. fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef { let mut builder = BinaryDictionaryBuilder::::new(); diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 1dc4102311..e087e12094 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -45,6 +45,7 @@ use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupRea /// /// This reader is different from the [MergeMode](crate::region::options::MergeMode) as /// it focus on time series (the same key). +#[allow(dead_code)] pub(crate) struct LastRowReader { /// Inner reader. reader: BoxedBatchReader, @@ -52,6 +53,7 @@ pub(crate) struct LastRowReader { selector: LastRowSelector, } +#[allow(dead_code)] impl LastRowReader { /// Creates a new `LastRowReader`. 
 pub(crate) fn new(reader: BoxedBatchReader) -> Self {
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index b5b6904521..d22c87bcc2 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -52,51 +52,27 @@ impl ProjectionMapper {
     pub fn new(
         metadata: &RegionMetadataRef,
         projection: impl Iterator<Item = ColumnId> + Clone,
-        flat_format: bool,
     ) -> Result<ProjectionMapper> {
-        if flat_format {
-            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
-                metadata, projection,
-            )?))
-        } else {
-            Ok(ProjectionMapper::PrimaryKey(
-                PrimaryKeyProjectionMapper::new(metadata, projection)?,
-            ))
-        }
+        Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
+            metadata, projection,
+        )?))
     }

     /// Returns a new mapper with output projection and explicit read columns.
     pub fn new_with_read_columns(
         metadata: &RegionMetadataRef,
         projection: impl Iterator<Item = ColumnId>,
-        flat_format: bool,
         read_column_ids: Vec<ColumnId>,
     ) -> Result<ProjectionMapper> {
         let projection: Vec<_> = projection.collect();
-        if flat_format {
-            Ok(ProjectionMapper::Flat(
-                FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
-            ))
-        } else {
-            Ok(ProjectionMapper::PrimaryKey(
-                PrimaryKeyProjectionMapper::new_with_read_columns(
-                    metadata,
-                    projection,
-                    read_column_ids,
-                )?,
-            ))
-        }
+        Ok(ProjectionMapper::Flat(
+            FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
+        ))
     }

     /// Returns a new mapper without projection.
-    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<ProjectionMapper> {
-        if flat_format {
-            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
-        } else {
-            Ok(ProjectionMapper::PrimaryKey(
-                PrimaryKeyProjectionMapper::all(metadata)?,
-            ))
-        }
+    pub fn all(metadata: &RegionMetadataRef) -> Result<ProjectionMapper> {
+        Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
     }

     /// Returns the metadata that created the mapper.
@@ -159,6 +135,7 @@ impl ProjectionMapper {
 }

 /// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
+#[allow(dead_code)]
 pub struct PrimaryKeyProjectionMapper {
     /// Metadata of the region.
     metadata: RegionMetadataRef,
@@ -178,6 +155,7 @@ pub struct PrimaryKeyProjectionMapper {
     is_empty_projection: bool,
 }

+#[allow(dead_code)]
 impl PrimaryKeyProjectionMapper {
     /// Returns a new mapper with projection.
     /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
@@ -413,6 +391,7 @@ pub(crate) fn read_column_ids_from_projection(
 /// Index of a vector in a [Batch].
 #[derive(Debug, Clone, Copy)]
+#[allow(dead_code)]
 enum BatchIndex {
     /// Index in primary keys.
Tag((usize, ColumnId)), @@ -480,53 +459,6 @@ mod tests { }; use super::*; - use crate::cache::CacheManager; - use crate::read::BatchBuilder; - - fn new_batch( - ts_start: i64, - tags: &[i64], - fields: &[(ColumnId, i64)], - num_rows: usize, - ) -> Batch { - let converter = DensePrimaryKeyCodec::with_fields( - (0..tags.len()) - .map(|idx| { - ( - idx as u32, - SortField::new(ConcreteDataType::int64_datatype()), - ) - }) - .collect(), - ); - let primary_key = converter - .encode(tags.iter().map(|v| ValueRef::Int64(*v))) - .unwrap(); - - let mut builder = BatchBuilder::new(primary_key); - builder - .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values( - (0..num_rows).map(|i| ts_start + i as i64 * 1000), - ))) - .unwrap() - .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64))) - .unwrap() - .op_types_array(Arc::new(UInt8Array::from_iter_values( - (0..num_rows).map(|_| OpType::Put as u8), - ))) - .unwrap(); - for (column_id, field) in fields { - builder - .push_field_array( - *column_id, - Arc::new(Int64Array::from_iter_values(std::iter::repeat_n( - *field, num_rows, - ))), - ) - .unwrap(); - } - builder.build().unwrap() - } fn print_record_batch(record_batch: RecordBatch) -> String { pretty::pretty_format_batches(&[record_batch.into_df_record_batch()]) @@ -534,166 +466,6 @@ mod tests { .to_string() } - #[test] - fn test_projection_mapper_all() { - let metadata = Arc::new( - TestRegionMetadataBuilder::default() - .num_tags(2) - .num_fields(2) - .build(), - ); - // Create the enum wrapper with default format (primary key) - let mapper = ProjectionMapper::all(&metadata, false).unwrap(); - assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); - assert_eq!( - [ - (3, ConcreteDataType::int64_datatype()), - (4, ConcreteDataType::int64_datatype()) - ], - mapper.as_primary_key().unwrap().batch_fields() - ); - - // With vector cache. 
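The removed tests below exercised the retired primary-key mapper; their flat-format counterparts further down keep the same coverage. For callers, the migration is mechanical: drop the flag.

```rust
// Before: ProjectionMapper::all(&metadata, false) / ::new(&metadata, iter, false)
// After (flat is the only path):
let mapper = ProjectionMapper::all(&metadata)?;
let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter())?;
```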
- let cache = CacheManager::builder().vector_cache_size(1024).build(); - let cache = CacheStrategy::EnableAll(Arc::new(cache)); - let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); - let record_batch = mapper - .as_primary_key() - .unwrap() - .convert(&batch, &cache) - .unwrap(); - let expect = "\ -+---------------------+----+----+----+----+ -| ts | k0 | k1 | v0 | v1 | -+---------------------+----+----+----+----+ -| 1970-01-01T00:00:00 | 1 | 2 | 3 | 4 | -| 1970-01-01T00:00:01 | 1 | 2 | 3 | 4 | -| 1970-01-01T00:00:02 | 1 | 2 | 3 | 4 | -+---------------------+----+----+----+----+"; - assert_eq!(expect, print_record_batch(record_batch)); - - assert!( - cache - .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1)) - .is_some() - ); - assert!( - cache - .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2)) - .is_some() - ); - assert!( - cache - .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3)) - .is_none() - ); - let record_batch = mapper - .as_primary_key() - .unwrap() - .convert(&batch, &cache) - .unwrap(); - assert_eq!(expect, print_record_batch(record_batch)); - } - - #[test] - fn test_projection_mapper_with_projection() { - let metadata = Arc::new( - TestRegionMetadataBuilder::default() - .num_tags(2) - .num_fields(2) - .build(), - ); - // Columns v1, k0 - let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap(); - assert_eq!([4, 1], mapper.column_ids()); - assert_eq!( - [(4, ConcreteDataType::int64_datatype())], - mapper.as_primary_key().unwrap().batch_fields() - ); - - let batch = new_batch(0, &[1, 2], &[(4, 4)], 3); - let cache = CacheManager::builder().vector_cache_size(1024).build(); - let cache = CacheStrategy::EnableAll(Arc::new(cache)); - let record_batch = mapper - .as_primary_key() - .unwrap() - .convert(&batch, &cache) - .unwrap(); - let expect = "\ -+----+----+ -| v1 | k0 | -+----+----+ -| 4 | 1 | -| 4 | 1 | -| 4 | 1 | -+----+----+"; - assert_eq!(expect, print_record_batch(record_batch)); - } - - #[test] - fn test_projection_mapper_read_superset() { - let metadata = Arc::new( - TestRegionMetadataBuilder::default() - .num_tags(2) - .num_fields(2) - .build(), - ); - // Output columns v1, k0. Read also includes v0. 
- let mapper = ProjectionMapper::new_with_read_columns( - &metadata, - [4, 1].into_iter(), - false, - vec![4, 1, 3], - ) - .unwrap(); - assert_eq!([4, 1, 3], mapper.column_ids()); - - let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); - let cache = CacheManager::builder().vector_cache_size(1024).build(); - let cache = CacheStrategy::EnableAll(Arc::new(cache)); - let record_batch = mapper - .as_primary_key() - .unwrap() - .convert(&batch, &cache) - .unwrap(); - let expect = "\ -+----+----+ -| v1 | k0 | -+----+----+ -| 4 | 1 | -| 4 | 1 | -| 4 | 1 | -+----+----+"; - assert_eq!(expect, print_record_batch(record_batch)); - } - - #[test] - fn test_projection_mapper_empty_projection() { - let metadata = Arc::new( - TestRegionMetadataBuilder::default() - .num_tags(2) - .num_fields(2) - .build(), - ); - // Empty projection - let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap(); - assert_eq!([0], mapper.column_ids()); // Should still read the time index column - assert!(mapper.output_schema().is_empty()); - let pk_mapper = mapper.as_primary_key().unwrap(); - assert!(pk_mapper.batch_fields().is_empty()); - assert!(!pk_mapper.has_tags); - assert!(pk_mapper.batch_indices.is_empty()); - assert!(pk_mapper.is_empty_projection); - - let batch = new_batch(0, &[1, 2], &[], 3); - let cache = CacheManager::builder().vector_cache_size(1024).build(); - let cache = CacheStrategy::EnableAll(Arc::new(cache)); - let record_batch = pk_mapper.convert(&batch, &cache).unwrap(); - assert_eq!(3, record_batch.num_rows()); - assert_eq!(0, record_batch.num_columns()); - assert!(record_batch.schema.is_empty()); - } - fn new_flat_batch( ts_start: Option, idx_tags: &[(usize, i64)], @@ -809,7 +581,7 @@ mod tests { .build(), ); let cache = CacheStrategy::Disabled; - let mapper = ProjectionMapper::all(&metadata, true).unwrap(); + let mapper = ProjectionMapper::all(&metadata).unwrap(); assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); assert_eq!( [ @@ -845,7 +617,7 @@ mod tests { ); let cache = CacheStrategy::Disabled; // Columns v1, k0 - let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap(); + let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap(); assert_eq!([4, 1], mapper.column_ids()); assert_eq!( [ @@ -879,13 +651,9 @@ mod tests { ); let cache = CacheStrategy::Disabled; // Output columns v1, k0. Read also includes v0. 
- let mapper = ProjectionMapper::new_with_read_columns( - &metadata, - [4, 1].into_iter(), - true, - vec![4, 1, 3], - ) - .unwrap(); + let mapper = + ProjectionMapper::new_with_read_columns(&metadata, [4, 1].into_iter(), vec![4, 1, 3]) + .unwrap(); assert_eq!([4, 1, 3], mapper.column_ids()); let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3); @@ -911,7 +679,7 @@ mod tests { ); let cache = CacheStrategy::Disabled; // Empty projection - let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap(); + let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap(); assert_eq!([0], mapper.column_ids()); // Should still read the time index column assert!(mapper.output_schema().is_empty()); let flat_mapper = mapper.as_flat().unwrap(); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 6766bf3f38..55ad504e6f 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -30,11 +30,13 @@ use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader}; +#[allow(dead_code)] pub enum Source { RowGroup(RowGroupReader), LastRow(RowGroupLastRowCachedReader), } +#[allow(dead_code)] impl Source { async fn next_batch(&mut self) -> Result> { match self { @@ -44,6 +46,7 @@ impl Source { } } +#[allow(dead_code)] pub struct PruneReader { /// Context for file ranges. context: FileRangeContextRef, @@ -53,6 +56,7 @@ pub struct PruneReader { skip_fields: bool, } +#[allow(dead_code)] impl PruneReader { pub(crate) fn new_with_row_group_reader( ctx: FileRangeContextRef, diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 5fc8931691..2431a21f6a 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -515,7 +515,7 @@ mod tests { ) -> (StreamContext, PartitionRange) { let env = SchedulerEnv::new().await; let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(); + let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); let file_id = FileId::random(); let file = sst_file_handle_with_file_id( @@ -527,8 +527,7 @@ mod tests { .with_predicate(predicate) .with_time_range(query_time_range) .with_files(vec![file]) - .with_cache(test_cache_strategy()) - .with_flat_format(true); + .with_cache(test_cache_strategy()); let range_meta = RangeMeta { time_range: partition_time_range, indices: smallvec![SourceIndex { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index e7cae7e7b8..f56c807af3 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -63,7 +63,6 @@ use crate::read::unordered_scan::UnorderedScan; use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; -use crate::sst::FormatType; use crate::sst::file::FileHandle; use crate::sst::index::bloom_filter::applier::{ BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef, @@ -77,8 +76,6 @@ use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexAp use crate::sst::parquet::file_range::PreFilterMode; use crate::sst::parquet::reader::ReaderMetrics; -/// Parallel scan channel size for flat format. 
-const FLAT_SCAN_CHANNEL_SIZE: usize = 2; #[cfg(feature = "vector_index")] const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2; @@ -399,19 +396,12 @@ impl ScanRegion { self.request.distribution == Some(TimeSeriesDistribution::PerSeries) } - /// Returns true if the region use flat format. - fn use_flat_format(&self) -> bool { - self.request.force_flat_format - || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat - } - /// Creates a scan input. #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))] - async fn scan_input(mut self) -> Result { + async fn scan_input(self) -> Result { let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new); let time_range = self.build_time_range_predicate(); let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; - let flat_format = self.use_flat_format(); let read_column_ids = match &self.request.projection { Some(p) => self.build_read_column_ids(p, &predicate)?, @@ -429,10 +419,9 @@ impl ScanRegion { Some(p) => ProjectionMapper::new_with_read_columns( &self.version.metadata, p.iter().copied(), - flat_format, read_column_ids.clone(), )?, - None => ProjectionMapper::all(&self.version.metadata, flat_format)?, + None => ProjectionMapper::all(&self.version.metadata)?, }; let ssts = &self.version.ssts; @@ -496,14 +485,13 @@ impl ScanRegion { let region_id = self.region_id(); debug!( - "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}", + "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", region_id, self.request, time_range, mem_range_builders.len(), files.len(), self.version.options.append_mode, - flat_format, ); let (non_field_filters, field_filters) = self.partition_by_field_filters(); @@ -530,11 +518,6 @@ impl ScanRegion { } }); - if flat_format { - // The batch is already large enough so we use a small channel size here. - self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE; - } - let input = ScanInput::new(self.access_layer, mapper) .with_time_range(Some(time_range)) .with_predicate(predicate) @@ -552,7 +535,9 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) - .with_flat_format(flat_format); + .with_explain_flat_format( + self.version.options.sst_format == Some(crate::sst::FormatType::Flat), + ); #[cfg(feature = "vector_index")] let input = input .with_vector_index_applier(vector_index_applier) @@ -855,8 +840,8 @@ pub struct ScanInput { pub(crate) series_row_selector: Option, /// Hint for the required distribution of the scanner. pub(crate) distribution: Option, - /// Whether to use flat format. - pub(crate) flat_format: bool, + /// Whether the region's configured SST format is flat. + explain_flat_format: bool, /// Whether this scan is for compaction. pub(crate) compaction: bool, #[cfg(feature = "enterprise")] @@ -893,7 +878,7 @@ impl ScanInput { merge_mode: MergeMode::default(), series_row_selector: None, distribution: None, - flat_format: false, + explain_flat_format: false, compaction: false, #[cfg(feature = "enterprise")] extension_ranges: Vec::new(), @@ -1049,6 +1034,13 @@ impl ScanInput { self } + /// Sets whether the region's configured SST format is flat for explain output. 
+ #[must_use] + pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self { + self.explain_flat_format = explain_flat_format; + self + } + /// Sets the time series row selector. #[must_use] pub(crate) fn with_series_row_selector( @@ -1059,13 +1051,6 @@ impl ScanInput { self } - /// Sets whether to use flat format. - #[must_use] - pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self { - self.flat_format = flat_format; - self - } - /// Sets whether this scan is for compaction. #[must_use] pub(crate) fn with_compaction(mut self, compaction: bool) -> Self { @@ -1165,7 +1150,6 @@ impl ScanInput { }; let res = reader .expected_metadata(Some(self.mapper.metadata().clone())) - .flat_format(self.flat_format) .compaction(self.compaction) .pre_filter_mode(filter_mode) .decode_primary_key_values(decode_pk_values) @@ -1421,8 +1405,7 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { /// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible /// for partition range caching. pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option { - let eligible = input.flat_format - && !input.compaction + let eligible = !input.compaction && !input.files.is_empty() && matches!(input.cache_strategy, CacheStrategy::EnableAll(_)); @@ -1709,8 +1692,7 @@ impl StreamContext { .entries(self.input.files.iter().map(|file| FileWrapper { file })) .finish()?; } - write!(f, ", \"flat_format\": {}", self.input.flat_format)?; - + write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?; #[cfg(feature = "enterprise")] self.format_extension_ranges(f)?; @@ -1881,9 +1863,7 @@ mod tests { use crate::cache::CacheManager; use crate::memtable::time_partition::TimePartitions; use crate::read::range_cache::ScanRequestFingerprintBuilder; - use crate::region::options::RegionOptions; use crate::region::version::VersionBuilder; - use crate::sst::FormatType; use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key}; use crate::test_util::scheduler_util::SchedulerEnv; @@ -1897,30 +1877,9 @@ mod tests { Arc::new(VersionBuilder::new(metadata, mutable).build()) } - fn new_version_with_sst_format( - metadata: RegionMetadataRef, - sst_format: Option, - ) -> VersionRef { - let mutable = Arc::new(TimePartitions::new( - metadata.clone(), - Arc::new(EmptyMemtableBuilder::default()), - 0, - None, - )); - let options = RegionOptions { - sst_format, - ..Default::default() - }; - Arc::new( - VersionBuilder::new(metadata, mutable) - .options(options) - .build(), - ) - } - async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec) -> ScanInput { let env = SchedulerEnv::new().await; - let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(); + let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); let file = FileHandle::new( crate::sst::file::FileMeta::default(), @@ -1934,7 +1893,6 @@ mod tests { .range_result_cache_size(1024) .build(), ))) - .with_flat_format(true) .with_files(vec![file]) } @@ -2018,45 +1976,6 @@ mod tests { assert_eq!(vec![4, 1, 3], read_ids); } - #[tokio::test] - async fn test_use_flat_format_honors_request_override() { - let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); - let env = SchedulerEnv::new().await; - - let primary_key_version = - new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey)); - let request = 
ScanRequest::default(); - let scan_region = ScanRegion::new( - primary_key_version.clone(), - env.access_layer.clone(), - request, - CacheStrategy::Disabled, - ); - assert!(!scan_region.use_flat_format()); - - let request = ScanRequest { - force_flat_format: true, - ..Default::default() - }; - let scan_region = ScanRegion::new( - primary_key_version, - env.access_layer.clone(), - request, - CacheStrategy::Disabled, - ); - assert!(scan_region.use_flat_format()); - - let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat)); - let request = ScanRequest::default(); - let scan_region = ScanRegion::new( - flat_version, - env.access_layer.clone(), - request, - CacheStrategy::Disabled, - ); - assert!(scan_region.use_flat_format()); - } - /// Helper to create a timestamp millisecond literal. fn ts_lit(val: i64) -> datafusion_expr::Expr { lit(ScalarValue::TimestampMillisecond(Some(val), None)) @@ -2128,17 +2047,11 @@ mod tests { let disabled = ScanInput::new( SchedulerEnv::new().await.access_layer.clone(), - ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(), + ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(), ) - .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap()) - .with_flat_format(true); + .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap()); assert!(build_scan_fingerprint(&disabled).is_none()); - let non_flat = new_scan_input(metadata.clone(), filters.clone()) - .await - .with_flat_format(false); - assert!(build_scan_fingerprint(&non_flat).is_none()); - let compaction = new_scan_input(metadata.clone(), filters.clone()) .await .with_compaction(true); diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index d065657242..eee32e7835 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -43,7 +43,7 @@ use crate::read::merge::{MergeMetrics, MergeMetricsReport}; use crate::read::pruner::PartitionPruner; use crate::read::range::{RangeMeta, RowGroupIndex}; use crate::read::scan_region::StreamContext; -use crate::read::{Batch, BoxedBatchStream, BoxedRecordBatchStream, ScannerMetrics, Source}; +use crate::read::{BoxedRecordBatchStream, ScannerMetrics}; use crate::sst::file::{FileTimeRange, RegionFileId}; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplyMetrics; use crate::sst::index::fulltext_index::applier::FulltextIndexApplyMetrics; @@ -1186,45 +1186,6 @@ pub(crate) struct SeriesDistributorMetrics { pub(crate) divider_cost: Duration, } -/// Scans memtable ranges at `index`. -#[tracing::instrument( - skip_all, - fields( - region_id = %stream_ctx.input.region_metadata().region_id, - file_or_mem_index = %index.index, - row_group_index = %index.row_group_index, - source = "mem" - ) -)] -pub(crate) fn scan_mem_ranges( - stream_ctx: Arc, - part_metrics: PartitionMetrics, - index: RowGroupIndex, - time_range: FileTimeRange, -) -> impl Stream> { - try_stream! { - let ranges = stream_ctx.input.build_mem_ranges(index); - part_metrics.inc_num_mem_ranges(ranges.len()); - for range in ranges { - let build_reader_start = Instant::now(); - let mem_scan_metrics = Some(MemScanMetrics::default()); - let iter = range.build_prune_iter(time_range, mem_scan_metrics.clone())?; - part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); - - let mut source = Source::Iter(iter); - while let Some(batch) = source.next_batch().await? 
{ - yield batch; - } - - // Report the memtable scan metrics to partition metrics - if let Some(ref metrics) = mem_scan_metrics { - let data = metrics.data(); - part_metrics.report_mem_scan_metrics(&data); - } - } - } -} - /// Scans memtable ranges at `index` using flat format that returns RecordBatch. #[tracing::instrument( skip_all, @@ -1342,59 +1303,6 @@ fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics { } } -/// Scans file ranges at `index`. -#[tracing::instrument( - skip_all, - fields( - region_id = %stream_ctx.input.region_metadata().region_id, - row_group_index = %index.index, - source = read_type - ) -)] -pub(crate) async fn scan_file_ranges( - stream_ctx: Arc, - part_metrics: PartitionMetrics, - index: RowGroupIndex, - read_type: &'static str, - partition_pruner: Arc, -) -> Result>> { - let mut reader_metrics = ReaderMetrics { - filter_metrics: new_filter_metrics(part_metrics.explain_verbose()), - ..Default::default() - }; - let ranges = partition_pruner - .build_file_ranges(index, &part_metrics, &mut reader_metrics) - .await?; - part_metrics.inc_num_file_ranges(ranges.len()); - part_metrics.merge_reader_metrics(&reader_metrics, None); - - // Creates initial per-file metrics with build_part_cost. - let init_per_file_metrics = if part_metrics.explain_verbose() { - let file = stream_ctx.input.file_from_index(index); - let file_id = file.file_id(); - - let mut map = HashMap::new(); - map.insert( - file_id, - FileScanMetrics { - build_part_cost: reader_metrics.build_cost, - ..Default::default() - }, - ); - Some(map) - } else { - None - }; - - Ok(build_file_range_scan_stream( - stream_ctx, - part_metrics, - read_type, - ranges, - init_per_file_metrics, - )) -} - /// Scans file ranges at `index` using flat reader that returns RecordBatch. #[tracing::instrument( skip_all, @@ -1448,70 +1356,6 @@ pub(crate) async fn scan_flat_file_ranges( )) } -/// Build the stream of scanning the input [`FileRange`]s. -#[tracing::instrument( - skip_all, - fields(read_type = read_type, range_count = ranges.len()) -)] -pub fn build_file_range_scan_stream( - stream_ctx: Arc, - part_metrics: PartitionMetrics, - read_type: &'static str, - ranges: SmallVec<[FileRange; 2]>, - mut per_file_metrics: Option>, -) -> impl Stream> { - try_stream! { - let fetch_metrics = if part_metrics.explain_verbose() { - Some(Arc::new(ParquetFetchMetrics::default())) - } else { - None - }; - let reader_metrics = &mut ReaderMetrics { - fetch_metrics: fetch_metrics.clone(), - ..Default::default() - }; - for range in ranges { - let build_reader_start = Instant::now(); - let Some(reader) = range.reader(stream_ctx.input.series_row_selector, fetch_metrics.as_deref()).await? else { - continue; - }; - let build_cost = build_reader_start.elapsed(); - part_metrics.inc_build_reader_cost(build_cost); - let compat_batch = range.compat_batch(); - let mut source = Source::PruneReader(reader); - while let Some(mut batch) = source.next_batch().await? 
{ - if let Some(compact_batch) = compat_batch { - batch = compact_batch.as_primary_key().unwrap().compat_batch(batch)?; - } - yield batch; - } - if let Source::PruneReader(reader) = source { - let prune_metrics = reader.metrics(); - - // Update per-file metrics if tracking is enabled - if let Some(file_metrics_map) = per_file_metrics.as_mut() { - let file_id = range.file_handle().file_id(); - let file_metrics = file_metrics_map - .entry(file_id) - .or_insert_with(FileScanMetrics::default); - - file_metrics.num_ranges += 1; - file_metrics.num_rows += prune_metrics.num_rows; - file_metrics.build_reader_cost += build_cost; - file_metrics.scan_cost += prune_metrics.scan_cost; - } - - reader_metrics.merge_from(&prune_metrics); - } - } - - // Reports metrics. - reader_metrics.observe_rows(read_type); - reader_metrics.filter_metrics.observe(); - part_metrics.merge_reader_metrics(reader_metrics, per_file_metrics.as_ref()); - } -} - /// Build the stream of scanning the input [`FileRange`]s using flat reader that returns RecordBatch. #[tracing::instrument( skip_all, @@ -1591,47 +1435,6 @@ pub fn build_flat_file_range_scan_stream( } } -/// Build the stream of scanning the extension range denoted by the [`RowGroupIndex`]. -#[cfg(feature = "enterprise")] -pub(crate) async fn scan_extension_range( - context: Arc, - index: RowGroupIndex, - partition_metrics: PartitionMetrics, -) -> Result { - use snafu::ResultExt; - - let range = context.input.extension_range(index.index); - let reader = range.reader(context.as_ref()); - let stream = reader - .read(context, partition_metrics, index) - .await - .context(crate::error::ScanExternalRangeSnafu)?; - Ok(stream) -} - -pub(crate) async fn maybe_scan_other_ranges( - context: &Arc, - index: RowGroupIndex, - metrics: &PartitionMetrics, -) -> Result { - #[cfg(feature = "enterprise")] - { - scan_extension_range(context.clone(), index, metrics.clone()).await - } - - #[cfg(not(feature = "enterprise"))] - { - let _ = context; - let _ = index; - let _ = metrics; - - crate::error::UnexpectedSnafu { - reason: "no other ranges scannable", - } - .fail() - } -} - /// Build the stream of scanning the extension range in flat format denoted by the [`RowGroupIndex`]. 
#[cfg(feature = "enterprise")] pub(crate) async fn scan_flat_extension_range( diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index a1b3b8f350..49f173e7c9 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -27,7 +27,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use futures::{StreamExt, TryStreamExt}; -use snafu::{OptionExt, ensure}; +use snafu::ensure; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, @@ -35,24 +35,19 @@ use store_api::region_engine::{ use store_api::storage::TimeSeriesRowSelector; use tokio::sync::Semaphore; -use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu}; -use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; +use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu}; use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow}; use crate::read::flat_merge::FlatMergeReader; -use crate::read::last_row::{FlatLastRowReader, LastRowReader}; -use crate::read::merge::MergeReaderBuilder; +use crate::read::last_row::FlatLastRowReader; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::range::RangeMeta; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ - PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges, - scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges, - should_split_flat_batches_for_merge, + PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_flat_file_ranges, + scan_flat_mem_ranges, should_split_flat_batches_for_merge, }; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; -use crate::read::{ - Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util, -}; +use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util}; use crate::region::options::MergeMode; use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; @@ -121,7 +116,7 @@ impl SeqScan { let streams = (0..self.properties.partitions.len()) .map(|partition| { let metrics = self.new_partition_metrics(false, &metrics_set, partition); - self.scan_batch_in_partition(partition, metrics) + self.scan_flat_batch_in_partition(partition, metrics) }) .collect::>>()?; @@ -184,57 +179,6 @@ impl SeqScan { Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await } - /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel - /// if possible. - #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] - pub(crate) async fn build_reader_from_sources( - stream_ctx: &StreamContext, - mut sources: Vec, - semaphore: Option>, - part_metrics: Option<&PartitionMetrics>, - ) -> Result { - if let Some(semaphore) = semaphore.as_ref() { - // Read sources in parallel. 
- if sources.len() > 1 { - sources = stream_ctx - .input - .create_parallel_sources(sources, semaphore.clone())?; - } - } - - let mut builder = MergeReaderBuilder::from_sources(sources); - if let Some(metrics) = part_metrics { - builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter())); - } - let reader = builder.build().await?; - - let dedup = !stream_ctx.input.append_mode; - let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter()); - let reader = if dedup { - match stream_ctx.input.merge_mode { - MergeMode::LastRow => Box::new(DedupReader::new( - reader, - LastRow::new(stream_ctx.input.filter_deleted), - dedup_metrics_reporter, - )) as _, - MergeMode::LastNonNull => Box::new(DedupReader::new( - reader, - LastNonNull::new(stream_ctx.input.filter_deleted), - dedup_metrics_reporter, - )) as _, - } - } else { - Box::new(reader) as _ - }; - - let reader = match &stream_ctx.input.series_row_selector { - Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _, - None => reader, - }; - - Ok(reader) - } - /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel /// if possible. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] @@ -318,13 +262,7 @@ impl SeqScan { let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition); let input = &self.stream_ctx.input; - let batch_stream = if input.flat_format { - // Use flat scan for bulk memtables - self.scan_flat_batch_in_partition(partition, metrics.clone())? - } else { - // Use regular batch scan for normal memtables - self.scan_batch_in_partition(partition, metrics.clone())? - }; + let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?; let record_batch_stream = ConvertBatchStream::new( batch_stream, input.mapper.clone(), @@ -338,125 +276,6 @@ impl SeqScan { ))) } - #[tracing::instrument( - skip_all, - fields( - region_id = %self.stream_ctx.input.mapper.metadata().region_id, - partition = partition - ) - )] - fn scan_batch_in_partition( - &self, - partition: usize, - part_metrics: PartitionMetrics, - ) -> Result { - ensure!( - partition < self.properties.partitions.len(), - PartitionOutOfRangeSnafu { - given: partition, - all: self.properties.partitions.len(), - } - ); - - if self.properties.partitions[partition].is_empty() { - return Ok(Box::pin(futures::stream::empty())); - } - - let stream_ctx = self.stream_ctx.clone(); - let semaphore = self.new_semaphore(); - let partition_ranges = self.properties.partitions[partition].clone(); - let compaction = self.stream_ctx.input.compaction; - let distinguish_range = self.properties.distinguish_partition_range; - let file_scan_semaphore = if compaction { None } else { semaphore.clone() }; - let pruner = self.pruner.clone(); - // Initializes ref counts for the pruner. - // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream, - // then the ref count won't be decremented. - // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache. - pruner.add_partition_ranges(&partition_ranges); - let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges)); - - let stream = try_stream! { - part_metrics.on_first_poll(); - // Start fetch time before building sources so scan cost contains - // build part cost. 
- let mut fetch_start = Instant::now(); - - let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu { - reason: "Unexpected format", - })?; - // Scans each part. - for part_range in partition_ranges { - let mut sources = Vec::new(); - build_sources( - &stream_ctx, - &part_range, - compaction, - &part_metrics, - partition_pruner.clone(), - &mut sources, - file_scan_semaphore.clone(), - ).await?; - - let mut reader = - Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics)) - .await?; - #[cfg(debug_assertions)] - let mut checker = crate::read::BatchChecker::default() - .with_start(Some(part_range.start)) - .with_end(Some(part_range.end)); - - let mut metrics = ScannerMetrics { - scan_cost: fetch_start.elapsed(), - ..Default::default() - }; - fetch_start = Instant::now(); - - while let Some(batch) = reader.next_batch().await? { - metrics.scan_cost += fetch_start.elapsed(); - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - - debug_assert!(!batch.is_empty()); - if batch.is_empty() { - fetch_start = Instant::now(); - continue; - } - - #[cfg(debug_assertions)] - checker.ensure_part_range_batch( - "SeqScan", - _mapper.metadata().region_id, - partition, - part_range, - &batch, - ); - - let yield_start = Instant::now(); - yield ScanBatch::Normal(batch); - metrics.yield_cost += yield_start.elapsed(); - - fetch_start = Instant::now(); - } - - // Yields an empty part to indicate this range is terminated. - // The query engine can use this to optimize some queries. - if distinguish_range { - let yield_start = Instant::now(); - yield ScanBatch::Normal(Batch::empty()); - metrics.yield_cost += yield_start.elapsed(); - } - - metrics.scan_cost += fetch_start.elapsed(); - fetch_start = Instant::now(); - part_metrics.merge_metrics(&metrics); - } - - part_metrics.on_finish(); - }; - Ok(Box::pin(stream)) - } - #[tracing::instrument( skip_all, fields( @@ -709,108 +528,6 @@ impl fmt::Debug for SeqScan { } } -/// Builds sources for the partition range and push them to the `sources` vector. -pub(crate) async fn build_sources( - stream_ctx: &Arc, - part_range: &PartitionRange, - compaction: bool, - part_metrics: &PartitionMetrics, - partition_pruner: Arc, - sources: &mut Vec, - semaphore: Option>, -) -> Result<()> { - // Gets range meta. - let range_meta = &stream_ctx.ranges[part_range.identifier]; - #[cfg(debug_assertions)] - if compaction { - // Compaction expects input sources are not been split. - debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len()); - for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() { - // It should scan all row groups. 
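Annotation: the deleted `build_sources` that follows also had to keep sources in planner order even though file ranges were opened concurrently: each spawned task reported its original position, and results were written back into positional slots. A toy version of that pattern, assuming `tokio` and `futures`; all names are hypothetical:

```rust
use futures::future::try_join_all;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let indices = vec!["mem", "file", "mem", "file"];
    let mut ordered: Vec<Option<String>> = vec![None; indices.len()];
    let mut file_tasks = Vec::new();

    for (position, kind) in indices.iter().enumerate() {
        if *kind == "mem" {
            // Memtable ranges are cheap: fill the slot in place.
            ordered[position] = Some(format!("mem-{position}"));
        } else {
            // File ranges run as async tasks that report their slot.
            file_tasks.push(async move {
                Ok::<_, std::io::Error>((position, format!("file-{position}")))
            });
        }
    }

    // Await the file tasks, then put each result into its original slot
    // so the final source order still matches the planner's range order.
    for (position, source) in try_join_all(file_tasks).await? {
        ordered[position] = Some(source);
    }
    let sources: Vec<String> = ordered.into_iter().flatten().collect();
    assert_eq!(sources, vec!["mem-0", "file-1", "mem-2", "file-3"]);
    Ok(())
}
```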
- debug_assert_eq!( - -1, row_group_idx.row_group_index, - "Expect {} range scan all row groups, given: {}", - i, row_group_idx.row_group_index, - ); - } - } - - let read_type = if compaction { - "compaction" - } else { - "seq_scan_files" - }; - let num_indices = range_meta.row_group_indices.len(); - if num_indices == 0 { - return Ok(()); - } - - sources.reserve(num_indices); - let mut ordered_sources = Vec::with_capacity(num_indices); - ordered_sources.resize_with(num_indices, || None); - let mut file_scan_tasks = Vec::new(); - - for (position, index) in range_meta.row_group_indices.iter().enumerate() { - if stream_ctx.is_mem_range_index(*index) { - let stream = scan_mem_ranges( - stream_ctx.clone(), - part_metrics.clone(), - *index, - range_meta.time_range, - ); - ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _)); - } else if stream_ctx.is_file_range_index(*index) { - if let Some(semaphore_ref) = semaphore.as_ref() { - // run in parallel, controlled by semaphore - let stream_ctx = stream_ctx.clone(); - let part_metrics = part_metrics.clone(); - let partition_pruner = partition_pruner.clone(); - let semaphore = Arc::clone(semaphore_ref); - let row_group_index = *index; - file_scan_tasks.push(async move { - let _permit = semaphore.acquire().await.unwrap(); - let stream = scan_file_ranges( - stream_ctx, - part_metrics, - row_group_index, - read_type, - partition_pruner, - ) - .await?; - Ok((position, Source::Stream(Box::pin(stream) as _))) - }); - } else { - // no semaphore, run sequentially - let stream = scan_file_ranges( - stream_ctx.clone(), - part_metrics.clone(), - *index, - read_type, - partition_pruner.clone(), - ) - .await?; - ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _)); - } - } else { - let stream = - scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?; - ordered_sources[position] = Some(Source::Stream(stream)); - } - } - - if !file_scan_tasks.is_empty() { - let results = futures::future::try_join_all(file_scan_tasks).await?; - for (position, source) in results { - ordered_sources[position] = Some(source); - } - } - - for source in ordered_sources.into_iter().flatten() { - sources.push(source); - } - Ok(()) -} - /// Builds flat sources for the partition range and push them to the `sources` vector. 
pub(crate) async fn build_flat_sources( stream_ctx: &Arc, diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 5109120d92..d2e37af66a 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -30,7 +30,7 @@ use datatypes::arrow::array::BinaryArray; use datatypes::arrow::record_batch::RecordBatch; use datatypes::schema::SchemaRef; use futures::{StreamExt, TryStreamExt}; -use smallvec::{SmallVec, smallvec}; +use smallvec::SmallVec; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{ @@ -44,12 +44,12 @@ use crate::error::{ Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu, ScanSeriesSnafu, TooManyFilesToReadSnafu, }; +use crate::read::ScannerMetrics; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics}; -use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources}; +use crate::read::seq_scan::{SeqScan, build_flat_sources}; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; -use crate::read::{Batch, ScannerMetrics}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::PrimaryKeyArray; @@ -443,11 +443,7 @@ impl SeriesDistributor { fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id) )] async fn execute(&mut self) { - let result = if self.stream_ctx.input.flat_format { - self.scan_partitions_flat().await - } else { - self.scan_partitions().await - }; + let result = self.scan_partitions_flat().await; if let Err(e) = result { self.senders.send_error(e).await; @@ -559,151 +555,11 @@ impl SeriesDistributor { Ok(()) } - - /// Scans all parts. - #[tracing::instrument( - skip_all, - fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id) - )] - async fn scan_partitions(&mut self) -> Result<()> { - // Initialize reference counts for all partition ranges. - for partition_ranges in &self.partitions { - self.pruner.add_partition_ranges(partition_ranges); - } - - // Create PartitionPruner covering all partitions - let all_partition_ranges: Vec<_> = self.partitions.iter().flatten().cloned().collect(); - let partition_pruner = Arc::new(PartitionPruner::new( - self.pruner.clone(), - &all_partition_ranges, - )); - - let part_metrics = new_partition_metrics( - &self.stream_ctx, - self.explain_verbose, - &self.metrics_set, - self.partitions.len(), - &self.metrics_list, - ); - part_metrics.on_first_poll(); - // Start fetch time before building sources so scan cost contains - // build part cost. - let mut fetch_start = Instant::now(); - - // Scans all parts. - let mut sources = Vec::with_capacity(self.partitions.len()); - for partition in &self.partitions { - sources.reserve(partition.len()); - for part_range in partition { - build_sources( - &self.stream_ctx, - part_range, - false, - &part_metrics, - partition_pruner.clone(), - &mut sources, - self.semaphore.clone(), - ) - .await?; - } - } - - // Builds a reader that merge sources from all parts. - let mut reader = SeqScan::build_reader_from_sources( - &self.stream_ctx, - sources, - self.semaphore.clone(), - Some(&part_metrics), - ) - .await?; - let mut metrics = SeriesDistributorMetrics::default(); - - let mut current_series = PrimaryKeySeriesBatch::default(); - while let Some(batch) = reader.next_batch().await? 
{ - metrics.scan_cost += fetch_start.elapsed(); - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - - debug_assert!(!batch.is_empty()); - if batch.is_empty() { - fetch_start = Instant::now(); - continue; - } - - let Some(last_key) = current_series.current_key() else { - current_series.push(batch); - fetch_start = Instant::now(); - continue; - }; - - if last_key == batch.primary_key() { - current_series.push(batch); - fetch_start = Instant::now(); - continue; - } - - // We find a new series, send the current one. - let to_send = - std::mem::replace(&mut current_series, PrimaryKeySeriesBatch::single(batch)); - let yield_start = Instant::now(); - self.senders - .send_batch(SeriesBatch::PrimaryKey(to_send)) - .await?; - metrics.yield_cost += yield_start.elapsed(); - fetch_start = Instant::now(); - } - - if !current_series.is_empty() { - let yield_start = Instant::now(); - self.senders - .send_batch(SeriesBatch::PrimaryKey(current_series)) - .await?; - metrics.yield_cost += yield_start.elapsed(); - } - - metrics.scan_cost += fetch_start.elapsed(); - metrics.num_series_send_timeout = self.senders.num_timeout; - metrics.num_series_send_full = self.senders.num_full; - part_metrics.set_distributor_metrics(&metrics); - - part_metrics.on_finish(); - - Ok(()) - } -} - -/// Batches of the same series in primary key format. -#[derive(Default, Debug)] -pub struct PrimaryKeySeriesBatch { - pub batches: SmallVec<[Batch; 4]>, -} - -impl PrimaryKeySeriesBatch { - /// Creates a new [PrimaryKeySeriesBatch] from a single [Batch]. - fn single(batch: Batch) -> Self { - Self { - batches: smallvec![batch], - } - } - - fn current_key(&self) -> Option<&[u8]> { - self.batches.first().map(|batch| batch.primary_key()) - } - - fn push(&mut self, batch: Batch) { - self.batches.push(batch); - } - - /// Returns true if there is no batch. - fn is_empty(&self) -> bool { - self.batches.is_empty() - } } /// Batches of the same series. #[derive(Debug)] pub enum SeriesBatch { - PrimaryKey(PrimaryKeySeriesBatch), Flat(FlatSeriesBatch), } @@ -711,7 +567,6 @@ impl SeriesBatch { /// Returns the number of batches. pub fn num_batches(&self) -> usize { match self { - SeriesBatch::PrimaryKey(primary_key_batch) => primary_key_batch.batches.len(), SeriesBatch::Flat(flat_batch) => flat_batch.batches.len(), } } @@ -719,9 +574,6 @@ impl SeriesBatch { /// Returns the total number of rows across all batches. pub fn num_rows(&self) -> usize { match self { - SeriesBatch::PrimaryKey(primary_key_batch) => { - primary_key_batch.batches.iter().map(|x| x.num_rows()).sum() - } SeriesBatch::Flat(flat_batch) => flat_batch.batches.iter().map(|x| x.num_rows()).sum(), } } diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs index 80002147ea..c8547fdf0c 100644 --- a/src/mito2/src/read/stream.rs +++ b/src/mito2/src/read/stream.rs @@ -27,14 +27,12 @@ use snafu::ResultExt; use crate::cache::CacheStrategy; use crate::error::Result; -use crate::read::Batch; use crate::read::projection::ProjectionMapper; use crate::read::scan_util::PartitionMetrics; use crate::read::series_scan::SeriesBatch; /// All kinds of [`Batch`]es to produce in scanner. 
pub enum ScanBatch { - Normal(Batch), Series(SeriesBatch), RecordBatch(DfRecordBatch), } @@ -45,6 +43,7 @@ pub type ScanBatchStream = BoxStream<'static, Result>; pub(crate) struct ConvertBatchStream { inner: ScanBatchStream, projection_mapper: Arc, + #[allow(dead_code)] cache_strategy: CacheStrategy, partition_metrics: PartitionMetrics, pending: VecDeque, @@ -68,41 +67,19 @@ impl ConvertBatchStream { fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result { match batch { - ScanBatch::Normal(batch) => { - // Safety: Only primary key format returns this batch. - let mapper = self.projection_mapper.as_primary_key().unwrap(); - - if batch.is_empty() { - Ok(mapper.empty_record_batch()) - } else { - mapper.convert(&batch, &self.cache_strategy) - } - } ScanBatch::Series(series) => { debug_assert!( self.pending.is_empty(), "ConvertBatchStream should not convert a new SeriesBatch when pending batches exist" ); - match series { - SeriesBatch::PrimaryKey(primary_key_batch) => { - // Safety: Only primary key format returns this batch. - let mapper = self.projection_mapper.as_primary_key().unwrap(); + let SeriesBatch::Flat(flat_batch) = series; + // Safety: Only flat format returns this batch. + let mapper = self.projection_mapper.as_flat().unwrap(); - for batch in primary_key_batch.batches { - self.pending - .push_back(mapper.convert(&batch, &self.cache_strategy)?); - } - } - SeriesBatch::Flat(flat_batch) => { - // Safety: Only flat format returns this batch. - let mapper = self.projection_mapper.as_flat().unwrap(); - - for batch in flat_batch.batches { - self.pending - .push_back(mapper.convert(&batch, &self.cache_strategy)?); - } - } + for batch in flat_batch.batches { + self.pending + .push_back(mapper.convert(&batch, &self.cache_strategy)?); } let output_schema = self.projection_mapper.output_schema(); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 2d557e8871..9763d14cd2 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -37,11 +37,10 @@ use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ - PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges, - scan_flat_mem_ranges, scan_mem_ranges, + PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges, }; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; -use crate::read::{Batch, ScannerMetrics, scan_util}; +use crate::read::{ScannerMetrics, scan_util}; /// Scans a region without providing any output ordering guarantee. /// @@ -103,59 +102,6 @@ impl UnorderedScan { Ok(stream) } - /// Scans a [PartitionRange] by its `identifier` and returns a stream. - #[tracing::instrument( - skip_all, - fields( - region_id = %stream_ctx.input.region_metadata().region_id, - part_range_id = part_range_id - ) - )] - fn scan_partition_range( - stream_ctx: Arc, - part_range_id: usize, - part_metrics: PartitionMetrics, - partition_pruner: Arc, - ) -> impl Stream> { - try_stream! { - // Gets range meta. 
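Annotation: worth spelling out, since the deleted `scan_partitions` above relied on it: the series distributor assumes batches arrive sorted by series key, buffers consecutive batches that share a key, and flushes the buffer as one series when the key changes. A minimal synchronous sketch of that run-grouping, with plain `Vec`s standing in for `SeriesBatch`:

```rust
// Group consecutive batches by series key; the input must already be
// sorted by key, as the merge reader guarantees for the distributor.
fn group_series(batches: Vec<(&str, Vec<i64>)>) -> Vec<Vec<Vec<i64>>> {
    let mut series: Vec<Vec<Vec<i64>>> = Vec::new();
    let mut current_key: Option<&str> = None;
    let mut current: Vec<Vec<i64>> = Vec::new();

    for (key, batch) in batches {
        if current_key.is_some() && current_key != Some(key) {
            // New key: send the buffered series downstream.
            series.push(std::mem::take(&mut current));
        }
        current_key = Some(key);
        current.push(batch);
    }
    if !current.is_empty() {
        series.push(current); // flush the trailing series
    }
    series
}

fn main() {
    let batches = vec![("a", vec![1, 2]), ("a", vec![3]), ("b", vec![4])];
    let series = group_series(batches);
    assert_eq!(series.len(), 2);
    assert_eq!(series[0], vec![vec![1, 2], vec![3]]);
    assert_eq!(series[1], vec![vec![4]]);
}
```

This is also why each series lands in exactly one partition, as the round-robin test asserts.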
- let range_meta = &stream_ctx.ranges[part_range_id]; - for index in &range_meta.row_group_indices { - if stream_ctx.is_mem_range_index(*index) { - let stream = scan_mem_ranges( - stream_ctx.clone(), - part_metrics.clone(), - *index, - range_meta.time_range, - ); - for await batch in stream { - yield batch?; - } - } else if stream_ctx.is_file_range_index(*index) { - let stream = scan_file_ranges( - stream_ctx.clone(), - part_metrics.clone(), - *index, - "unordered_scan_files", - partition_pruner.clone(), - ).await?; - for await batch in stream { - yield batch?; - } - } else { - let stream = scan_util::maybe_scan_other_ranges( - &stream_ctx, - *index, - &part_metrics, - ).await?; - for await batch in stream { - yield batch?; - } - } - } - } - } - /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch. #[tracing::instrument( skip_all, @@ -216,7 +162,7 @@ impl UnorderedScan { let streams = (0..self.properties.partitions.len()) .map(|partition| { let metrics = self.partition_metrics(false, partition, &metrics_set); - self.scan_batch_in_partition(partition, metrics) + self.scan_flat_batch_in_partition(partition, metrics) }) .collect::>>()?; @@ -265,13 +211,7 @@ impl UnorderedScan { let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set); let input = &self.stream_ctx.input; - let batch_stream = if input.flat_format { - // Use flat scan for bulk memtables - self.scan_flat_batch_in_partition(partition, metrics.clone())? - } else { - // Use regular batch scan for normal memtables - self.scan_batch_in_partition(partition, metrics.clone())? - }; + let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?; let record_batch_stream = ConvertBatchStream::new( batch_stream, @@ -286,100 +226,6 @@ impl UnorderedScan { ))) } - #[tracing::instrument( - skip_all, - fields( - region_id = %self.stream_ctx.input.mapper.metadata().region_id, - partition = partition - ) - )] - fn scan_batch_in_partition( - &self, - partition: usize, - part_metrics: PartitionMetrics, - ) -> Result { - ensure!( - partition < self.properties.partitions.len(), - PartitionOutOfRangeSnafu { - given: partition, - all: self.properties.partitions.len(), - } - ); - - let stream_ctx = self.stream_ctx.clone(); - let part_ranges = self.properties.partitions[partition].clone(); - let distinguish_range = self.properties.distinguish_partition_range; - let pruner = self.pruner.clone(); - // Initializes ref counts for the pruner. - // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream, - // then the ref count won't be decremented. - // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache. - pruner.add_partition_ranges(&part_ranges); - let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges)); - - let stream = try_stream! { - part_metrics.on_first_poll(); - - // Scans each part. 
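Annotation: the removed per-partition loop below accumulates two separate costs into `ScannerMetrics`: time spent producing batches (`scan_cost`) and time spent handing them downstream (`yield_cost`), by restarting an `Instant` around each phase. Condensed to a synchronous toy, not the real stream types:

```rust
use std::time::{Duration, Instant};

#[derive(Default, Debug)]
struct Metrics {
    scan_cost: Duration,
    yield_cost: Duration,
    num_batches: usize,
    num_rows: usize,
}

fn consume(batch: &[u64]) {
    // Stand-in for yielding the batch to the caller.
    std::hint::black_box(batch);
}

fn main() {
    let batches = vec![vec![1_u64, 2, 3], vec![4, 5], vec![6]];
    let mut metrics = Metrics::default();

    let mut fetch_start = Instant::now();
    for batch in &batches {
        metrics.scan_cost += fetch_start.elapsed(); // time to produce the batch
        metrics.num_batches += 1;
        metrics.num_rows += batch.len();

        let yield_start = Instant::now();
        consume(batch);
        metrics.yield_cost += yield_start.elapsed(); // time spent downstream

        fetch_start = Instant::now(); // restart the fetch clock
    }
    metrics.scan_cost += fetch_start.elapsed();

    println!("{metrics:?}");
}
```

Starting the fetch clock before sources are built is deliberate: the build cost is counted as scan cost, as the comment in the removed code notes.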
- for part_range in part_ranges { - let mut metrics = ScannerMetrics::default(); - let mut fetch_start = Instant::now(); - let _mapper = &stream_ctx.input.mapper; - #[cfg(debug_assertions)] - let mut checker = crate::read::BatchChecker::default() - .with_start(Some(part_range.start)) - .with_end(Some(part_range.end)); - - let stream = Self::scan_partition_range( - stream_ctx.clone(), - part_range.identifier, - part_metrics.clone(), - partition_pruner.clone(), - ); - for await batch in stream { - let batch = batch?; - metrics.scan_cost += fetch_start.elapsed(); - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - - debug_assert!(!batch.is_empty()); - if batch.is_empty() { - continue; - } - - #[cfg(debug_assertions)] - checker.ensure_part_range_batch( - "UnorderedScan", - _mapper.metadata().region_id, - partition, - part_range, - &batch, - ); - - let yield_start = Instant::now(); - yield ScanBatch::Normal(batch); - metrics.yield_cost += yield_start.elapsed(); - - fetch_start = Instant::now(); - } - - // Yields an empty part to indicate this range is terminated. - // The query engine can use this to optimize some queries. - if distinguish_range { - let yield_start = Instant::now(); - yield ScanBatch::Normal(Batch::empty()); - metrics.yield_cost += yield_start.elapsed(); - } - - metrics.scan_cost += fetch_start.elapsed(); - part_metrics.merge_metrics(&metrics); - } - - part_metrics.on_finish(); - }; - Ok(Box::pin(stream)) - } - #[tracing::instrument( skip_all, fields( diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 4a3466a29c..2ca83ca8cf 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -895,7 +895,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) @@ -960,7 +959,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) @@ -1015,7 +1013,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .bloom_filter_index_appliers([bloom_filter_applier.clone(), None]) @@ -1549,7 +1546,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .cache(CacheStrategy::EnableAll(cache.clone())); @@ -1652,7 +1648,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .bloom_filter_index_appliers([None, bloom_filter_applier.clone()]) .cache(CacheStrategy::EnableAll(cache.clone())); @@ -1712,7 +1707,6 @@ mod tests { let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store) - .flat_format(true) .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))]))); let mut metrics = ReaderMetrics::default(); @@ -1774,7 +1768,6 @@ mod tests { let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), PathType::Bare, handle, object_store) - .flat_format(true) .predicate(Some(Predicate::new(vec![col("tag_0").eq(lit("a"))]))); let mut metrics = ReaderMetrics::default(); @@ -1884,7 +1877,6 @@ mod tests { 
handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .inverted_index_appliers([inverted_index_applier.clone(), None]) .cache(CacheStrategy::EnableAll(cache.clone())); @@ -1991,7 +1983,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .bloom_filter_index_appliers([None, bloom_filter_applier.clone()]) .cache(CacheStrategy::EnableAll(cache.clone())); @@ -2255,7 +2246,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .fulltext_index_appliers([None, fulltext_applier.clone()]) .cache(CacheStrategy::EnableAll(cache.clone())); @@ -2304,7 +2294,6 @@ mod tests { handle.clone(), object_store.clone(), ) - .flat_format(true) .predicate(Some(Predicate::new(preds))) .fulltext_index_appliers([None, fulltext_applier.clone()]) .cache(CacheStrategy::EnableAll(cache.clone())); diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 8b4a61acb7..bf86e4a764 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -175,6 +175,7 @@ impl FileRange { } /// Returns a reader to read the [FileRange]. + #[allow(dead_code)] pub(crate) async fn reader( &self, selector: Option, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 8832cd4a16..73ca7748e9 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -141,8 +141,6 @@ pub struct ParquetReaderBuilder { /// This is usually the latest metadata of the region. The reader use /// it get the correct column id of a column by name. expected_metadata: Option, - /// Whether to use flat format for reading. - flat_format: bool, /// Whether this reader is for compaction. compaction: bool, /// Mode to pre-filter columns. @@ -176,7 +174,6 @@ impl ParquetReaderBuilder { #[cfg(feature = "vector_index")] vector_index_k: None, expected_metadata: None, - flat_format: false, compaction: false, pre_filter_mode: PreFilterMode::All, decode_primary_key_values: false, @@ -257,13 +254,6 @@ impl ParquetReaderBuilder { self } - /// Sets the flat format flag. - #[must_use] - pub fn flat_format(mut self, flat_format: bool) -> Self { - self.flat_format = flat_format; - self - } - /// Sets the compaction flag. #[must_use] pub fn compaction(mut self, compaction: bool) -> Self { @@ -304,8 +294,7 @@ impl ParquetReaderBuilder { pub async fn build(&self) -> Result> { let mut metrics = ReaderMetrics::default(); - let Some((context, selection)) = self.build_reader_input_inner(&mut metrics, true).await? - else { + let Some((context, selection)) = self.build_reader_input_inner(&mut metrics).await? else { return Ok(None); }; ParquetReader::new(Arc::new(context), selection) @@ -327,14 +316,12 @@ impl ParquetReaderBuilder { &self, metrics: &mut ReaderMetrics, ) -> Result> { - self.build_reader_input_inner(metrics, self.flat_format) - .await + self.build_reader_input_inner(metrics).await } async fn build_reader_input_inner( &self, metrics: &mut ReaderMetrics, - flat_format: bool, ) -> Result> { let start = Instant::now(); @@ -376,7 +363,6 @@ impl ParquetReaderBuilder { // before compat handling. let compaction_projection_mapper = if self.compaction && !is_same_region_partition - && flat_format && region_meta.primary_key_encoding == PrimaryKeyEncoding::Sparse { Some(CompactionProjectionMapper::try_new(®ion_meta)?) 
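Annotation: with the `flat_format` parameter gone from `build_reader_input_inner`, the guard deciding whether a sparse-encoded region needs a `CompactionProjectionMapper` during compaction loses one term. A hypothetical, condensed form of that predicate; the enum here is illustrative, not the store-api type:

```rust
#[derive(PartialEq)]
enum PrimaryKeyEncoding {
    Dense,
    Sparse,
}

fn needs_compaction_mapper(
    compaction: bool,
    is_same_region_partition: bool,
    encoding: PrimaryKeyEncoding,
) -> bool {
    // `&& flat_format` used to appear here; with reads always flat,
    // the term is vacuously true and drops out.
    compaction && !is_same_region_partition && encoding == PrimaryKeyEncoding::Sparse
}

fn main() {
    assert!(needs_compaction_mapper(true, false, PrimaryKeyEncoding::Sparse));
    assert!(!needs_compaction_mapper(true, false, PrimaryKeyEncoding::Dense));
    assert!(!needs_compaction_mapper(false, false, PrimaryKeyEncoding::Sparse));
}
```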
@@ -388,7 +374,7 @@ impl ParquetReaderBuilder { ReadFormat::new( region_meta.clone(), Some(column_ids), - flat_format, + true, // Always reads as flat format. Some(parquet_meta.file_metadata().schema_descr().num_columns()), &file_path, skip_auto_convert, @@ -404,7 +390,7 @@ impl ParquetReaderBuilder { ReadFormat::new( region_meta.clone(), Some(&column_ids), - flat_format, + true, // Always reads as flat format. Some(parquet_meta.file_metadata().schema_descr().num_columns()), &file_path, skip_auto_convert, @@ -2060,6 +2046,7 @@ impl RowGroupReaderContext for FileRangeContextRef { /// [RowGroupReader] that reads from [FileRange]. pub(crate) type RowGroupReader = RowGroupReaderBase; +#[allow(dead_code)] impl RowGroupReader { /// Creates a new reader from file range. pub(crate) fn new( @@ -2084,6 +2071,7 @@ pub(crate) struct RowGroupReaderBase { override_sequence: Option, } +#[allow(dead_code)] impl RowGroupReaderBase where T: RowGroupReaderContext, diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs index d072ec1b39..db3fb0388a 100644 --- a/src/store-api/src/storage/requests.rs +++ b/src/store-api/src/storage/requests.rs @@ -128,8 +128,6 @@ pub struct ScanRequest { /// Optional hint for KNN vector search. When set, the scan should use /// vector index to find the k nearest neighbors. pub vector_search: Option, - /// Whether to force reading region data in flat format. - pub force_flat_format: bool, } impl Display for ScanRequest { @@ -220,14 +218,6 @@ impl Display for ScanRequest { vector_search.metric )?; } - if self.force_flat_format { - write!( - f, - "{}force_flat_format: {}", - delimiter.as_str(), - self.force_flat_format - )?; - } write!(f, " }}") } } @@ -282,15 +272,6 @@ mod tests { "ScanRequest { projection: [1, 2], limit: 10 }" ); - let request = ScanRequest { - force_flat_format: true, - ..Default::default() - }; - assert_eq!( - request.to_string(), - "ScanRequest { force_flat_format: true }" - ); - let request = ScanRequest { snapshot_on_scan: true, ..Default::default()
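Annotation: for reference, `ScanRequest`'s `Display` prints only fields that differ from their defaults, joining them with a delimiter, so dropping `force_flat_format` removes one branch and its test without disturbing the rest. A self-contained miniature of the same pattern, using a hypothetical `Request` type:

```rust
use std::fmt;

#[derive(Default)]
struct Request {
    limit: Option<usize>,
    snapshot_on_scan: bool,
}

impl fmt::Display for Request {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Request {{")?;
        let mut delimiter = "";
        if let Some(limit) = self.limit {
            // Each printed field advances the delimiter for the next one.
            write!(f, "{delimiter} limit: {limit}")?;
            delimiter = ",";
        }
        if self.snapshot_on_scan {
            write!(f, "{delimiter} snapshot_on_scan: true")?;
        }
        write!(f, " }}")
    }
}

fn main() {
    let request = Request { limit: Some(10), snapshot_on_scan: true };
    assert_eq!(request.to_string(), "Request { limit: 10, snapshot_on_scan: true }");
}
```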