mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 11:20:38 +00:00
enhance cache file enter logic
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -978,10 +978,6 @@ async fn preload_parquet_meta_cache_for_files(
|
||||
}
|
||||
|
||||
let file_size = file_handle.meta_ref().file_size;
|
||||
if file_size == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_path = file_handle.file_path(&table_dir, path_type);
|
||||
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
|
||||
match loader.load(&mut cache_metrics).await {
|
||||
@@ -1180,4 +1176,77 @@ mod tests {
|
||||
.is_some()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_preload_parquet_meta_cache_with_unknown_file_size() {
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let cache_manager = Arc::new(
|
||||
CacheManager::builder()
|
||||
.sst_meta_cache_size(1024 * 1024)
|
||||
.build(),
|
||||
);
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let file_id = FileId::random();
|
||||
|
||||
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
|
||||
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
|
||||
let mut parquet_bytes = Vec::new();
|
||||
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.close().unwrap();
|
||||
|
||||
// file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
|
||||
// the object store to retrieve it.
|
||||
let file_meta = FileMeta {
|
||||
region_id,
|
||||
file_id,
|
||||
time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
|
||||
level: 0,
|
||||
file_size: 0,
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: Default::default(),
|
||||
indexes: vec![],
|
||||
index_file_size: 0,
|
||||
index_version: 0,
|
||||
num_rows: 3,
|
||||
num_row_groups: 1,
|
||||
sequence: None,
|
||||
partition_expr: None,
|
||||
num_series: 0,
|
||||
};
|
||||
let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
|
||||
|
||||
let table_dir = "test_table";
|
||||
let path_type = PathType::Bare;
|
||||
let remote_path = file_handle.file_path(table_dir, path_type);
|
||||
object_store
|
||||
.write(&remote_path, parquet_bytes)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let region_file_id = file_handle.file_id();
|
||||
assert!(
|
||||
cache_manager
|
||||
.get_parquet_meta_data_from_mem_cache(region_file_id)
|
||||
.is_none()
|
||||
);
|
||||
|
||||
let loaded = preload_parquet_meta_cache_for_files(
|
||||
region_id,
|
||||
table_dir.to_string(),
|
||||
path_type,
|
||||
object_store,
|
||||
cache_manager.clone(),
|
||||
vec![file_handle],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(loaded, 1);
|
||||
assert!(
|
||||
cache_manager
|
||||
.get_parquet_meta_data_from_mem_cache(region_file_id)
|
||||
.is_some()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1150,30 +1150,32 @@ impl ParquetReaderBuilder {
|
||||
};
|
||||
|
||||
let file_id = self.file_handle.file_id().file_id();
|
||||
let cached_minmax_key = if predicate.dyn_filters().is_empty() {
|
||||
// Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
|
||||
// building row-group pruning stats for identical predicates across queries.
|
||||
let mut exprs = predicate
|
||||
.exprs()
|
||||
.iter()
|
||||
.map(|expr| format!("{expr:?}"))
|
||||
.collect::<Vec<_>>();
|
||||
exprs.sort();
|
||||
let schema_version = self
|
||||
.expected_metadata
|
||||
.as_ref()
|
||||
.map(|meta| meta.schema_version)
|
||||
.unwrap_or_else(|| read_format.metadata().schema_version);
|
||||
Some(PredicateKey::new_minmax(
|
||||
Arc::new(exprs),
|
||||
schema_version,
|
||||
skip_fields,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let index_result_cache = self.cache_strategy.index_result_cache();
|
||||
let cached_minmax_key =
|
||||
if index_result_cache.is_some() && predicate.dyn_filters().is_empty() {
|
||||
// Cache min-max pruning results keyed by predicate expressions. This avoids repeatedly
|
||||
// building row-group pruning stats for identical predicates across queries.
|
||||
let mut exprs = predicate
|
||||
.exprs()
|
||||
.iter()
|
||||
.map(|expr| format!("{expr:?}"))
|
||||
.collect::<Vec<_>>();
|
||||
exprs.sort();
|
||||
let schema_version = self
|
||||
.expected_metadata
|
||||
.as_ref()
|
||||
.map(|meta| meta.schema_version)
|
||||
.unwrap_or_else(|| read_format.metadata().schema_version);
|
||||
Some(PredicateKey::new_minmax(
|
||||
Arc::new(exprs),
|
||||
schema_version,
|
||||
skip_fields,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(index_result_cache) = self.cache_strategy.index_result_cache()
|
||||
if let Some(index_result_cache) = index_result_cache
|
||||
&& let Some(predicate_key) = cached_minmax_key.as_ref()
|
||||
&& let Some(result) = index_result_cache.get(predicate_key, file_id)
|
||||
{
|
||||
@@ -1212,7 +1214,7 @@ impl ParquetReaderBuilder {
|
||||
.num_row_groups()
|
||||
.saturating_sub(output.row_group_count());
|
||||
|
||||
if let Some(index_result_cache) = self.cache_strategy.index_result_cache()
|
||||
if let Some(index_result_cache) = index_result_cache
|
||||
&& let Some(predicate_key) = cached_minmax_key
|
||||
{
|
||||
index_result_cache.put(predicate_key, file_id, Arc::new(output.clone()));
|
||||
@@ -2157,3 +2159,112 @@ impl FlatRowGroupReader {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::any::Any;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
|
||||
use datafusion::arrow::datatypes::DataType;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
use datafusion_expr::{
|
||||
ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, Volatility,
|
||||
};
|
||||
use datatypes::arrow::array::{ArrayRef, Int64Array};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use object_store::services::Memory;
|
||||
use parquet::arrow::ArrowWriter;
|
||||
use store_api::region_request::PathType;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use super::*;
|
||||
use crate::sst::parquet::metadata::MetadataLoader;
|
||||
use crate::test_util::sst_util::{sst_file_handle, sst_region_metadata};
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn test_minmax_predicate_key_not_built_when_index_result_cache_disabled() {
|
||||
#[derive(Eq, PartialEq, Hash)]
|
||||
struct PanicDebugUdf;
|
||||
|
||||
impl Debug for PanicDebugUdf {
|
||||
fn fmt(&self, _f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
panic!("minmax predicate key should not format exprs when cache is disabled");
|
||||
}
|
||||
}
|
||||
|
||||
impl ScalarUDFImpl for PanicDebugUdf {
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"panic_debug_udf"
|
||||
}
|
||||
|
||||
fn signature(&self) -> &Signature {
|
||||
static SIGNATURE: LazyLock<Signature> =
|
||||
LazyLock::new(|| Signature::variadic_any(Volatility::Immutable));
|
||||
&SIGNATURE
|
||||
}
|
||||
|
||||
fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
|
||||
Ok(DataType::Int64)
|
||||
}
|
||||
|
||||
fn invoke_with_args(
|
||||
&self,
|
||||
_args: ScalarFunctionArgs,
|
||||
) -> datafusion_common::Result<ColumnarValue> {
|
||||
Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(1))))
|
||||
}
|
||||
}
|
||||
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let file_handle = sst_file_handle(0, 1);
|
||||
let table_dir = "test_table".to_string();
|
||||
let path_type = PathType::Bare;
|
||||
let file_path = file_handle.file_path(&table_dir, path_type);
|
||||
|
||||
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
|
||||
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
|
||||
let mut parquet_bytes = Vec::new();
|
||||
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.close().unwrap();
|
||||
let file_size = parquet_bytes.len() as u64;
|
||||
object_store.write(&file_path, parquet_bytes).await.unwrap();
|
||||
|
||||
let region_metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
|
||||
let read_format =
|
||||
ReadFormat::new(region_metadata, None, false, None, &file_path, false).unwrap();
|
||||
|
||||
let mut cache_metrics = MetadataCacheMetrics::default();
|
||||
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
|
||||
let parquet_meta = loader.load(&mut cache_metrics).await.unwrap();
|
||||
|
||||
let udf = Arc::new(ScalarUDF::new_from_impl(PanicDebugUdf));
|
||||
let predicate = Predicate::new(vec![Expr::ScalarFunction(ScalarFunction::new_udf(
|
||||
udf,
|
||||
vec![],
|
||||
))]);
|
||||
let builder = ParquetReaderBuilder::new(table_dir, path_type, file_handle, object_store)
|
||||
.predicate(Some(predicate))
|
||||
.cache(CacheStrategy::Disabled);
|
||||
|
||||
let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
|
||||
let total_row_count = parquet_meta.file_metadata().num_rows() as usize;
|
||||
let mut metrics = ReaderFilterMetrics::default();
|
||||
let selection = builder.row_groups_by_minmax(
|
||||
&read_format,
|
||||
&parquet_meta,
|
||||
row_group_size,
|
||||
total_row_count,
|
||||
&mut metrics,
|
||||
false,
|
||||
);
|
||||
|
||||
assert!(!selection.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user