diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs
index c2c14b4680..9c8e8d6ce8 100644
--- a/src/common/datasource/src/file_format/parquet.rs
+++ b/src/common/datasource/src/file_format/parquet.rs
@@ -23,7 +23,9 @@ use datafusion::error::Result as DatafusionResult;
 use datafusion::parquet::arrow::async_reader::AsyncFileReader;
 use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema};
 use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
-use datafusion::parquet::file::metadata::ParquetMetaData;
+use datafusion::parquet::file::metadata::{
+    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader,
+};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_datasource::PartitionedFile;
@@ -94,35 +96,40 @@ impl DefaultParquetFileReaderFactory {
 }

 impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
-    // TODO(weny): Supports [`metadata_size_hint`].
-    // The upstream has a implementation supports [`metadata_size_hint`],
-    // however it coupled with Box<...>.
     fn create_reader(
         &self,
         _partition_index: usize,
         partitioned_file: PartitionedFile,
-        _metadata_size_hint: Option<usize>,
+        metadata_size_hint: Option<usize>,
         _metrics: &ExecutionPlanMetricsSet,
     ) -> DatafusionResult<Box<dyn AsyncFileReader + Send>> {
         let path = partitioned_file.path().to_string();
         let object_store = self.object_store.clone();

-        Ok(Box::new(LazyParquetFileReader::new(object_store, path)))
+        Ok(Box::new(LazyParquetFileReader::new(
+            object_store,
+            path,
+            metadata_size_hint,
+        )))
     }
 }

 pub struct LazyParquetFileReader {
     object_store: ObjectStore,
     reader: Option<Compat<FuturesAsyncReader>>,
+    file_size: Option<u64>,
+    metadata_size_hint: Option<usize>,
     path: String,
 }

 impl LazyParquetFileReader {
-    pub fn new(object_store: ObjectStore, path: String) -> Self {
+    pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option<usize>) -> Self {
         LazyParquetFileReader {
             object_store,
             path,
             reader: None,
+            file_size: None,
+            metadata_size_hint,
         }
     }

@@ -130,6 +137,7 @@ impl LazyParquetFileReader {
     async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> {
         if self.reader.is_none() {
             let meta = self.object_store.stat(&self.path).await?;
+            self.file_size = Some(meta.content_length());
             let reader = self
                 .object_store
                 .reader(&self.path)
@@ -166,8 +174,19 @@ impl AsyncFileReader for LazyParquetFileReader {
             self.maybe_initialize()
                 .await
                 .map_err(|e| ParquetError::External(Box::new(e)))?;
-            // Safety: Must initialized
-            self.reader.as_mut().unwrap().get_metadata(options).await
+
+            let metadata_opts = options.map(|o| o.metadata_options().clone());
+            let metadata_reader = ParquetMetaDataReader::new()
+                .with_metadata_options(metadata_opts)
+                .with_page_index_policy(PageIndexPolicy::from(
+                    options.is_some_and(|o| o.page_index()),
+                ))
+                .with_prefetch_hint(self.metadata_size_hint);
+
+            let metadata = metadata_reader
+                .load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap())
+                .await?;
+            Ok(Arc::new(metadata))
         })
     }
 }
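Note on the new metadata path: instead of delegating to the inner reader's `get_metadata`, the change drives parquet-rs's `ParquetMetaDataReader` directly, which is what makes `metadata_size_hint` usable. Without a hint, the reader first fetches the 8-byte footer tail to learn the metadata length and then issues a second fetch for the metadata itself; a large-enough prefetch hint collapses that into a single suffix read, which matters when every fetch is an object-store round trip. A minimal sketch of the same call pattern against a local file (the file name, the 64 KiB hint, and the tokio setup are illustrative assumptions, and the parquet crate's `async` feature is required):

```rust
use parquet::file::metadata::ParquetMetaDataReader;
use tokio::fs::File;

#[tokio::main]
async fn main() -> parquet::errors::Result<()> {
    let mut file = File::open("example.parquet").await?;
    let file_size = file.metadata().await?.len();

    let metadata = ParquetMetaDataReader::new()
        // Trades a slightly larger suffix read for skipping the second
        // round trip when the footer fits inside the hinted range.
        .with_prefetch_hint(Some(64 * 1024))
        // Tokio's `File` satisfies the async fetch traits through parquet's
        // blanket impls, mirroring how the PR passes its lazy reader.
        .load_and_finish(&mut file, file_size)
        .await?;

    println!("row groups: {}", metadata.num_row_groups());
    Ok(())
}
```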
diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs
index 278838b369..9b987c810b 100644
--- a/src/mito2/src/cache/file_cache.rs
+++ b/src/mito2/src/cache/file_cache.rs
@@ -288,6 +288,17 @@ pub(crate) struct FileCache {
 pub(crate) type FileCacheRef = Arc<FileCache>;

 impl FileCache {
+    /// Splits the configured total capacity between parquet and puffin caches
+    /// without exceeding the requested overall budget.
+    fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
+        let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
+        let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
+        let puffin_capacity =
+            desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
+        let parquet_capacity = total_capacity - puffin_capacity;
+        (parquet_capacity, puffin_capacity)
+    }
+
     /// Creates a new file cache.
     pub(crate) fn new(
         local_store: ObjectStore,
@@ -302,14 +313,8 @@ impl FileCache {
             .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
         let total_capacity = capacity.as_bytes();

-        // Convert percent to ratio and calculate capacity for each cache
-        let index_ratio = index_percent as f64 / 100.0;
-        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
-        let parquet_capacity = total_capacity - puffin_capacity;
-
-        // Ensure both capacities are at least 512MB
-        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
-        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
+        let (parquet_capacity, puffin_capacity) =
+            Self::split_cache_capacities(total_capacity, index_percent);

         info!(
             "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
@@ -1064,6 +1069,28 @@ mod tests {
         assert_eq!(data, bytes[3].as_ref());
     }

+    #[test]
+    fn test_file_cache_capacity_respects_total_budget() {
+        let total_capacity = ReadableSize::mb(256).as_bytes();
+        let (parquet_capacity, puffin_capacity) =
+            FileCache::split_cache_capacities(total_capacity, 20);
+
+        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
+        assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity);
+        assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity);
+    }
+
+    #[test]
+    fn test_file_cache_capacity_keeps_split_when_total_allows_it() {
+        let total_capacity = ReadableSize::gb(5).as_bytes();
+        let (parquet_capacity, puffin_capacity) =
+            FileCache::split_cache_capacities(total_capacity, 20);
+
+        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
+        assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity);
+        assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity);
+    }
+
     #[test]
     fn test_cache_file_path() {
         let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
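Worked numbers for the new split (a standalone restatement, assuming `MIN_CACHE_CAPACITY` is 512 MiB per the removed inline comment): lowering the floor to `total / 2` for small totals guarantees `clamp`'s lower bound never exceeds its upper bound, so the call cannot panic, and the two capacities always sum to the requested budget, whereas the old `max`-based logic could allocate a full 1 GiB for a 256 MiB request:

```rust
// Illustrative restatement of the helper, not the PR's exact source.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024; // assumed 512 MiB

fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
    let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
    // total / 2 <= total - total / 2, so clamp's min never exceeds its max.
    let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
    let puffin_capacity =
        desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
    (total_capacity - puffin_capacity, puffin_capacity)
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    // 5 GiB at 20%: the floor does not bind, so the split is exactly 80/20.
    assert_eq!(
        split_cache_capacities(5 * 1024 * MIB, 20),
        (4 * 1024 * MIB, 1024 * MIB)
    );
    // 256 MiB at 20%: the desired ~51 MiB puffin share is clamped up to
    // min(512 MiB, 128 MiB) = 128 MiB, and parquet gets the rest. The old
    // `max(MIN_CACHE_CAPACITY)` logic would have produced 512 MiB + 512 MiB.
    assert_eq!(split_cache_capacities(256 * MIB, 20), (128 * MIB, 128 * MIB));
}
```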
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
index 4dad4fb885..d4d9714390 100644
--- a/src/mito2/src/memtable/bulk.rs
+++ b/src/mito2/src/memtable/bulk.rs
@@ -137,7 +137,7 @@ struct CollectedParts {
 /// All parts in a bulk memtable.
 #[derive(Default)]
 struct BulkParts {
-    /// Unordered small parts (< 1024 rows).
+    /// Unordered small parts.
     unordered_part: UnorderedPart,
     /// All parts (raw and encoded).
     parts: Vec<BulkPart>,
diff --git a/src/mito2/src/memtable/partition_tree/data.rs b/src/mito2/src/memtable/partition_tree/data.rs
index a6d40bdcbf..f6e2a59bec 100644
--- a/src/mito2/src/memtable/partition_tree/data.rs
+++ b/src/mito2/src/memtable/partition_tree/data.rs
@@ -50,6 +50,7 @@ use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource};
 use crate::metrics::{
     PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
 };
+use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

 const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

@@ -821,7 +822,11 @@ impl DataPart {
     /// Reads frozen data part and yields [DataBatch]es.
     pub fn read(&self) -> Result<DataPartReader> {
         match self {
-            DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
+            // Keep encoded memtable scans aligned with mito/DataFusion batch sizing instead of
+            // parquet-rs's implicit 1024-row default.
+            DataPart::Parquet(data_bytes) => {
+                DataPartReader::new(data_bytes.data.clone(), Some(DEFAULT_READ_BATCH_SIZE))
+            }
         }
     }

diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 26bed76fd6..e1c75a10c5 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -39,9 +39,17 @@ pub mod writer;
 pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";

 /// Default batch size to read parquet files.
-pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
+///
+/// This is a runtime-only scan granularity, so we align it with DataFusion's
+/// default execution batch size to reduce rebatching and concatenation in the
+/// query pipeline.
+pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024;

 /// Default row group size for parquet files.
-pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
+///
+/// Keep the existing persisted/on-disk default stable. It intentionally stays
+/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan
+/// batching without changing the row group layout of newly written SSTs.
+pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024;

 /// Parquet write options.
 #[derive(Debug, Clone)]
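Background for the memtable and constant changes: parquet-rs emits record batches of up to 1024 rows unless the reader is told otherwise, so encoded memtable parts previously produced 1024-row batches regardless of the engine's configuration. The knob being pinned looks like this in isolation (file name and tokio setup are illustrative; GreptimeDB's `DataPartReader` applies the same setting to its reader over in-memory bytes):

```rust
// Sketch of the parquet-rs batch-size knob; assumes the crate's `async`
// feature and a hypothetical local file.
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> parquet::errors::Result<()> {
    let file = File::open("part.parquet").await?;
    // Without `with_batch_size`, batches are capped at parquet-rs's
    // built-in 1024-row default; pinning it aligns scan output with the
    // engine's DEFAULT_READ_BATCH_SIZE (now 8 * 1024).
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(8 * 1024)
        .build()?;

    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} batches", batches.len());
    Ok(())
}
```

Note the deliberate decoupling in `sst/parquet.rs`: `DEFAULT_ROW_GROUP_SIZE` keeps its old numeric value (100 * 1024 = 102,400 rows, identical to the previous 100 * DEFAULT_READ_BATCH_SIZE), so newly written SSTs keep the same layout; only the in-memory scan granularity changes.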
diff --git a/src/promql/src/extension_plan/absent.rs b/src/promql/src/extension_plan/absent.rs
index db31a3d901..7787d7cce6 100644
--- a/src/promql/src/extension_plan/absent.rs
+++ b/src/promql/src/extension_plan/absent.rs
@@ -49,9 +49,6 @@ use snafu::ResultExt;
 use crate::error::DeserializeSnafu;
 use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};

-/// Maximum number of rows per output batch
-const ABSENT_BATCH_SIZE: usize = 8192;
-
 #[derive(Debug, PartialEq, Eq, Hash)]
 pub struct Absent {
     start: Millisecond,
@@ -390,11 +387,13 @@ impl ExecutionPlan for AbsentExec {
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let batch_size = context.session_config().batch_size();
         let input = self.input.execute(partition, context)?;

         Ok(Box::pin(AbsentStream {
             end: self.end,
             step: self.step,
+            batch_size,
             time_index_column_index: self
                 .input
                 .schema()
@@ -441,6 +440,7 @@ impl DisplayAs for AbsentExec {
 pub struct AbsentStream {
     end: Millisecond,
     step: Millisecond,
+    batch_size: usize,
     time_index_column_index: usize,
     output_schema: SchemaRef,
     fake_labels: Vec<(String, String)>,
@@ -474,7 +474,7 @@ impl Stream for AbsentStream {
                     self.metric.elapsed_compute().add_elapsed(timer);

                     // If we have enough data for a batch, output it
-                    if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
+                    if self.output_timestamps.len() >= self.batch_size {
                         let timer = std::time::Instant::now();
                         let result = self.flush_output_batch();
                         self.metric.elapsed_compute().add_elapsed(timer);
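With the hard-coded `ABSENT_BATCH_SIZE` gone, the operator sizes its output from the session's `datafusion.execution.batch_size` (default 8192), read off the `TaskContext` handed to `execute`. A quick illustration of where that value travels:

```rust
use datafusion::execution::config::SessionConfig;
use datafusion::prelude::SessionContext;

fn main() {
    // The batch size defaults to 8192; override it when building the session.
    let ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(1024));

    // Every `ExecutionPlan::execute` call receives this TaskContext, so an
    // operator can read the configured value back, as AbsentExec now does.
    let task_ctx = ctx.task_ctx();
    assert_eq!(task_ctx.session_config().batch_size(), 1024);
}
```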
diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs
index e863aaced0..83b47bc1b1 100644
--- a/src/query/src/range_select/plan.rs
+++ b/src/query/src/range_select/plan.rs
@@ -836,6 +836,7 @@ impl ExecutionPlan for RangeSelectExec {
         context: Arc<TaskContext>,
     ) -> DfResult<SendableRecordBatchStream> {
         let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let batch_size = context.session_config().batch_size();
         let input = self.input.execute(partition, context)?;
         let schema = input.schema();
         let time_index = schema
@@ -852,6 +853,7 @@ impl ExecutionPlan for RangeSelectExec {
                 .collect(),
         )?;
         Ok(Box::pin(RangeSelectStream {
+            batch_size,
             schema: self.schema.clone(),
             range_exec: self.range_exec.clone(),
             input,
@@ -868,6 +870,8 @@ impl ExecutionPlan for RangeSelectExec {
             metric: baseline_metric,
             schema_project: self.schema_project.clone(),
             schema_before_project: self.schema_before_project.clone(),
+            output_batch: None,
+            output_batch_offset: 0,
         }))
     }

@@ -881,6 +885,7 @@ impl ExecutionPlan for RangeSelectExec {
 }

 struct RangeSelectStream {
+    batch_size: usize,
     /// the schema of output column
     schema: SchemaRef,
     range_exec: Vec<RangeFnExec>,
@@ -907,6 +912,8 @@ struct RangeSelectStream {
     metric: BaselineMetrics,
     schema_project: Option<Vec<usize>>,
     schema_before_project: SchemaRef,
+    output_batch: Option<RecordBatch>,
+    output_batch_offset: usize,
 }

 #[derive(Debug)]
@@ -1149,6 +1156,35 @@ impl RangeSelectStream {
         };
         Ok(project_output)
     }
+
+    fn next_output_batch(&mut self) -> DfResult<Option<RecordBatch>> {
+        if self.output_batch.is_none() {
+            self.output_batch = Some(self.generate_output()?);
+            self.output_batch_offset = 0;
+        }
+
+        let num_rows = self.output_batch.as_ref().unwrap().num_rows();
+        if num_rows == 0 {
+            self.output_batch_offset = 0;
+            return Ok(self.output_batch.take());
+        }
+
+        if self.output_batch_offset == 0 && num_rows <= self.batch_size {
+            return Ok(self.output_batch.take());
+        }
+
+        let offset = self.output_batch_offset;
+        let len = (num_rows - offset).min(self.batch_size);
+        let batch = self.output_batch.as_ref().unwrap().slice(offset, len);
+        self.output_batch_offset += len;
+
+        if self.output_batch_offset >= num_rows {
+            self.output_batch = None;
+            self.output_batch_offset = 0;
+        }
+
+        Ok(Some(batch))
+    }
 }

 enum ExecutionState {
@@ -1191,13 +1227,16 @@ impl Stream for RangeSelectStream {
                     }
                 }
                 ExecutionState::ProducingOutput => {
-                    let result = self.generate_output();
+                    let result = self.next_output_batch();
                     return match result {
                         // made output
-                        Ok(batch) => {
-                            self.exec_state = ExecutionState::Done;
+                        Ok(Some(batch)) => {
+                            if self.output_batch.is_none() {
+                                self.exec_state = ExecutionState::Done;
+                            }
                             Poll::Ready(Some(Ok(batch)))
                         }
+                        Ok(None) => Poll::Ready(None),
                         // error making output
                         Err(error) => Poll::Ready(Some(Err(error))),
                     };
@@ -1313,15 +1352,15 @@ mod test {
         ))
     }

-    async fn do_range_select_test(
+    async fn collect_range_select_test(
         range1: Millisecond,
         range2: Millisecond,
         align: Millisecond,
         fill: Option<Fill>,
         is_float: bool,
         is_gap: bool,
-        expected: String,
-    ) {
+        batch_size: usize,
+    ) -> Vec<RecordBatch> {
         let data_type = if is_float {
             DataType::Float64
         } else {
@@ -1412,11 +1451,25 @@ mod test {
             .into(),
             range_select_exec,
         );
-        let session_context = SessionContext::default();
+        let session_context = SessionContext::new_with_config(
+            datafusion::execution::config::SessionConfig::new().with_batch_size(batch_size),
+        );
+        datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
+            .await
+            .unwrap()
+    }
+
+    async fn do_range_select_test(
+        range1: Millisecond,
+        range2: Millisecond,
+        align: Millisecond,
+        fill: Option<Fill>,
+        is_float: bool,
+        is_gap: bool,
+        expected: String,
+    ) {
         let result =
-            datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx())
-                .await
-                .unwrap();
+            collect_range_select_test(range1, range2, align, fill, is_float, is_gap, 8192).await;
         let result_literal = arrow::util::pretty::pretty_format_batches(&result)
             .unwrap()
             .to_string();
@@ -1700,6 +1753,18 @@ mod test {
         .await;
     }

+    #[tokio::test]
+    async fn range_select_respects_session_batch_size() {
+        let result =
+            collect_range_select_test(10_000, 5_000, 5_000, Some(Fill::Null), true, false, 3).await;
+
+        let row_counts = result
+            .iter()
+            .map(|batch| batch.num_rows())
+            .collect::<Vec<_>>();
+        assert_eq!(vec![3, 3, 3, 3], row_counts);
+    }
+
     #[test]
     fn fill_test() {
         assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());
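The `next_output_batch` state machine above doles out one aggregated batch across multiple polls using zero-copy slices, which is what the new test's `[3, 3, 3, 3]` row counts exercise. The slicing pattern in isolation (helper name and data are illustrative, not from this PR):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

/// Splits `batch` into chunks of at most `batch_size` rows. `slice` shares
/// the underlying buffers, so no row data is copied.
fn rebatch(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = (batch.num_rows() - offset).min(batch_size);
        chunks.push(batch.slice(offset, len));
        offset += len;
    }
    chunks
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from_iter_values(0..10i64));
    let batch = RecordBatch::try_new(schema, vec![column]).unwrap();

    let sizes: Vec<_> = rebatch(&batch, 3).iter().map(|b| b.num_rows()).collect();
    // A final short chunk is expected when the row count is not a multiple
    // of the batch size.
    assert_eq!(sizes, vec![3, 3, 3, 1]);
}
```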