diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 1d7cf7b6d7..a78bf079b0 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -108,6 +108,11 @@ name = "memtable_bench" harness = false required-features = ["test"] +[[bench]] +name = "bench_cache_stream" +harness = false +required-features = ["test"] + [[bench]] name = "bench_filter_time_partition" harness = false diff --git a/src/mito2/benches/bench_cache_stream.rs b/src/mito2/benches/bench_cache_stream.rs new file mode 100644 index 0000000000..f2314f2ccb --- /dev/null +++ b/src/mito2/benches/bench_cache_stream.rs @@ -0,0 +1,126 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Benchmarks for `cache_flat_range_stream` overhead. +//! +//! Compares consuming batches from a plain stream vs through the caching wrapper +//! that clones batches for the range cache. +//! +//! Run with: +//! ```sh +//! cargo bench -p mito2 --features test --bench bench_cache_stream +//! ``` + +use std::collections::VecDeque; +use std::sync::Arc; + +use criterion::{Criterion, criterion_group, criterion_main}; +use futures::TryStreamExt; +use mito_codec::row_converter::DensePrimaryKeyCodec; +use mito2::memtable::bulk::context::BulkIterContext; +use mito2::memtable::bulk::part::{BulkPartConverter, BulkPartEncoder}; +use mito2::memtable::bulk::part_reader::EncodedBulkPartIter; +use mito2::read::range_cache::bench_cache_flat_range_stream; +use mito2::sst::parquet::DEFAULT_ROW_GROUP_SIZE; +use mito2::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; +use mito2::test_util::bench_util::{CpuDataGenerator, cpu_metadata}; + +fn cache_flat_range_stream_bench(c: &mut Criterion) { + let metadata = Arc::new(cpu_metadata()); + let region_id = metadata.region_id; + let start_sec = 1710043200; + // 2000 hosts × 51 steps = 102,000 rows ≈ DEFAULT_ROW_GROUP_SIZE + let num_hosts = 2000; + let end_sec = start_sec + 510; + let generator = CpuDataGenerator::new(metadata.clone(), num_hosts, start_sec, end_sec); + + // Build a BulkPart from all the generated data + let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata)); + + let mut converter = BulkPartConverter::new( + &metadata, + schema, + DEFAULT_ROW_GROUP_SIZE, + codec, + true, // store_pk_columns + ); + for kvs in generator.iter() { + converter.append_key_values(&kvs).unwrap(); + } + let bulk_part = converter.convert().unwrap(); + + // Encode to parquet + let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE).unwrap(); + let encoded_part = encoder.encode_part(&bulk_part).unwrap().unwrap(); + + // Decode all record batches + let num_row_groups = encoded_part.metadata().parquet_metadata.num_row_groups(); + let context = Arc::new( + BulkIterContext::new( + metadata.clone(), + None, // No projection + None, // No predicate + false, + ) + .unwrap(), + ); + let row_groups: VecDeque = (0..num_row_groups).collect(); + + let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut group = c.benchmark_group("cache_flat_range_stream"); + group.sample_size(10); + + group.bench_function("baseline_iter_stream", |b| { + b.iter(|| { + rt.block_on(async { + let iter = EncodedBulkPartIter::try_new( + &encoded_part, + context.clone(), + row_groups.clone(), + None, + None, + ) + .unwrap(); + let stream: mito2::read::BoxedRecordBatchStream = + Box::pin(futures::stream::iter(iter)); + let mut stream = stream; + while let Some(_batch) = stream.try_next().await.unwrap() {} + }); + }); + }); + + group.bench_function("cache_flat_range_stream", |b| { + b.iter(|| { + rt.block_on(async { + let iter = EncodedBulkPartIter::try_new( + &encoded_part, + context.clone(), + row_groups.clone(), + None, + None, + ) + .unwrap(); + let stream: mito2::read::BoxedRecordBatchStream = + Box::pin(futures::stream::iter(iter)); + let mut stream = bench_cache_flat_range_stream(stream, 64 * 1024 * 1024, region_id); + while let Some(_batch) = stream.try_next().await.unwrap() {} + }); + }); + }); +} + +criterion_group!(benches, cache_flat_range_stream_bench); +criterion_main!(benches); diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs index df991f6f92..8336625e3c 100644 --- a/src/mito2/benches/memtable_bench.rs +++ b/src/mito2/benches/memtable_bench.rs @@ -12,15 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Benchmarks for memtable operations: writes, full scans, filtered scans, +//! bulk part conversion, record batch iteration with filters, and flat merge. +//! +//! Run with: +//! ```sh +//! cargo bench -p mito2 --features test --bench memtable_bench +//! ``` + use std::sync::Arc; -use api::v1::value::ValueData; -use api::v1::{Row, Rows, SemanticType}; use criterion::{Criterion, criterion_group, criterion_main}; -use datafusion_common::Column; -use datafusion_expr::{Expr, lit}; -use datatypes::data_type::ConcreteDataType; -use datatypes::schema::ColumnSchema; use mito_codec::row_converter::DensePrimaryKeyCodec; use mito2::memtable::bulk::context::BulkIterContext; use mito2::memtable::bulk::part::BulkPartConverter; @@ -28,20 +30,13 @@ use mito2::memtable::bulk::part_reader::BulkPartBatchIter; use mito2::memtable::bulk::{BulkMemtable, BulkMemtableConfig}; use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable}; use mito2::memtable::time_series::TimeSeriesMemtable; -use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions}; +use mito2::memtable::{IterBuilder, Memtable, RangesOptions}; use mito2::read::flat_merge::FlatMergeIterator; use mito2::read::scan_region::PredicateGroup; use mito2::region::options::MergeMode; use mito2::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema}; -use mito2::test_util::memtable_util::{self, region_metadata_to_row_schema}; -use rand::Rng; -use rand::rngs::ThreadRng; -use rand::seq::IndexedRandom; -use store_api::metadata::{ - ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, -}; -use store_api::storage::RegionId; -use table::predicate::Predicate; +use mito2::test_util::bench_util::{CpuDataGenerator, cpu_metadata}; +use mito2::test_util::memtable_util; /// Writes rows. fn write_rows(c: &mut Criterion) { @@ -216,224 +211,6 @@ fn filter_1_host(c: &mut Criterion) { }); } -struct Host { - hostname: String, - region: String, - datacenter: String, - rack: String, - os: String, - arch: String, - team: String, - service: String, - service_version: String, - service_environment: String, -} - -impl Host { - fn random_with_id(id: usize) -> Host { - let mut rng = rand::rng(); - let region = format!("ap-southeast-{}", rng.random_range(0..10)); - let datacenter = format!( - "{}{}", - region, - ['a', 'b', 'c', 'd', 'e'].choose(&mut rng).unwrap() - ); - Host { - hostname: format!("host_{id}"), - region, - datacenter, - rack: rng.random_range(0..100).to_string(), - os: "Ubuntu16.04LTS".to_string(), - arch: "x86".to_string(), - team: "CHI".to_string(), - service: rng.random_range(0..100).to_string(), - service_version: rng.random_range(0..10).to_string(), - service_environment: "test".to_string(), - } - } - - fn fill_values(&self, values: &mut Vec) { - let tags = [ - api::v1::Value { - value_data: Some(ValueData::StringValue(self.hostname.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.region.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.datacenter.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.rack.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.os.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.arch.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.team.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.service.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.service_version.clone())), - }, - api::v1::Value { - value_data: Some(ValueData::StringValue(self.service_environment.clone())), - }, - ]; - for tag in tags { - values.push(tag); - } - } -} - -struct CpuDataGenerator { - metadata: RegionMetadataRef, - column_schemas: Vec, - hosts: Vec, - start_sec: i64, - end_sec: i64, -} - -impl CpuDataGenerator { - fn new(metadata: RegionMetadataRef, num_hosts: usize, start_sec: i64, end_sec: i64) -> Self { - let column_schemas = region_metadata_to_row_schema(&metadata); - Self { - metadata, - column_schemas, - hosts: Self::generate_hosts(num_hosts), - start_sec, - end_sec, - } - } - - fn iter(&self) -> impl Iterator + '_ { - // point per 10s. - (self.start_sec..self.end_sec) - .step_by(10) - .enumerate() - .map(|(seq, ts)| self.build_key_values(seq, ts)) - } - - fn build_key_values(&self, seq: usize, current_sec: i64) -> KeyValues { - let rows = self - .hosts - .iter() - .map(|host| { - let mut rng = rand::rng(); - let mut values = Vec::with_capacity(21); - values.push(api::v1::Value { - value_data: Some(ValueData::TimestampMillisecondValue(current_sec * 1000)), - }); - host.fill_values(&mut values); - for _ in 0..10 { - values.push(api::v1::Value { - value_data: Some(ValueData::F64Value(Self::random_f64(&mut rng))), - }); - } - Row { values } - }) - .collect(); - let mutation = api::v1::Mutation { - op_type: api::v1::OpType::Put as i32, - sequence: seq as u64, - rows: Some(Rows { - schema: self.column_schemas.clone(), - rows, - }), - write_hint: None, - }; - - KeyValues::new(&self.metadata, mutation).unwrap() - } - - fn random_host_filter(&self) -> Predicate { - let host = self.random_hostname(); - let expr = Expr::Column(Column::from_name("hostname")).eq(lit(host)); - Predicate::new(vec![expr]) - } - - fn random_host_filter_exprs(&self) -> Vec { - let host = self.random_hostname(); - vec![Expr::Column(Column::from_name("hostname")).eq(lit(host))] - } - - fn random_hostname(&self) -> String { - let mut rng = rand::rng(); - self.hosts.choose(&mut rng).unwrap().hostname.clone() - } - - fn random_f64(rng: &mut ThreadRng) -> f64 { - let base: u32 = rng.random_range(30..95); - base as f64 - } - - fn generate_hosts(num_hosts: usize) -> Vec { - (0..num_hosts).map(Host::random_with_id).collect() - } -} - -/// Creates a metadata for TSBS cpu-like table. -fn cpu_metadata() -> RegionMetadata { - let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); - builder.push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - semantic_type: SemanticType::Timestamp, - column_id: 0, - }); - let mut column_id = 1; - let tags = [ - "hostname", - "region", - "datacenter", - "rack", - "os", - "arch", - "team", - "service", - "service_version", - "service_environment", - ]; - for tag in tags { - builder.push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new(tag, ConcreteDataType::string_datatype(), true), - semantic_type: SemanticType::Tag, - column_id, - }); - column_id += 1; - } - let fields = [ - "usage_user", - "usage_system", - "usage_idle", - "usage_nice", - "usage_iowait", - "usage_irq", - "usage_softirq", - "usage_steal", - "usage_guest", - "usage_guest_nice", - ]; - for field in fields { - builder.push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new(field, ConcreteDataType::float64_datatype(), true), - semantic_type: SemanticType::Field, - column_id, - }); - column_id += 1; - } - builder.primary_key(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); - builder.build().unwrap() -} - fn bulk_part_converter(c: &mut Criterion) { let metadata = Arc::new(cpu_metadata()); let start_sec = 1710043200; diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index c9a8b99166..35db74eee6 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -350,7 +350,7 @@ impl CacheStrategy { /// Calls [CacheManager::get_range_result()]. /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. - #[cfg_attr(not(test), allow(dead_code))] + #[allow(dead_code)] pub(crate) fn get_range_result( &self, key: &RangeScanCacheKey, @@ -363,7 +363,6 @@ impl CacheStrategy { /// Calls [CacheManager::put_range_result()]. /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. - #[cfg_attr(not(test), allow(dead_code))] pub(crate) fn put_range_result( &self, key: RangeScanCacheKey, @@ -476,7 +475,6 @@ pub struct CacheManager { /// Cache for time series selectors. selector_result_cache: Option, /// Cache for range scan outputs in flat format. - #[cfg_attr(not(test), allow(dead_code))] range_result_cache: Option, /// Cache for index result. index_result_cache: Option, @@ -713,7 +711,7 @@ impl CacheManager { } /// Gets cached result for range scan. - #[cfg_attr(not(test), allow(dead_code))] + #[allow(dead_code)] pub(crate) fn get_range_result( &self, key: &RangeScanCacheKey, @@ -723,8 +721,7 @@ impl CacheManager { .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE)) } - /// Puts range scan result into the cache. - #[cfg_attr(not(test), allow(dead_code))] + /// Puts range scan result into cache. pub(crate) fn put_range_result( &self, key: RangeScanCacheKey, @@ -949,7 +946,7 @@ impl CacheManagerBuilder { Cache::builder() .max_capacity(self.range_result_cache_size) .weigher(range_result_cache_weight) - .eviction_listener(|k, v, cause| { + .eviction_listener(move |k, v, cause| { let size = range_result_cache_weight(&k, &v); CACHE_BYTES .with_label_values(&[RANGE_RESULT_TYPE]) @@ -1361,7 +1358,7 @@ mod tests { } .build(), }; - let value = Arc::new(RangeScanCacheValue::new(Vec::new())); + let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0)); assert!(cache.get_range_result(&key).is_none()); cache.put_range_result(key.clone(), value.clone()); diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 602f5508ba..0eee067ab6 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -116,6 +116,8 @@ pub struct MitoConfig { pub page_cache_size: ReadableSize, /// Cache size for time series selector (e.g. `last_value()`). Setting it to 0 to disable the cache. pub selector_result_cache_size: ReadableSize, + /// Cache size for flat range scan results. Setting it to 0 to disable the cache. + pub range_result_cache_size: ReadableSize, /// Whether to enable the write cache. pub enable_write_cache: bool, /// File system path for write cache dir's root, defaults to `{data_home}`. @@ -200,6 +202,7 @@ impl Default for MitoConfig { vector_cache_size: ReadableSize::mb(512), page_cache_size: ReadableSize::mb(512), selector_result_cache_size: ReadableSize::mb(512), + range_result_cache_size: ReadableSize::mb(512), enable_write_cache: false, write_cache_path: String::new(), write_cache_size: ReadableSize::gb(5), @@ -336,6 +339,7 @@ impl MitoConfig { self.vector_cache_size = mem_cache_size; self.page_cache_size = page_cache_size; self.selector_result_cache_size = mem_cache_size; + self.range_result_cache_size = mem_cache_size; self.index.adjust_buffer_and_cache_size(sys_memory); } diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 71e49776c0..bf345c038e 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -967,7 +967,7 @@ impl EncodedBulkPart { Self { data, metadata } } - pub(crate) fn metadata(&self) -> &BulkPartMeta { + pub fn metadata(&self) -> &BulkPartMeta { &self.metadata } @@ -977,7 +977,7 @@ impl EncodedBulkPart { } /// Returns the encoded data. - pub(crate) fn data(&self) -> &Bytes { + pub fn data(&self) -> &Bytes { &self.data } @@ -1121,10 +1121,7 @@ pub struct BulkPartEncoder { } impl BulkPartEncoder { - pub(crate) fn new( - metadata: RegionMetadataRef, - row_group_size: usize, - ) -> Result { + pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result { // TODO(yingwen): Skip arrow schema if needed. let json = metadata.to_json().context(InvalidMetadataSnafu)?; let key_value_meta = @@ -1216,7 +1213,7 @@ impl BulkPartEncoder { } /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data. - fn encode_part(&self, part: &BulkPart) -> Result> { + pub fn encode_part(&self, part: &BulkPart) -> Result> { if part.batch.num_rows() == 0 { return Ok(None); } diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index 1e9d955321..904aae8c90 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -50,7 +50,7 @@ pub struct EncodedBulkPartIter { impl EncodedBulkPartIter { /// Creates a new [BulkPartIter]. - pub(crate) fn try_new( + pub fn try_new( encoded_part: &EncodedBulkPart, context: BulkIterContextRef, mut row_groups_to_read: VecDeque, diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 240a99c247..84931b9f37 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -27,6 +27,9 @@ pub mod projection; pub(crate) mod prune; pub(crate) mod pruner; pub mod range; +#[cfg(feature = "test")] +pub mod range_cache; +#[cfg(not(feature = "test"))] pub(crate) mod range_cache; pub mod scan_region; pub mod scan_util; diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 5b90e68bae..5fc8931691 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -17,12 +17,23 @@ use std::mem; use std::sync::Arc; +use async_stream::try_stream; +use common_time::range::TimestampRange; +use datatypes::arrow::array::{Array, AsArray, DictionaryArray}; +use datatypes::arrow::datatypes::UInt32Type; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; +use futures::TryStreamExt; +use store_api::region_engine::PartitionRange; use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector}; -use crate::memtable::record_batch_estimated_size; +use crate::cache::CacheStrategy; +use crate::read::BoxedRecordBatchStream; +use crate::read::scan_region::StreamContext; +use crate::read::scan_util::PartitionMetrics; use crate::region::options::MergeMode; +use crate::sst::file::FileTimeRange; +use crate::sst::parquet::flat_format::primary_key_column_index; /// Fingerprint of the scan request fields that affect partition range cache reuse. /// @@ -124,7 +135,6 @@ impl ScanRequestFingerprint { .unwrap_or(&[]) } - #[cfg(test)] pub(crate) fn without_time_filters(&self) -> Self { Self { inner: Arc::clone(&self.inner), @@ -163,7 +173,7 @@ impl ScanRequestFingerprint { #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub(crate) struct RangeScanCacheKey { pub(crate) region_id: RegionId, - /// Sorted (file_id, row_group_index) pairs that uniquely identify the covered data. + /// Sorted (file_id, row_group_index) pairs that uniquely identify the data this range covers. pub(crate) row_groups: Vec<(FileId, i64)>, pub(crate) scan: ScanRequestFingerprint, } @@ -179,30 +189,458 @@ impl RangeScanCacheKey { /// Cached result for one range scan. pub(crate) struct RangeScanCacheValue { pub(crate) batches: Vec, + /// Precomputed size of all batches, accounting for shared dictionary values. + estimated_batches_size: usize, } impl RangeScanCacheValue { - #[cfg_attr(not(test), allow(dead_code))] - pub(crate) fn new(batches: Vec) -> Self { - Self { batches } + pub(crate) fn new(batches: Vec, estimated_batches_size: usize) -> Self { + Self { + batches, + estimated_batches_size, + } } pub(crate) fn estimated_size(&self) -> usize { mem::size_of::() + self.batches.capacity() * mem::size_of::() - + self - .batches - .iter() - .map(record_batch_estimated_size) - .sum::() + + self.estimated_batches_size } } +/// Row groups and whether all sources are file-only for a partition range. +#[allow(dead_code)] +pub(crate) struct PartitionRangeRowGroups { + /// Sorted (file_id, row_group_index) pairs. + pub(crate) row_groups: Vec<(FileId, i64)>, + pub(crate) only_file_sources: bool, +} + +/// Collects (file_id, row_group_index) pairs from a partition range's row group indices. +#[allow(dead_code)] +pub(crate) fn collect_partition_range_row_groups( + stream_ctx: &StreamContext, + part_range: &PartitionRange, +) -> PartitionRangeRowGroups { + let range_meta = &stream_ctx.ranges[part_range.identifier]; + let mut row_groups = Vec::new(); + let mut only_file_sources = true; + + for index in &range_meta.row_group_indices { + if stream_ctx.is_file_range_index(*index) { + let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id(); + row_groups.push((file_id, index.row_group_index)); + } else { + only_file_sources = false; + } + } + + row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1))); + + PartitionRangeRowGroups { + row_groups, + only_file_sources, + } +} + +/// Builds a cache key for the given partition range if it is eligible for caching. +#[allow(dead_code)] +pub(crate) fn build_range_cache_key( + stream_ctx: &StreamContext, + part_range: &PartitionRange, +) -> Option { + let fingerprint = stream_ctx.scan_fingerprint.as_ref()?; + + // Dyn filters can change at runtime, so we can't cache when they're present. + let has_dyn_filters = stream_ctx + .input + .predicate_group() + .predicate_without_region() + .is_some_and(|p| !p.dyn_filters().is_empty()); + if has_dyn_filters { + return None; + } + + let rg = collect_partition_range_row_groups(stream_ctx, part_range); + if !rg.only_file_sources || rg.row_groups.is_empty() { + return None; + } + + let range_meta = &stream_ctx.ranges[part_range.identifier]; + let scan = if query_time_range_covers_partition_range( + stream_ctx.input.time_range.as_ref(), + range_meta.time_range, + ) { + fingerprint.without_time_filters() + } else { + fingerprint.clone() + }; + + Some(RangeScanCacheKey { + region_id: stream_ctx.input.region_metadata().region_id, + row_groups: rg.row_groups, + scan, + }) +} + +#[allow(dead_code)] +fn query_time_range_covers_partition_range( + query_time_range: Option<&TimestampRange>, + partition_time_range: FileTimeRange, +) -> bool { + let Some(query_time_range) = query_time_range else { + return true; + }; + + let (part_start, part_end) = partition_time_range; + query_time_range.contains(&part_start) && query_time_range.contains(&part_end) +} + +/// Returns a stream that replays cached record batches. +#[allow(dead_code)] +pub(crate) fn cached_flat_range_stream(value: Arc) -> BoxedRecordBatchStream { + Box::pin(futures::stream::iter( + value.batches.clone().into_iter().map(Ok), + )) +} + +/// Returns true if two primary key dictionary arrays share the same underlying +/// values buffers by pointer comparison. +/// +/// The primary key column is always `DictionaryArray` with `Binary` values. +fn pk_values_ptr_eq(a: &DictionaryArray, b: &DictionaryArray) -> bool { + let a = a.values().as_binary::(); + let b = b.values().as_binary::(); + let values_eq = a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets()); + match (a.nulls(), b.nulls()) { + (Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()), + (None, None) => values_eq, + _ => false, + } +} + +/// Buffers record batches for caching, tracking memory size while deduplicating +/// shared dictionary values across batches. +/// +/// Uses the primary key column as a proxy to detect dictionary sharing: if the PK +/// column's dictionary values are pointer-equal across batches, we assume all +/// dictionary columns share their values and deduct the total dictionary values size. +struct CacheBatchBuffer { + batches: Vec, + /// Running total of batch memory. + total_size: usize, + /// The first batch's PK dictionary array, for pointer comparison. + /// `None` if no dictionary PK column exists or no batch has been added yet. + first_pk_dict: Option>, + /// Sum of `get_array_memory_size()` of all dictionary value arrays from the first batch. + total_dict_values_size: usize, + /// Whether the PK dictionary is still shared across all batches seen so far. + shared: bool, +} + +impl CacheBatchBuffer { + fn new() -> Self { + Self { + batches: Vec::new(), + total_size: 0, + first_pk_dict: None, + total_dict_values_size: 0, + shared: true, + } + } + + fn push(&mut self, batch: RecordBatch) { + if self.batches.is_empty() { + self.init_first_batch(&batch); + } else { + self.add_subsequent_batch(&batch); + } + self.batches.push(batch); + } + + fn init_first_batch(&mut self, batch: &RecordBatch) { + self.total_size += batch.get_array_memory_size(); + + let pk_col_idx = primary_key_column_index(batch.num_columns()); + let mut total_dict_values_size = 0; + for col_idx in 0..batch.num_columns() { + let col = batch.column(col_idx); + if let Some(dict) = col.as_any().downcast_ref::>() { + total_dict_values_size += dict.values().get_array_memory_size(); + if col_idx == pk_col_idx { + self.first_pk_dict = Some(dict.clone()); + } + } + } + self.total_dict_values_size = total_dict_values_size; + } + + fn add_subsequent_batch(&mut self, batch: &RecordBatch) { + let batch_size = batch.get_array_memory_size(); + + if self.shared + && let Some(first_pk_dict) = &self.first_pk_dict + { + let pk_col_idx = primary_key_column_index(batch.num_columns()); + let col = batch.column(pk_col_idx); + if let Some(dict) = col.as_any().downcast_ref::>() + && pk_values_ptr_eq(first_pk_dict, dict) + { + // PK dict is shared, deduct all dict values sizes. + self.total_size += batch_size - self.total_dict_values_size; + return; + } + // Dictionary diverged. + self.shared = false; + } + + self.total_size += batch_size; + } + + fn estimated_batches_size(&self) -> usize { + self.total_size + } + + fn into_batches(self) -> Vec { + self.batches + } +} + +/// Wraps a stream to cache its output for future range cache hits. +#[allow(dead_code)] +pub(crate) fn cache_flat_range_stream( + mut stream: BoxedRecordBatchStream, + cache_strategy: CacheStrategy, + key: RangeScanCacheKey, + part_metrics: PartitionMetrics, +) -> BoxedRecordBatchStream { + Box::pin(try_stream! { + let mut buffer = CacheBatchBuffer::new(); + while let Some(batch) = stream.try_next().await? { + buffer.push(batch.clone()); + yield batch; + } + + let estimated_size = buffer.estimated_batches_size(); + let batches = buffer.into_batches(); + let value = Arc::new(RangeScanCacheValue::new(batches, estimated_size)); + part_metrics.inc_range_cache_size(key.estimated_size() + value.estimated_size()); + cache_strategy.put_range_result(key, value); + }) +} + +/// Creates a `cache_flat_range_stream` with dummy internals for benchmarking. +/// +/// This avoids exposing `RangeScanCacheKey`, `ScanRequestFingerprint`, and +/// `PartitionMetrics` publicly. +#[cfg(feature = "test")] +pub fn bench_cache_flat_range_stream( + stream: BoxedRecordBatchStream, + cache_size_bytes: u64, + region_id: RegionId, +) -> BoxedRecordBatchStream { + use std::time::Instant; + + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; + + use crate::region::options::MergeMode; + + let cache_manager = Arc::new( + crate::cache::CacheManager::builder() + .range_result_cache_size(cache_size_bytes) + .build(), + ); + let cache_strategy = CacheStrategy::EnableAll(cache_manager); + + let fingerprint = ScanRequestFingerprintBuilder { + read_column_ids: vec![], + read_column_types: vec![], + filters: vec![], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: false, + merge_mode: MergeMode::LastRow, + partition_expr_version: 0, + } + .build(); + + let key = RangeScanCacheKey { + region_id, + row_groups: vec![], + scan: fingerprint, + }; + + let metrics_set = ExecutionPlanMetricsSet::new(); + let part_metrics = + PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set); + + cache_flat_range_stream(stream, cache_strategy, key, part_metrics) +} + #[cfg(test)] mod tests { - use store_api::storage::TimeSeriesRowSelector; + use std::sync::Arc; + use std::time::Instant; + + use common_time::Timestamp; + use common_time::range::TimestampRange; + use common_time::timestamp::TimeUnit; + use datafusion_common::ScalarValue; + use datafusion_expr::{Expr, col, lit}; + use smallvec::smallvec; + use store_api::storage::FileId; use super::*; + use crate::cache::CacheManager; + use crate::read::projection::ProjectionMapper; + use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex}; + use crate::read::scan_region::{PredicateGroup, ScanInput}; + use crate::test_util::memtable_util::metadata_with_primary_key; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::sst_util::sst_file_handle_with_file_id; + + fn test_cache_strategy() -> CacheStrategy { + CacheStrategy::EnableAll(Arc::new( + CacheManager::builder() + .range_result_cache_size(1024) + .build(), + )) + } + + async fn new_stream_context( + filters: Vec, + query_time_range: Option, + partition_time_range: FileTimeRange, + ) -> (StreamContext, PartitionRange) { + let env = SchedulerEnv::new().await; + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(); + let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); + let file_id = FileId::random(); + let file = sst_file_handle_with_file_id( + file_id, + partition_time_range.0.value(), + partition_time_range.1.value(), + ); + let input = ScanInput::new(env.access_layer.clone(), mapper) + .with_predicate(predicate) + .with_time_range(query_time_range) + .with_files(vec![file]) + .with_cache(test_cache_strategy()) + .with_flat_format(true); + let range_meta = RangeMeta { + time_range: partition_time_range, + indices: smallvec![SourceIndex { + index: 0, + num_row_groups: 1, + }], + row_group_indices: smallvec![RowGroupIndex { + index: 0, + row_group_index: 0, + }], + num_rows: 10, + }; + let partition_range = range_meta.new_partition_range(0); + let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input); + let stream_ctx = StreamContext { + input, + ranges: vec![range_meta], + scan_fingerprint, + query_start: Instant::now(), + }; + + (stream_ctx, partition_range) + } + + /// Helper to create a timestamp millisecond literal. + fn ts_lit(val: i64) -> Expr { + lit(ScalarValue::TimestampMillisecond(Some(val), None)) + } + + #[tokio::test] + async fn strips_time_only_filters_when_query_covers_partition_range() { + let (stream_ctx, part_range) = new_stream_context( + vec![ + col("ts").gt_eq(ts_lit(1000)), + col("ts").lt(ts_lit(2001)), + col("ts").is_not_null(), + col("k0").eq(lit("foo")), + ], + TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond), + ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ), + ) + .await; + + let key = build_range_cache_key(&stream_ctx, &part_range).unwrap(); + + // Range-reducible time filters should be cleared when query covers partition range. + assert!(key.scan.time_filters().is_empty()); + // Non-range time predicates stay in filters. + let mut expected_filters = [ + col("k0").eq(lit("foo")).to_string(), + col("ts").is_not_null().to_string(), + ]; + expected_filters.sort_unstable(); + assert_eq!(key.scan.filters(), expected_filters.as_slice()); + } + + #[tokio::test] + async fn preserves_time_filters_when_query_does_not_cover_partition_range() { + let (stream_ctx, part_range) = new_stream_context( + vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))], + TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond), + ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ), + ) + .await; + + let key = build_range_cache_key(&stream_ctx, &part_range).unwrap(); + + // Time filters should be preserved when query does not cover partition range. + assert_eq!( + key.scan.time_filters(), + [col("ts").gt_eq(ts_lit(1000)).to_string()].as_slice() + ); + assert_eq!( + key.scan.filters(), + [col("k0").eq(lit("foo")).to_string()].as_slice() + ); + } + + #[tokio::test] + async fn strips_time_only_filters_when_query_has_no_time_range_limit() { + let (stream_ctx, part_range) = new_stream_context( + vec![ + col("ts").gt_eq(ts_lit(1000)), + col("ts").is_not_null(), + col("k0").eq(lit("foo")), + ], + None, + ( + Timestamp::new_millisecond(1000), + Timestamp::new_millisecond(2000), + ), + ) + .await; + + let key = build_range_cache_key(&stream_ctx, &part_range).unwrap(); + + // Range-reducible time filters should be cleared when query has no time range limit. + assert!(key.scan.time_filters().is_empty()); + // Non-range time predicates stay in filters. + let mut expected_filters = [ + col("k0").eq(lit("foo")).to_string(), + col("ts").is_not_null().to_string(), + ]; + expected_filters.sort_unstable(); + assert_eq!(key.scan.filters(), expected_filters.as_slice()); + } #[test] fn normalizes_and_clears_time_filters() { @@ -249,4 +687,170 @@ mod tests { fingerprint.partition_expr_version ); } + + /// Creates a test schema with 5 columns where the primary key dictionary column + /// is at index 2 (`num_columns - 3`), matching the flat format layout. + /// + /// Layout: `[field0: Int64, field1: Int64, pk: Dictionary, ts: Int64, seq: Int64]` + fn dict_test_schema() -> Arc { + use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + Arc::new(Schema::new(vec![ + Field::new("field0", ArrowDataType::Int64, false), + Field::new("field1", ArrowDataType::Int64, false), + Field::new( + "pk", + ArrowDataType::Dictionary( + Box::new(ArrowDataType::UInt32), + Box::new(ArrowDataType::Binary), + ), + false, + ), + Field::new("ts", ArrowDataType::Int64, false), + Field::new("seq", ArrowDataType::Int64, false), + ])) + } + + /// Helper to create a record batch with a dictionary column at the primary key position. + fn make_dict_batch( + schema: Arc, + dict_values: &datatypes::arrow::array::BinaryArray, + keys: &[u32], + int_values: &[i64], + ) -> RecordBatch { + use datatypes::arrow::array::{Int64Array, UInt32Array}; + + let key_array = UInt32Array::from(keys.to_vec()); + let dict_array: DictionaryArray = + DictionaryArray::new(key_array, Arc::new(dict_values.clone())); + let int_array = Int64Array::from(int_values.to_vec()); + let zeros = Int64Array::from(vec![0i64; int_values.len()]); + RecordBatch::try_new( + schema, + vec![ + Arc::new(zeros.clone()), + Arc::new(int_array), + Arc::new(dict_array), + Arc::new(zeros.clone()), + Arc::new(zeros), + ], + ) + .unwrap() + } + + /// Computes the total `get_array_memory_size()` of all dictionary value arrays in a batch. + fn compute_total_dict_values_size(batch: &RecordBatch) -> usize { + batch + .columns() + .iter() + .filter_map(|col| { + col.as_any() + .downcast_ref::>() + .map(|dict| dict.values().get_array_memory_size()) + }) + .sum() + } + + #[test] + fn cache_batch_buffer_empty() { + let buffer = CacheBatchBuffer::new(); + assert_eq!(buffer.estimated_batches_size(), 0); + assert!(buffer.into_batches().is_empty()); + } + + #[test] + fn cache_batch_buffer_single_batch() { + use datatypes::arrow::array::BinaryArray; + + let schema = dict_test_schema(); + let dict_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]); + let batch = make_dict_batch(schema, &dict_values, &[0, 1, 2], &[10, 20, 30]); + + let full_size = batch.get_array_memory_size(); + + let mut buffer = CacheBatchBuffer::new(); + buffer.push(batch); + assert_eq!(buffer.estimated_batches_size(), full_size); + assert_eq!(buffer.into_batches().len(), 1); + } + + #[test] + fn cache_batch_buffer_shared_dictionary() { + use datatypes::arrow::array::BinaryArray; + + let schema = dict_test_schema(); + let dict_values = BinaryArray::from_vec(vec![b"alpha", b"beta", b"gamma"]); + + // Two batches sharing the same dictionary values array. + let batch1 = make_dict_batch(schema.clone(), &dict_values, &[0, 1], &[10, 20]); + let batch2 = make_dict_batch(schema, &dict_values, &[1, 2], &[30, 40]); + + let batch1_full = batch1.get_array_memory_size(); + let batch2_full = batch2.get_array_memory_size(); + + // The total dictionary values size that should be deduplicated for the second batch. + let dict_values_size = compute_total_dict_values_size(&batch2); + + let mut buffer = CacheBatchBuffer::new(); + buffer.push(batch1); + buffer.push(batch2); + + // Second batch's dict values should not be counted again. + assert_eq!( + buffer.estimated_batches_size(), + batch1_full + batch2_full - dict_values_size + ); + assert_eq!(buffer.into_batches().len(), 2); + } + + #[test] + fn cache_batch_buffer_non_shared_dictionary() { + use datatypes::arrow::array::BinaryArray; + + let schema = dict_test_schema(); + let dict_values1 = BinaryArray::from_vec(vec![b"a", b"b"]); + let dict_values2 = BinaryArray::from_vec(vec![b"x", b"y"]); + + let batch1 = make_dict_batch(schema.clone(), &dict_values1, &[0, 1], &[10, 20]); + let batch2 = make_dict_batch(schema, &dict_values2, &[0, 1], &[30, 40]); + + let batch1_full = batch1.get_array_memory_size(); + let batch2_full = batch2.get_array_memory_size(); + + let mut buffer = CacheBatchBuffer::new(); + buffer.push(batch1); + buffer.push(batch2); + + // Different dictionaries: full size for both. + assert_eq!(buffer.estimated_batches_size(), batch1_full + batch2_full); + } + + #[test] + fn cache_batch_buffer_shared_then_diverged() { + use datatypes::arrow::array::BinaryArray; + + let schema = dict_test_schema(); + let shared_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]); + let different_values = BinaryArray::from_vec(vec![b"x", b"y"]); + + let batch1 = make_dict_batch(schema.clone(), &shared_values, &[0], &[1]); + let batch2 = make_dict_batch(schema.clone(), &shared_values, &[1], &[2]); + let batch3 = make_dict_batch(schema, &different_values, &[0], &[3]); + + let size1 = batch1.get_array_memory_size(); + let size2 = batch2.get_array_memory_size(); + let size3 = batch3.get_array_memory_size(); + + let dict_values_size = compute_total_dict_values_size(&batch2); + + let mut buffer = CacheBatchBuffer::new(); + buffer.push(batch1); + buffer.push(batch2); + buffer.push(batch3); + + // batch2 shares dict with batch1 (dedup), batch3 does not (full size). + assert_eq!( + buffer.estimated_batches_size(), + size1 + (size2 - dict_values_size) + size3 + ); + } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5cb2d75e25..e7cae7e7b8 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -40,7 +40,7 @@ use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector, }; -use table::predicate::{Predicate, build_time_range_predicate}; +use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr}; use tokio::sync::{Semaphore, mpsc}; use tokio_stream::wrappers::ReceiverStream; @@ -1420,7 +1420,6 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { /// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible /// for partition range caching. -#[cfg_attr(not(test), allow(dead_code))] pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option { let eligible = input.flat_format && !input.compaction @@ -1439,7 +1438,14 @@ pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option Option false, }; - if is_time_only { + if is_time_only + && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some() + { + // Range-reducible time predicates can be safely dropped from the + // cache key when the query time range covers the partition range. time_filters.push(expr.to_string()); } else { + // Non-time filters and non-range time predicates (those that + // extract_time_range_from_expr cannot convert to a TimestampRange) + // always stay in the cache key. filters.push(expr.to_string()); } } @@ -1511,6 +1524,10 @@ pub struct StreamContext { pub input: ScanInput, /// Metadata for partition ranges. pub(crate) ranges: Vec, + /// Precomputed scan fingerprint for partition range caching. + /// `None` when the scan is not eligible for caching. + #[allow(dead_code)] + pub(crate) scan_fingerprint: Option, // Metrics: /// The start time of the query. @@ -1523,10 +1540,12 @@ impl StreamContext { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::seq_scan_ranges(&input); READ_SST_COUNT.observe(input.num_files() as f64); + let scan_fingerprint = build_scan_fingerprint(&input); Self { input, ranges, + scan_fingerprint, query_start, } } @@ -1536,10 +1555,12 @@ impl StreamContext { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::unordered_scan_ranges(&input); READ_SST_COUNT.observe(input.num_files() as f64); + let scan_fingerprint = build_scan_fingerprint(&input); Self { input, ranges, + scan_fingerprint, query_start, } } @@ -1849,6 +1870,7 @@ mod tests { use std::sync::Arc; use datafusion::physical_plan::expressions::lit as physical_lit; + use datafusion_common::ScalarValue; use datafusion_expr::{col, lit}; use datatypes::value::Value; use partition::expr::col as partition_col; @@ -2035,13 +2057,18 @@ mod tests { assert!(scan_region.use_flat_format()); } + /// Helper to create a timestamp millisecond literal. + fn ts_lit(val: i64) -> datafusion_expr::Expr { + lit(ScalarValue::TimestampMillisecond(Some(val), None)) + } + #[tokio::test] async fn test_build_scan_fingerprint_for_eligible_scan() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); let input = new_scan_input( metadata.clone(), vec![ - col("ts").gt_eq(lit(1000)), + col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo")), col("v0").gt(lit(1)), ], @@ -2071,7 +2098,7 @@ mod tests { col("k0").eq(lit("foo")).to_string(), col("v0").gt(lit(1)).to_string(), ], - time_filters: vec![col("ts").gt_eq(lit(1000)).to_string()], + time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()], series_row_selector: Some(TimeSeriesRowSelector::LastRow), append_mode: false, filter_deleted: false, diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 0ee6a4437d..6f68616709 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -247,6 +247,12 @@ pub(crate) struct ScanMetricsSet { num_range_builders: isize, /// Peak number of file range builders. num_peak_range_builders: isize, + /// Total bytes added to the range cache during this scan. + range_cache_size: usize, + /// Number of range cache hits during this scan. + range_cache_hit: usize, + /// Number of range cache misses during this scan. + range_cache_miss: usize, } /// Wrapper for file metrics that compares by total cost in reverse order. @@ -345,6 +351,9 @@ impl fmt::Debug for ScanMetricsSet { build_ranges_peak_mem_size, num_range_builders: _, num_peak_range_builders, + range_cache_size, + range_cache_hit, + range_cache_miss, } = self; // Write core metrics @@ -590,6 +599,16 @@ impl fmt::Debug for ScanMetricsSet { write!(f, "}}")?; } + if *range_cache_size > 0 { + write!(f, ", \"range_cache_size\":{range_cache_size}")?; + } + if *range_cache_hit > 0 { + write!(f, ", \"range_cache_hit\":{range_cache_hit}")?; + } + if *range_cache_miss > 0 { + write!(f, ", \"range_cache_miss\":{range_cache_miss}")?; + } + write!( f, ", \"build_ranges_peak_mem_size\":{build_ranges_peak_mem_size}, \ @@ -1097,6 +1116,27 @@ impl PartitionMetrics { pub(crate) fn dedup_metrics_reporter(&self) -> Arc { self.0.clone() } + + /// Increments the total bytes added to the range cache. + #[allow(dead_code)] + pub(crate) fn inc_range_cache_size(&self, size: usize) { + let mut metrics = self.0.metrics.lock().unwrap(); + metrics.range_cache_size += size; + } + + /// Increments the range cache hit counter. + #[allow(dead_code)] + pub(crate) fn inc_range_cache_hit(&self) { + let mut metrics = self.0.metrics.lock().unwrap(); + metrics.range_cache_hit += 1; + } + + /// Increments the range cache miss counter. + #[allow(dead_code)] + pub(crate) fn inc_range_cache_miss(&self) { + let mut metrics = self.0.metrics.lock().unwrap(); + metrics.range_cache_miss += 1; + } } impl fmt::Debug for PartitionMetrics { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 842689bba6..350195bfa9 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -15,6 +15,7 @@ //! Utilities for testing. pub mod batch_util; +pub mod bench_util; pub mod memtable_util; pub mod scheduler_util; pub mod sst_util; diff --git a/src/mito2/src/test_util/bench_util.rs b/src/mito2/src/test_util/bench_util.rs new file mode 100644 index 0000000000..8f182e4157 --- /dev/null +++ b/src/mito2/src/test_util/bench_util.rs @@ -0,0 +1,259 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Shared utilities for mito2 benchmarks. +//! +//! Provides a TSBS cpu-like data generator ([`CpuDataGenerator`]) and schema +//! ([`cpu_metadata`]) used by multiple benchmark binaries in this directory. + +use api::v1::value::ValueData; +use api::v1::{Row, Rows, SemanticType}; +use datafusion_common::Column; +use datafusion_expr::{Expr, lit}; +use datatypes::data_type::ConcreteDataType; +use datatypes::schema::ColumnSchema; +use rand::Rng; +use rand::rngs::ThreadRng; +use rand::seq::IndexedRandom; +use store_api::metadata::{ + ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, +}; +use store_api::storage::RegionId; +use table::predicate::Predicate; + +use crate::memtable::KeyValues; +use crate::test_util::memtable_util::region_metadata_to_row_schema; + +pub struct Host { + pub hostname: String, + pub region: String, + pub datacenter: String, + pub rack: String, + pub os: String, + pub arch: String, + pub team: String, + pub service: String, + pub service_version: String, + pub service_environment: String, +} + +impl Host { + pub fn random_with_id(id: usize) -> Host { + let mut rng = rand::rng(); + let region = format!("ap-southeast-{}", rng.random_range(0..10)); + let datacenter = format!( + "{}{}", + region, + ['a', 'b', 'c', 'd', 'e'].choose(&mut rng).unwrap() + ); + Host { + hostname: format!("host_{id}"), + region, + datacenter, + rack: rng.random_range(0..100).to_string(), + os: "Ubuntu16.04LTS".to_string(), + arch: "x86".to_string(), + team: "CHI".to_string(), + service: rng.random_range(0..100).to_string(), + service_version: rng.random_range(0..10).to_string(), + service_environment: "test".to_string(), + } + } + + pub fn fill_values(&self, values: &mut Vec) { + let tags = [ + api::v1::Value { + value_data: Some(ValueData::StringValue(self.hostname.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.region.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.datacenter.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.rack.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.os.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.arch.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.team.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.service.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.service_version.clone())), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(self.service_environment.clone())), + }, + ]; + for tag in tags { + values.push(tag); + } + } +} + +pub struct CpuDataGenerator { + pub metadata: RegionMetadataRef, + column_schemas: Vec, + hosts: Vec, + start_sec: i64, + end_sec: i64, +} + +impl CpuDataGenerator { + pub fn new( + metadata: RegionMetadataRef, + num_hosts: usize, + start_sec: i64, + end_sec: i64, + ) -> Self { + let column_schemas = region_metadata_to_row_schema(&metadata); + Self { + metadata, + column_schemas, + hosts: Self::generate_hosts(num_hosts), + start_sec, + end_sec, + } + } + + pub fn iter(&self) -> impl Iterator + '_ { + // point per 10s. + (self.start_sec..self.end_sec) + .step_by(10) + .enumerate() + .map(|(seq, ts)| self.build_key_values(seq, ts)) + } + + pub fn build_key_values(&self, seq: usize, current_sec: i64) -> KeyValues { + let rows = self + .hosts + .iter() + .map(|host| { + let mut rng = rand::rng(); + let mut values = Vec::with_capacity(21); + values.push(api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(current_sec * 1000)), + }); + host.fill_values(&mut values); + for _ in 0..10 { + values.push(api::v1::Value { + value_data: Some(ValueData::F64Value(Self::random_f64(&mut rng))), + }); + } + Row { values } + }) + .collect(); + let mutation = api::v1::Mutation { + op_type: api::v1::OpType::Put as i32, + sequence: seq as u64, + rows: Some(Rows { + schema: self.column_schemas.clone(), + rows, + }), + write_hint: None, + }; + + KeyValues::new(&self.metadata, mutation).unwrap() + } + + pub fn random_host_filter(&self) -> Predicate { + let host = self.random_hostname(); + let expr = Expr::Column(Column::from_name("hostname")).eq(lit(host)); + Predicate::new(vec![expr]) + } + + pub fn random_host_filter_exprs(&self) -> Vec { + let host = self.random_hostname(); + vec![Expr::Column(Column::from_name("hostname")).eq(lit(host))] + } + + pub fn random_hostname(&self) -> String { + let mut rng = rand::rng(); + self.hosts.choose(&mut rng).unwrap().hostname.clone() + } + + pub fn random_f64(rng: &mut ThreadRng) -> f64 { + let base: u32 = rng.random_range(30..95); + base as f64 + } + + pub fn generate_hosts(num_hosts: usize) -> Vec { + (0..num_hosts).map(Host::random_with_id).collect() + } +} + +/// Creates a metadata for TSBS cpu-like table. +pub fn cpu_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1)); + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }); + let mut column_id = 1; + let tags = [ + "hostname", + "region", + "datacenter", + "rack", + "os", + "arch", + "team", + "service", + "service_version", + "service_environment", + ]; + for tag in tags { + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new(tag, ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id, + }); + column_id += 1; + } + let fields = [ + "usage_user", + "usage_system", + "usage_idle", + "usage_nice", + "usage_iowait", + "usage_irq", + "usage_softirq", + "usage_steal", + "usage_guest", + "usage_guest_nice", + ]; + for field in fields { + builder.push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new(field, ConcreteDataType::float64_datatype(), true), + semantic_type: SemanticType::Field, + column_id, + }); + column_id += 1; + } + builder.primary_key(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + builder.build().unwrap() +} diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 8917875250..25ab9bb8b4 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -30,7 +30,7 @@ use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortFi use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; -use store_api::storage::{ColumnId, RegionId, SequenceNumber, SequenceRange}; +use store_api::storage::{ColumnId, RegionId, SequenceNumber}; use crate::error::Result; use crate::memtable::bulk::part::BulkPart; diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 71896b3d5d..fd5ad82f3f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -207,6 +207,7 @@ impl WorkerGroup { .vector_cache_size(config.vector_cache_size.as_bytes()) .page_cache_size(config.page_cache_size.as_bytes()) .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) + .range_result_cache_size(config.range_result_cache_size.as_bytes()) .index_metadata_size(config.index.metadata_cache_size.as_bytes()) .index_content_size(config.index.content_cache_size.as_bytes()) .index_content_page_size(config.index.content_cache_page_size.as_bytes()) @@ -421,6 +422,7 @@ impl WorkerGroup { .vector_cache_size(config.vector_cache_size.as_bytes()) .page_cache_size(config.page_cache_size.as_bytes()) .selector_result_cache_size(config.selector_result_cache_size.as_bytes()) + .range_result_cache_size(config.range_result_cache_size.as_bytes()) .write_cache(write_cache) .build(), ); diff --git a/src/table/src/predicate.rs b/src/table/src/predicate.rs index f9be7be16e..2c9ac41560 100644 --- a/src/table/src/predicate.rs +++ b/src/table/src/predicate.rs @@ -203,7 +203,7 @@ pub fn build_time_range_predicate( /// Extract time range filter from `WHERE`/`IN (...)`/`BETWEEN` clauses. /// Return None if no time range can be found in expr. -fn extract_time_range_from_expr( +pub fn extract_time_range_from_expr( ts_col_name: &str, ts_col_unit: TimeUnit, expr: &Expr, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 65e56fa15e..7ae59ae9fc 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1642,6 +1642,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "metadata_cache_size =", "content_cache_size =", "result_cache_size =", + "range_result_cache_size =", "name =", "recovery_parallelism =", "max_background_index_builds =",