diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 3a95b79c61..302e0d3902 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -978,10 +978,6 @@ async fn preload_parquet_meta_cache_for_files( } let file_size = file_handle.meta_ref().file_size; - if file_size == 0 { - continue; - } - let file_path = file_handle.file_path(&table_dir, path_type); let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); match loader.load(&mut cache_metrics).await { @@ -1180,4 +1176,77 @@ mod tests { .is_some() ); } + + #[tokio::test] + async fn test_preload_parquet_meta_cache_with_unknown_file_size() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(1024 * 1024) + .build(), + ); + + let region_id = RegionId::new(1, 1); + let file_id = FileId::random(); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + // file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat + // the object store to retrieve it. + let file_meta = FileMeta { + region_id, + file_id, + time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)), + level: 0, + file_size: 0, + max_row_group_uncompressed_size: 0, + available_indexes: Default::default(), + indexes: vec![], + index_file_size: 0, + index_version: 0, + num_rows: 3, + num_row_groups: 1, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger)); + + let table_dir = "test_table"; + let path_type = PathType::Bare; + let remote_path = file_handle.file_path(table_dir, path_type); + object_store + .write(&remote_path, parquet_bytes) + .await + .unwrap(); + + let region_file_id = file_handle.file_id(); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_none() + ); + + let loaded = preload_parquet_meta_cache_for_files( + region_id, + table_dir.to_string(), + path_type, + object_store, + cache_manager.clone(), + vec![file_handle], + ) + .await; + + assert_eq!(loaded, 1); + assert!( + cache_manager + .get_parquet_meta_data_from_mem_cache(region_file_id) + .is_some() + ); + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 77bf128e32..794007c1a4 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1150,30 +1150,32 @@ impl ParquetReaderBuilder { }; let file_id = self.file_handle.file_id().file_id(); - let cached_minmax_key = if predicate.dyn_filters().is_empty() { - // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly - // building row-group pruning stats for identical predicates across queries. - let mut exprs = predicate - .exprs() - .iter() - .map(|expr| format!("{expr:?}")) - .collect::>(); - exprs.sort(); - let schema_version = self - .expected_metadata - .as_ref() - .map(|meta| meta.schema_version) - .unwrap_or_else(|| read_format.metadata().schema_version); - Some(PredicateKey::new_minmax( - Arc::new(exprs), - schema_version, - skip_fields, - )) - } else { - None - }; + let index_result_cache = self.cache_strategy.index_result_cache(); + let cached_minmax_key = + if index_result_cache.is_some() && predicate.dyn_filters().is_empty() { + // Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly + // building row-group pruning stats for identical predicates across queries. + let mut exprs = predicate + .exprs() + .iter() + .map(|expr| format!("{expr:?}")) + .collect::>(); + exprs.sort(); + let schema_version = self + .expected_metadata + .as_ref() + .map(|meta| meta.schema_version) + .unwrap_or_else(|| read_format.metadata().schema_version); + Some(PredicateKey::new_minmax( + Arc::new(exprs), + schema_version, + skip_fields, + )) + } else { + None + }; - if let Some(index_result_cache) = self.cache_strategy.index_result_cache() + if let Some(index_result_cache) = index_result_cache && let Some(predicate_key) = cached_minmax_key.as_ref() && let Some(result) = index_result_cache.get(predicate_key, file_id) { @@ -1212,7 +1214,7 @@ impl ParquetReaderBuilder { .num_row_groups() .saturating_sub(output.row_group_count()); - if let Some(index_result_cache) = self.cache_strategy.index_result_cache() + if let Some(index_result_cache) = index_result_cache && let Some(predicate_key) = cached_minmax_key { index_result_cache.put(predicate_key, file_id, Arc::new(output.clone())); @@ -2157,3 +2159,112 @@ impl FlatRowGroupReader { } } } + +#[cfg(test)] +mod tests { + use std::any::Any; + use std::fmt::{Debug, Formatter}; + use std::sync::{Arc, LazyLock}; + + use datafusion::arrow::datatypes::DataType; + use datafusion_common::ScalarValue; + use datafusion_expr::expr::ScalarFunction; + use datafusion_expr::{ + ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + }; + use datatypes::arrow::array::{ArrayRef, Int64Array}; + use datatypes::arrow::record_batch::RecordBatch; + use object_store::services::Memory; + use parquet::arrow::ArrowWriter; + use store_api::region_request::PathType; + use table::predicate::Predicate; + + use super::*; + use crate::sst::parquet::metadata::MetadataLoader; + use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata}; + + #[tokio::test(flavor = "current_thread")] + async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() { + #[derive(Eq, PartialEq, Hash)] + struct PanicDebugUdf; + + impl Debug for PanicDebugUdf { + fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result { + panic!("minmax predicate key should not format exprs when cache is disabled"); + } + } + + impl ScalarUDFImpl for PanicDebugUdf { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "panic_debug_udf" + } + + fn signature(&self) -> &Signature { + static SIGNATURE: LazyLock = + LazyLock::new(|| Signature::variadic_any(Volatility::Immutable)); + &SIGNATURE + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result { + Ok(DataType::Int64) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1)))) + } + } + + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_handle = sst_file_handle(0, 1); + let table_dir = "test_table".to_string(); + let path_type = PathType::Bare; + let file_path = file_handle.file_path(&table_dir, path_type); + + let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_from_iter([("col", col)]).unwrap(); + let mut parquet_bytes = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + let file_size = parquet_bytes.len() as u64; + object_store.write(&file_path, parquet_bytes).await.unwrap(); + + let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata()); + let read_format = + ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap(); + + let mut cache_metrics = MetadataCacheMetrics::default(); + let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size); + let parquet_meta = loader.load(&mut cache_metrics).await.unwrap(); + + let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf)); + let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf( + udf, + vec![], + ))]); + let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store) + .predicate(Some(predicate)) + .cache(CacheStrategy::Disabled); + + let row_group_size = parquet_meta.row_group(0).num_rows() as usize; + let total_row_count = parquet_meta.file_metadata().num_rows() as usize; + let mut metrics = ReaderFilterMetrics::default(); + let selection = builder.row_groups_by_minmax( + &read_format, + &parquet_meta, + row_group_size, + total_row_count, + &mut metrics, + false, + ); + + assert!(!selection.is_empty()); + } +}