mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 20:40:39 +00:00
only preload from fs or cache
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -398,6 +398,19 @@ impl CacheManager {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the total weighted size of the in-memory SST meta cache.
|
||||
pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
|
||||
self.sst_meta_cache
|
||||
.as_ref()
|
||||
.map(|cache| cache.weighted_size())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Returns true if the in-memory SST meta cache is enabled.
|
||||
pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
|
||||
self.sst_meta_cache.is_some()
|
||||
}
|
||||
|
||||
/// Gets a vector with repeated value for specific `key`.
|
||||
pub fn get_repeated_vector(
|
||||
&self,
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
use std::any::TypeId;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicI64, AtomicU64};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{Arc, LazyLock, Mutex};
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::{debug, error, info, warn};
|
||||
@@ -38,6 +38,7 @@ use store_api::metadata::{
|
||||
use store_api::region_engine::RegionRole;
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::access_layer::AccessLayer;
|
||||
use crate::cache::CacheManagerRef;
|
||||
@@ -75,6 +76,11 @@ use crate::time_provider::TimeProviderRef;
|
||||
use crate::wal::entry_reader::WalEntryReader;
|
||||
use crate::wal::{EntryId, Wal};
|
||||
|
||||
const PARQUET_META_PRELOAD_CONCURRENCY: usize = 8;
|
||||
|
||||
static PARQUET_META_PRELOAD_SEMAPHORE: LazyLock<Semaphore> =
|
||||
LazyLock::new(|| Semaphore::new(PARQUET_META_PRELOAD_CONCURRENCY));
|
||||
|
||||
/// A fetcher to retrieve partition expr for a region.
|
||||
///
|
||||
/// Compatibility: older regions didn't persist `partition_expr` in engine metadata,
|
||||
@@ -950,19 +956,40 @@ fn maybe_load_cache(
|
||||
///
|
||||
/// This improves the latency of the first query after server start by avoiding large Parquet
|
||||
/// metadata reads on demand.
|
||||
///
|
||||
/// The preload is best-effort:
|
||||
/// - Always tries to warm from the local write cache (file cache) first.
|
||||
/// - If the region storage backend is local filesystem (`Scheme::Fs`), it may also load metadata
|
||||
/// directly from the local store.
|
||||
/// - It will not fetch metadata from remote object stores (S3/GCS/OSS/...).
|
||||
async fn preload_parquet_meta_cache_for_files(
|
||||
region_id: RegionId,
|
||||
cache_manager: CacheManagerRef,
|
||||
sst_meta_cache_capacity: u64,
|
||||
table_dir: String,
|
||||
path_type: PathType,
|
||||
object_store: object_store::ObjectStore,
|
||||
cache_manager: CacheManagerRef,
|
||||
mut files: Vec<FileHandle>,
|
||||
) -> usize {
|
||||
// Load older files first so the most recent files remain hot in the LRU cache.
|
||||
files.sort_by(|a, b| a.meta_ref().time_range.1.cmp(&b.meta_ref().time_range.1));
|
||||
if !cache_manager.sst_meta_cache_enabled()
|
||||
|| sst_meta_cache_capacity == 0
|
||||
|| cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);
|
||||
|
||||
// Sort by time range so we can prefer preloading newer files first.
|
||||
files.sort_by(|a, b| b.meta_ref().time_range.1.cmp(&a.meta_ref().time_range.1));
|
||||
|
||||
let mut loaded = 0usize;
|
||||
for file_handle in files {
|
||||
// Stop when the shared SST meta cache is full.
|
||||
if cache_manager.sst_meta_cache_weighted_size() >= sst_meta_cache_capacity {
|
||||
break;
|
||||
}
|
||||
|
||||
let file_id = file_handle.file_id();
|
||||
let mut cache_metrics = MetadataCacheMetrics::default();
|
||||
if cache_manager
|
||||
@@ -977,6 +1004,10 @@ async fn preload_parquet_meta_cache_for_files(
|
||||
continue;
|
||||
}
|
||||
|
||||
if !allow_direct_load {
|
||||
continue;
|
||||
}
|
||||
|
||||
let file_size = file_handle.meta_ref().file_size;
|
||||
let file_path = file_handle.file_path(&table_dir, path_type);
|
||||
let loader = MetadataLoader::new(object_store.clone(), &file_path, file_size);
|
||||
@@ -988,7 +1019,7 @@ async fn preload_parquet_meta_cache_for_files(
|
||||
Err(err) => {
|
||||
// Preloading is best-effort. Failure shouldn't affect region open.
|
||||
warn!(
|
||||
err; "Failed to preload parquet metadata, region: {}, file: {}",
|
||||
err; "Failed to preload parquet metadata from local store, region: {}, file: {}",
|
||||
region_id, file_path
|
||||
);
|
||||
}
|
||||
@@ -1006,16 +1037,26 @@ fn maybe_preload_parquet_meta_cache(
|
||||
let Some(cache_manager) = cache_manager else {
|
||||
return;
|
||||
};
|
||||
if !cache_manager.sst_meta_cache_enabled() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Skip if SST meta cache is disabled.
|
||||
if config.sst_meta_cache_size.as_bytes() == 0 {
|
||||
return;
|
||||
}
|
||||
if !config.preload_index_cache {
|
||||
return;
|
||||
}
|
||||
|
||||
let region = region.clone();
|
||||
let cache_manager = cache_manager.clone();
|
||||
let sst_meta_cache_capacity = config.sst_meta_cache_size.as_bytes();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// `acquire` fails only if the semaphore has been closed; this global semaphore is not expected to be closed.
|
||||
let _permit = PARQUET_META_PRELOAD_SEMAPHORE.acquire().await.unwrap();
|
||||
|
||||
let region_id = region.region_id;
|
||||
let table_dir = region.access_layer.table_dir().to_string();
|
||||
let path_type = region.access_layer.path_type();
|
||||
@@ -1031,20 +1072,25 @@ fn maybe_preload_parquet_meta_cache(
|
||||
}
|
||||
}
|
||||
}
|
||||
let preloading_start = Instant::now();
|
||||
let loaded = preload_parquet_meta_cache_for_files(
|
||||
region_id,
|
||||
cache_manager,
|
||||
sst_meta_cache_capacity,
|
||||
table_dir,
|
||||
path_type,
|
||||
object_store,
|
||||
cache_manager,
|
||||
files,
|
||||
)
|
||||
.await;
|
||||
let preloading_cost = preloading_start.elapsed();
|
||||
|
||||
if loaded > 0 {
|
||||
info!(
|
||||
"Preloaded parquet metadata for region {}, loaded_files: {}",
|
||||
region_id, loaded
|
||||
"Preloaded parquet metadata for region {}, loaded_files: {}, elapsed_ms: {}",
|
||||
region_id,
|
||||
loaded,
|
||||
preloading_cost.as_millis()
|
||||
);
|
||||
}
|
||||
});
|
||||
@@ -1070,11 +1116,12 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_test_util::temp_dir::create_temp_dir;
|
||||
use common_time::Timestamp;
|
||||
use datatypes::arrow::array::{ArrayRef, Int64Array};
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use object_store::ObjectStore;
|
||||
use object_store::services::Memory;
|
||||
use object_store::services::{Fs, Memory};
|
||||
use parquet::arrow::ArrowWriter;
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::{FileId, RegionId};
|
||||
@@ -1156,19 +1203,18 @@ mod tests {
|
||||
.is_none()
|
||||
);
|
||||
|
||||
// Provide an empty remote object store (so object-store load should fail if attempted).
|
||||
let missing_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
let loaded = preload_parquet_meta_cache_for_files(
|
||||
region_id,
|
||||
cache_manager.clone(),
|
||||
1024 * 1024,
|
||||
table_dir.to_string(),
|
||||
path_type,
|
||||
missing_store,
|
||||
cache_manager.clone(),
|
||||
source_store.clone(),
|
||||
vec![file_handle],
|
||||
)
|
||||
.await;
|
||||
|
||||
// Should warm the in-memory cache from file cache even if remote store misses.
|
||||
// Should warm the in-memory cache from the local file cache.
|
||||
assert_eq!(loaded, 1);
|
||||
assert!(
|
||||
cache_manager
|
||||
@@ -1178,8 +1224,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_preload_parquet_meta_cache_with_unknown_file_size() {
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
async fn test_preload_parquet_meta_cache_skips_files_not_in_file_cache() {
|
||||
let cache_manager = Arc::new(
|
||||
CacheManager::builder()
|
||||
.sst_meta_cache_size(1024 * 1024)
|
||||
@@ -1189,15 +1234,7 @@ mod tests {
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let file_id = FileId::random();
|
||||
|
||||
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
|
||||
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
|
||||
let mut parquet_bytes = Vec::new();
|
||||
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.close().unwrap();
|
||||
|
||||
// file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
|
||||
// the object store to retrieve it.
|
||||
// Without a local file cache entry, preloading should skip the file.
|
||||
let file_meta = FileMeta {
|
||||
region_id,
|
||||
file_id,
|
||||
@@ -1220,8 +1257,11 @@ mod tests {
|
||||
let table_dir = "test_table";
|
||||
let path_type = PathType::Bare;
|
||||
let remote_path = file_handle.file_path(table_dir, path_type);
|
||||
|
||||
// Even if the remote object store has the file, we should not preload from it.
|
||||
let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
|
||||
object_store
|
||||
.write(&remote_path, parquet_bytes)
|
||||
.write(&remote_path, b"noop".as_slice())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -1234,10 +1274,86 @@ mod tests {
|
||||
|
||||
let loaded = preload_parquet_meta_cache_for_files(
|
||||
region_id,
|
||||
cache_manager.clone(),
|
||||
1024 * 1024,
|
||||
table_dir.to_string(),
|
||||
path_type,
|
||||
object_store,
|
||||
vec![file_handle],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(loaded, 0);
|
||||
assert!(
|
||||
cache_manager
|
||||
.get_parquet_meta_data_from_mem_cache(region_file_id)
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_preload_parquet_meta_cache_loads_from_local_fs() {
|
||||
let cache_manager = Arc::new(
|
||||
CacheManager::builder()
|
||||
.sst_meta_cache_size(1024 * 1024)
|
||||
.build(),
|
||||
);
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let file_id = FileId::random();
|
||||
|
||||
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
|
||||
let batch = RecordBatch::try_from_iter([("col", col)]).unwrap();
|
||||
let mut parquet_bytes = Vec::new();
|
||||
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, batch.schema(), None).unwrap();
|
||||
writer.write(&batch).unwrap();
|
||||
writer.close().unwrap();
|
||||
|
||||
// file_size is 0 when it's missing/defaulted in manifests; MetadataLoader::load will stat
|
||||
// the local filesystem to retrieve it.
|
||||
let file_meta = FileMeta {
|
||||
region_id,
|
||||
file_id,
|
||||
time_range: (Timestamp::new_millisecond(0), Timestamp::new_millisecond(1)),
|
||||
level: 0,
|
||||
file_size: 0,
|
||||
max_row_group_uncompressed_size: 0,
|
||||
available_indexes: Default::default(),
|
||||
indexes: vec![],
|
||||
index_file_size: 0,
|
||||
index_version: 0,
|
||||
num_rows: 3,
|
||||
num_row_groups: 1,
|
||||
sequence: None,
|
||||
partition_expr: None,
|
||||
num_series: 0,
|
||||
};
|
||||
let file_handle = FileHandle::new(file_meta, Arc::new(NoopFilePurger));
|
||||
|
||||
let table_dir = "test_table";
|
||||
let path_type = PathType::Bare;
|
||||
let file_path = file_handle.file_path(table_dir, path_type);
|
||||
|
||||
let root = create_temp_dir("parquet-meta-preload");
|
||||
let object_store = ObjectStore::new(Fs::default().root(root.path().to_str().unwrap()))
|
||||
.unwrap()
|
||||
.finish();
|
||||
object_store.write(&file_path, parquet_bytes).await.unwrap();
|
||||
|
||||
let region_file_id = file_handle.file_id();
|
||||
assert!(
|
||||
cache_manager
|
||||
.get_parquet_meta_data_from_mem_cache(region_file_id)
|
||||
.is_none()
|
||||
);
|
||||
|
||||
let loaded = preload_parquet_meta_cache_for_files(
|
||||
region_id,
|
||||
cache_manager.clone(),
|
||||
1024 * 1024,
|
||||
table_dir.to_string(),
|
||||
path_type,
|
||||
object_store,
|
||||
vec![file_handle],
|
||||
)
|
||||
.await;
|
||||
|
||||
Reference in New Issue
Block a user