feat!: download file to fill the cache on write cache miss (#7294)

* feat: download inverted index file

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: download for bloom and fulltext

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: implement maybe_download_background for FileCache

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: load file for parquet

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: reduce channel size

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use ManifestCache

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: pass cache to ManifestObjectStore::new

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix fmt and clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove manifest cache ttl

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove read cache

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: clean old read cache path

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config examples

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix CI

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: also clean the root directory

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update manifest test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: skip file if it exists

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove warn in replace

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add a flag to enable/disable background download

Set the concurrency to 1 for background downloads.

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename write_cache_enable_background_download to enable_refill_cache_on_read

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update config test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address comments

Signed-off-by: evenyag <realevenyag@gmail.com>

* docs: update config.md

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-12-16 16:31:26 +08:00
Committed by: GitHub
Parent: 2dfcf35fee
Commit: f6afb10e33
27 changed files with 479 additions and 939 deletions

Cargo.lock (generated)

@@ -7330,12 +7330,6 @@ dependencies = [
"digest",
]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "md5"
version = "0.8.0"
@@ -8399,7 +8393,6 @@ dependencies = [
"futures",
"humantime-serde",
"lazy_static",
"md5 0.7.0",
"moka",
"opendal",
"prometheus",
@@ -9246,7 +9239,7 @@ dependencies = [
"futures",
"hex",
"lazy-regex",
"md5 0.8.0",
"md5",
"postgres-types",
"rand 0.9.1",
"ring",


@@ -108,9 +108,6 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
| `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. |
@@ -156,6 +153,8 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).<br/>When enabled, index files are loaded into the write cache during region initialization,<br/>which can improve query performance at the cost of longer startup times. |
| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).<br/>The remaining capacity is used for data (parquet) files.<br/>Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,<br/>1GiB is reserved for index files and 4GiB for data files. |
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling the cache on read operations (default: true).<br/>When disabled, files read from remote object storage are not downloaded to the local cache in the background. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
@@ -488,9 +487,6 @@
| `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
| `storage.access_key_id` | String | Unset | The access key id of the aws account.<br/>It's **highly recommended** to use AWS IAM roles instead of hardcoding the access key id and secret key.<br/>**It's only used when the storage type is `S3` and `Oss`**. |
@@ -538,6 +534,8 @@
| `region_engine.mito.write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.preload_index_cache` | Bool | `true` | Preload index (puffin) files into cache on region open (default: true).<br/>When enabled, index files are loaded into the write cache during region initialization,<br/>which can improve query performance at the cost of longer startup times. |
| `region_engine.mito.index_cache_percent` | Integer | `20` | Percentage of write cache capacity allocated for index (puffin) files (default: 20).<br/>The remaining capacity is used for data (parquet) files.<br/>Must be between 0 and 100 (exclusive). For example, with a 5GiB write cache and 20% allocation,<br/>1GiB is reserved for index files and 4GiB for data files. |
| `region_engine.mito.enable_refill_cache_on_read` | Bool | `true` | Enable refilling the cache on read operations (default: true).<br/>When disabled, files read from remote object storage are not downloaded to the local cache in the background. |
| `region_engine.mito.manifest_cache_size` | String | `256MB` | Capacity for manifest cache (default: 256MB). |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
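Taken together, the two tables above describe the migration surface: the `storage.enable_read_cache`, `storage.cache_path` and `storage.cache_capacity` rows disappear, while refill behavior moves under `region_engine.mito`. A minimal sketch of the affected datanode configuration after this change (the section layout is assumed to follow the example files shown below):

```toml
[storage]
# The old read cache options (enable_read_cache, cache_path,
# cache_capacity) are removed; the write cache now serves reads too.
type = "S3"

[[region_engine]]
[region_engine.mito]
# Refill the local cache in the background when a read misses it.
enable_refill_cache_on_read = true
# Local capacity for manifest files.
manifest_cache_size = "256MB"
```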


@@ -281,18 +281,6 @@ data_home = "./greptimedb_data"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
## Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage.
#+ enable_read_cache = true
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
## @toml2docs:none-default
@@ -516,6 +504,13 @@ preload_index_cache = true
## 1GiB is reserved for index files and 4GiB for data files.
index_cache_percent = 20
## Enable refilling the cache on read operations (default: true).
## When disabled, files read from remote object storage are not downloaded to the local cache in the background.
enable_refill_cache_on_read = true
## Capacity for manifest cache (default: 256MB).
manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"


@@ -388,18 +388,6 @@ data_home = "./greptimedb_data"
## - `Oss`: the data is stored in the Aliyun OSS.
type = "File"
## Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage.
#+ enable_read_cache = true
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default
#+ cache_path = ""
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default
cache_capacity = "5GiB"
## The S3 bucket name.
## **It's only used when the storage type is `S3`, `Oss` and `Gcs`**.
## @toml2docs:none-default
@@ -610,6 +598,13 @@ preload_index_cache = true
## 1GiB is reserved for index files and 4GiB for data files.
index_cache_percent = 20
## Enable refilling the cache on read operations (default: true).
## When disabled, files read from remote object storage are not downloaded to the local cache in the background.
enable_refill_cache_on_read = true
## Capacity for manifest cache (default: 256MB).
manifest_cache_size = "256MB"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"


@@ -410,14 +410,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build cache store"))]
BuildCacheStore {
#[snafu(source)]
error: object_store::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Not yet implemented: {what}"))]
NotYetImplemented { what: String },
}
@@ -493,7 +485,6 @@ impl ErrorExt for Error {
SerializeJson { .. } => StatusCode::Internal,
ObjectStore { source, .. } => source.status_code(),
BuildCacheStore { .. } => StatusCode::StorageUnavailable,
}
}


@@ -14,15 +14,10 @@
//! object storage utilities
use std::sync::Arc;
use common_telemetry::info;
use object_store::config::ObjectStorageCacheConfig;
use common_telemetry::{info, warn};
use object_store::factory::new_raw_object_store;
use object_store::layers::LruCacheLayer;
use object_store::services::Fs;
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
use object_store::{ATOMIC_WRITE_DIR, Access, ObjectStore, ObjectStoreBuilder};
use object_store::{ATOMIC_WRITE_DIR, ObjectStore};
use snafu::prelude::*;
use crate::config::ObjectStoreConfig;
@@ -47,23 +42,58 @@ pub(crate) async fn new_object_store_without_cache(
Ok(object_store)
}
/// Cleans up directories left behind by the removed LRU read cache.
fn clean_old_read_cache(store: &ObjectStoreConfig, data_home: &str) {
if !store.is_object_storage() {
return;
}
let Some(cache_config) = store.cache_config() else {
return;
};
// Only cleans if read cache was enabled
if !cache_config.enable_read_cache {
return;
}
let cache_base_dir = if cache_config.cache_path.is_empty() {
data_home
} else {
&cache_config.cache_path
};
// Cleans up the old read cache directory
let old_read_cache_dir = join_dir(cache_base_dir, "cache/object/read");
info!(
"Cleaning up old read cache directory: {}",
old_read_cache_dir
);
if let Err(e) = clean_temp_dir(&old_read_cache_dir) {
warn!(e; "Failed to clean old read cache directory {}", old_read_cache_dir);
}
// Cleans up the atomic temp dir used by the cache layer
let cache_atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
info!(
"Cleaning up old cache atomic temp directory: {}",
cache_atomic_temp_dir
);
if let Err(e) = clean_temp_dir(&cache_atomic_temp_dir) {
warn!(e; "Failed to clean old cache atomic temp directory {}", cache_atomic_temp_dir);
}
}
pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
// Cleans up old LRU read cache directories.
// TODO: Remove this line after the 1.0 release.
clean_old_read_cache(&store, data_home);
let object_store = new_raw_object_store(&store, data_home)
.await
.context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages
// Enables retry layer for non-fs object storages
let object_store = if store.is_object_storage() {
let object_store = {
// It's safe to unwrap here because we already checked above.
let cache_config = store.cache_config().unwrap();
if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
// Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
}
};
// Adds retry layer
with_retry_layers(object_store)
} else {
@@ -73,40 +103,3 @@ pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Resu
let object_store = with_instrument_layers(object_store, true);
Ok(object_store)
}
async fn build_cache_layer(
cache_config: &ObjectStorageCacheConfig,
data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> {
// No need to build cache layer if read cache is disabled.
if !cache_config.enable_read_cache {
return Ok(None);
}
let cache_base_dir = if cache_config.cache_path.is_empty() {
data_home
} else {
&cache_config.cache_path
};
let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_store = Fs::default()
.root(cache_base_dir)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::BuildCacheStoreSnafu)?;
let cache_layer = LruCacheLayer::new(
Arc::new(cache_store),
cache_config.cache_capacity.0 as usize,
)
.context(error::BuildCacheStoreSnafu)?;
cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
cache_config.cache_path, cache_config.cache_capacity
);
Ok(Some(cache_layer))
}


@@ -34,6 +34,7 @@ use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef}
use index::result_cache::IndexResultCache;
use moka::notification::RemovalCause;
use moka::sync::Cache;
use object_store::ObjectStore;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
@@ -263,6 +264,26 @@ impl CacheStrategy {
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
pub fn maybe_download_background(
&self,
index_key: IndexKey,
remote_path: String,
remote_store: ObjectStore,
file_size: u64,
) {
if let CacheStrategy::EnableAll(cache_manager) = self
&& let Some(write_cache) = cache_manager.write_cache()
{
write_cache.file_cache().maybe_download_background(
index_key,
remote_path,
remote_store,
file_size,
);
}
}
}
/// Manages cached data for the engine.


@@ -31,7 +31,7 @@ use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::{FileId, RegionId};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::{Sender, UnboundedReceiver};
use crate::access_layer::TempFileCleaner;
use crate::cache::{FILE_TYPE, INDEX_TYPE};
@@ -55,6 +55,17 @@ pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;
/// Minimum capacity for each cache (512MB).
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;
/// Channel capacity for background download tasks.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
/// A task to download a file in the background.
struct DownloadTask {
index_key: IndexKey,
remote_path: String,
remote_store: ObjectStore,
file_size: u64,
}
/// Inner struct for FileCache that can be used in spawned tasks.
#[derive(Debug)]
struct FileCacheInner {
@@ -170,8 +181,8 @@ impl FileCacheInner {
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
concurrency: usize,
) -> Result<()> {
const DOWNLOAD_READER_CONCURRENCY: usize = 8;
const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
let file_type = index_key.file_type;
@@ -184,7 +195,7 @@ impl FileCacheInner {
let reader = remote_store
.reader_with(remote_path)
.concurrent(DOWNLOAD_READER_CONCURRENCY)
.concurrent(concurrency)
.chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
.await
.context(error::OpenDalSnafu)?
@@ -238,11 +249,14 @@ impl FileCacheInner {
remote_path: &str,
remote_store: &ObjectStore,
file_size: u64,
concurrency: usize,
) -> Result<()> {
if let Err(e) = self
.download_without_cleaning(index_key, remote_path, remote_store, file_size)
.download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
.await
{
error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);
let filename = index_key.to_string();
TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
@@ -251,6 +265,11 @@ impl FileCacheInner {
Ok(())
}
/// Checks if the key is in the file cache.
fn contains_key(&self, key: &IndexKey) -> bool {
self.memory_index(key.file_type).contains_key(key)
}
}
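The download path above is plain opendal: a ranged reader configured with `concurrent` and `chunk`, where foreground downloads keep a concurrency of 8 and background refills are throttled to 1. A runnable miniature of that reader configuration against opendal's in-memory backend (paths and sizes are illustrative; assumes opendal's default `services-memory` feature):

```rust
use opendal::Operator;
use opendal::services::Memory;

#[tokio::main]
async fn main() -> opendal::Result<()> {
    let op = Operator::new(Memory::default())?.finish();
    op.write("region/file.parquet", vec![0u8; 1024]).await?;

    // Mirrors the file cache's download reader: concurrent range reads
    // with fixed-size chunks (concurrency 8 in the foreground, 1 for
    // background refills in this change).
    let reader = op
        .reader_with("region/file.parquet")
        .concurrent(8)
        .chunk(8 * 1024 * 1024)
        .await?;
    let buf = reader.read(0..1024).await?;
    assert_eq!(buf.len(), 1024);
    Ok(())
}
```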
/// A file cache manages files on the local store and evicts files based
@@ -261,6 +280,8 @@ pub(crate) struct FileCache {
inner: Arc<FileCacheInner>,
/// Capacity of the puffin (index) cache in bytes.
puffin_capacity: u64,
/// Channel for background download tasks. None if background worker is disabled.
download_task_tx: Option<Sender<DownloadTask>>,
}
pub(crate) type FileCacheRef = Arc<FileCache>;
@@ -272,6 +293,7 @@ impl FileCache {
capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
enable_background_worker: bool,
) -> FileCache {
// Validate and use the provided percent or default
let index_percent = index_cache_percent
@@ -306,12 +328,54 @@ impl FileCache {
puffin_index,
});
// Only create channel and spawn worker if background download is enabled
let download_task_tx = if enable_background_worker {
let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
Self::spawn_download_worker(inner.clone(), rx);
Some(tx)
} else {
None
};
FileCache {
inner,
puffin_capacity,
download_task_tx,
}
}
/// Spawns a background worker to process download tasks.
fn spawn_download_worker(
inner: Arc<FileCacheInner>,
mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
) {
tokio::spawn(async move {
info!("Background download worker started");
while let Some(task) = download_task_rx.recv().await {
// Check if the file is already in the cache
if inner.contains_key(&task.index_key) {
debug!(
"Skipping background download for region {}, file {} - already in cache",
task.index_key.region_id, task.index_key.file_id
);
continue;
}
// Ignores background download errors.
let _ = inner
.download(
task.index_key,
&task.remote_path,
&task.remote_store,
task.file_size,
1, // Background downloads use concurrency=1
)
.await;
}
info!("Background download worker stopped");
});
}
/// Builds a cache for a specific file type.
fn build_cache(
local_store: ObjectStore,
@@ -333,11 +397,9 @@ impl FileCache {
let file_path = cache_file_path(FILE_DIR, *key);
async move {
if let RemovalCause::Replaced = cause {
// The cache is replaced by another file. This is unexpected, we don't remove the same
// The cache entry is replaced by another file (possibly a re-download). We don't remove the
// file but update the metrics as the file has already been replaced.
CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
// TODO(yingwen): Don't log warn later.
warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
return;
}
@@ -553,7 +615,7 @@ impl FileCache {
/// Checks if the key is in the file cache.
pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
self.inner.memory_index(key.file_type).contains_key(key)
self.inner.contains_key(key)
}
/// Returns the capacity of the puffin (index) cache in bytes.
@@ -576,9 +638,42 @@ impl FileCache {
file_size: u64,
) -> Result<()> {
self.inner
.download(index_key, remote_path, remote_store, file_size)
.download(index_key, remote_path, remote_store, file_size, 8) // Foreground uses concurrency=8
.await
}
/// Downloads a file in `remote_path` from the remote object store to the local cache
/// (specified by `index_key`) in the background. Errors are logged but not returned.
///
/// This method attempts to send a download task to the background worker.
/// If the channel is full, the task is dropped and a debug message is logged.
pub(crate) fn maybe_download_background(
&self,
index_key: IndexKey,
remote_path: String,
remote_store: ObjectStore,
file_size: u64,
) {
// Do nothing if background worker is disabled (channel is None)
let Some(tx) = &self.download_task_tx else {
return;
};
let task = DownloadTask {
index_key,
remote_path,
remote_store,
file_size,
};
// Try to send the task; if the channel is full, just drop it
if let Err(e) = tx.try_send(task) {
debug!(
"Failed to queue background download task for region {}, file {}: {:?}",
index_key.region_id, index_key.file_id, e
);
}
}
}
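The queueing discipline above is a bounded-channel worker: `try_send` keeps the read path non-blocking, and a full channel sheds load by dropping the refill task, which is safe because a later read can simply queue it again. A self-contained sketch of the same pattern (names are illustrative, not this crate's API):

```rust
use tokio::sync::mpsc;

/// Illustrative stand-in for DownloadTask.
struct RefillTask {
    path: String,
}

fn spawn_refill_worker(mut rx: mpsc::Receiver<RefillTask>) {
    tokio::spawn(async move {
        // Tasks are processed one at a time, so background refills
        // never compete with foreground reads for bandwidth.
        while let Some(task) = rx.recv().await {
            println!("refilling cache for {}", task.path);
        }
    });
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<RefillTask>(64); // cf. DOWNLOAD_TASK_CHANNEL_SIZE
    spawn_refill_worker(rx);

    let task = RefillTask {
        path: "region/file.parquet".into(),
    };
    // Never block the read path: if the queue is full, drop the task.
    if tx.try_send(task).is_err() {
        // Channel full or worker gone; skip refilling this time.
    }
}
```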
/// Key of file cache index.
@@ -708,6 +803,7 @@ mod tests {
ReadableSize::mb(10),
Some(Duration::from_millis(10)),
None,
true, // enable_background_worker
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
@@ -744,7 +840,13 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -792,7 +894,13 @@ mod tests {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);
@@ -824,7 +932,13 @@ mod tests {
async fn test_file_cache_recover() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let region_id = RegionId::new(2000, 0);
let file_type = FileType::Parquet;
@@ -850,7 +964,13 @@ mod tests {
}
// Recover the cache.
let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
// No entry before recovery.
assert!(
cache
@@ -879,7 +999,13 @@ mod tests {
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
let file_cache = FileCache::new(
local_store.clone(),
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
);
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = IndexKey::new(region_id, file_id, FileType::Parquet);


@@ -370,7 +370,22 @@ impl ManifestCache {
/// If `check_mtime` is true, only removes directories that have not been modified
/// for at least 1 hour.
fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
let is_empty = Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
if is_empty {
if let Err(e) = std::fs::remove_dir(dir) {
if e.kind() != std::io::ErrorKind::NotFound {
warn!(e; "Failed to remove empty root dir {}", dir.display());
return Err(e);
} else {
warn!("Empty root dir not found before removal {}", dir.display());
}
} else {
info!(
"Removed empty root dir {} from manifest cache",
dir.display()
);
}
}
Ok(())
}
@@ -412,11 +427,16 @@ impl ManifestCache {
let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
if subdir_empty {
if let Err(e) = std::fs::remove_dir(&path)
&& e.kind() != std::io::ErrorKind::NotFound
{
warn!(e; "Failed to remove empty directory {}", path.display());
is_empty = false;
if let Err(e) = std::fs::remove_dir(&path) {
if e.kind() != std::io::ErrorKind::NotFound {
warn!(e; "Failed to remove empty directory {}", path.display());
is_empty = false;
} else {
info!(
"Empty directory {} not found before removal",
path.display()
);
}
} else {
info!(
"Removed empty directory {} from manifest cache",
@@ -571,4 +591,116 @@ mod tests {
cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
);
}
#[tokio::test]
async fn test_clean_empty_dirs_sync_no_mtime_check() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("");
let root = PathBuf::from(dir.path());
// Create a directory structure:
// root/
// empty_dir1/
// empty_dir2/
// empty_subdir/
// non_empty_dir/
// file.txt
// nested/
// empty_subdir1/
// non_empty_subdir/
// file.txt
let empty_dir1 = root.join("empty_dir1");
let empty_dir2 = root.join("empty_dir2");
let empty_subdir = empty_dir2.join("empty_subdir");
let non_empty_dir = root.join("non_empty_dir");
let nested = root.join("nested");
let nested_empty = nested.join("empty_subdir1");
let nested_non_empty = nested.join("non_empty_subdir");
// Create directories
std::fs::create_dir_all(&empty_dir1).unwrap();
std::fs::create_dir_all(&empty_subdir).unwrap();
std::fs::create_dir_all(&non_empty_dir).unwrap();
std::fs::create_dir_all(&nested_empty).unwrap();
std::fs::create_dir_all(&nested_non_empty).unwrap();
// Create files in non-empty directories
std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
std::fs::write(nested_non_empty.join("file.txt"), b"content").unwrap();
// Verify initial state
assert!(empty_dir1.exists());
assert!(empty_dir2.exists());
assert!(empty_subdir.exists());
assert!(non_empty_dir.exists());
assert!(nested.exists());
assert!(nested_empty.exists());
assert!(nested_non_empty.exists());
// Clean empty directories with check_mtime = false
ManifestCache::clean_empty_dirs_sync(&root, false).unwrap();
// Verify empty directories are removed
assert!(!empty_dir1.exists());
assert!(!empty_dir2.exists());
assert!(!empty_subdir.exists());
assert!(!nested_empty.exists());
// Verify non-empty directories still exist
assert!(non_empty_dir.exists());
assert!(non_empty_dir.join("file.txt").exists());
assert!(nested.exists());
assert!(nested_non_empty.exists());
assert!(nested_non_empty.join("file.txt").exists());
}
#[tokio::test]
async fn test_clean_empty_dirs_sync_with_mtime_check() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("");
let root = PathBuf::from(dir.path());
// Create a directory structure with recently created empty directories
// root/
// empty_dir1/
// empty_dir2/
// empty_subdir/
// non_empty_dir/
// file.txt
let empty_dir1 = root.join("empty_dir1");
let empty_dir2 = root.join("empty_dir2");
let empty_subdir = empty_dir2.join("empty_subdir");
let non_empty_dir = root.join("non_empty_dir");
// Create directories
std::fs::create_dir_all(&empty_dir1).unwrap();
std::fs::create_dir_all(&empty_subdir).unwrap();
std::fs::create_dir_all(&non_empty_dir).unwrap();
// Create file in non-empty directory
std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
// Verify initial state
assert!(empty_dir1.exists());
assert!(empty_dir2.exists());
assert!(empty_subdir.exists());
assert!(non_empty_dir.exists());
// Clean empty directories with check_mtime = true
// Since the directories were just created (mtime < 1 hour), they should NOT be removed
ManifestCache::clean_empty_dirs_sync(&root, true).unwrap();
// Verify empty directories are NOT removed (they're too recent)
assert!(empty_dir1.exists());
assert!(empty_dir2.exists());
assert!(empty_subdir.exists());
// Verify non-empty directory still exists
assert!(non_empty_dir.exists());
assert!(non_empty_dir.join("file.txt").exists());
}
}
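These tests pin down the contract of the `remove_empty_dirs_recursive_sync` helper whose body is not shown in this diff: it prunes empty subdirectories bottom-up, reports whether the root itself ended up empty, and leaves recently modified directories alone when `check_mtime` is set. A hypothetical sketch consistent with the tests and the 1-hour threshold named in the doc comment:

```rust
use std::path::Path;
use std::time::{Duration, SystemTime};

/// Sketch only; the real helper is a private method of `ManifestCache`.
/// Returns Ok(true) if `dir` is empty after pruning its subdirectories.
fn remove_empty_dirs_recursive(dir: &Path, check_mtime: bool) -> std::io::Result<bool> {
    let mut is_empty = true;
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        if !path.is_dir() {
            is_empty = false;
            continue;
        }
        // Recurse first so nested empty directories are pruned bottom-up.
        if remove_empty_dirs_recursive(&path, check_mtime)? {
            if check_mtime {
                // Keep directories modified within the last hour.
                let mtime = std::fs::metadata(&path)?.modified()?;
                let age = SystemTime::now()
                    .duration_since(mtime)
                    .unwrap_or(Duration::ZERO);
                if age < Duration::from_secs(3600) {
                    is_empty = false;
                    continue;
                }
            }
            if let Err(e) = std::fs::remove_dir(&path)
                && e.kind() != std::io::ErrorKind::NotFound
            {
                is_empty = false;
            }
        } else {
            is_empty = false;
        }
    }
    Ok(is_empty)
}

fn main() -> std::io::Result<()> {
    let root = std::env::temp_dir().join("manifest_cache_demo");
    std::fs::create_dir_all(root.join("empty/sub"))?;
    // With check_mtime = false, the freshly created dirs are pruned.
    assert!(remove_empty_dirs_recursive(&root, false)?);
    Ok(())
}
```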


@@ -63,11 +63,13 @@ pub type WriteCacheRef = Arc<WriteCache>;
impl WriteCache {
/// Create the cache with a `local_store` to cache files and an
/// `object_store_manager` for all object stores.
#[allow(clippy::too_many_arguments)]
pub async fn new(
local_store: ObjectStore,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
enable_background_worker: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
manifest_cache: Option<ManifestCache>,
@@ -79,6 +81,7 @@ impl WriteCache {
cache_capacity,
ttl,
index_cache_percent,
enable_background_worker,
));
file_cache.recover(false, Some(task_receiver)).await;
@@ -92,11 +95,13 @@ impl WriteCache {
}
/// Creates a write cache based on local fs.
#[allow(clippy::too_many_arguments)]
pub async fn new_fs(
cache_dir: &str,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
index_cache_percent: Option<u8>,
enable_background_worker: bool,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
manifest_cache_capacity: ReadableSize,
@@ -117,6 +122,7 @@ impl WriteCache {
cache_capacity,
ttl,
index_cache_percent,
enable_background_worker,
puffin_manager_factory,
intermediate_manager,
manifest_cache,


@@ -131,6 +131,11 @@ pub struct MitoConfig {
/// The remaining capacity is used for data (parquet) files.
/// Must be between 0 and 100 (exclusive).
pub index_cache_percent: u8,
/// Enable background downloading of files to the local cache when accessed during queries (default: true).
/// When enabled, files will be asynchronously downloaded to improve performance for subsequent reads.
pub enable_refill_cache_on_read: bool,
/// Capacity for manifest cache (default: 256MB).
pub manifest_cache_size: ReadableSize,
// Other configs:
/// Buffer size for SST writing.
@@ -198,6 +203,8 @@ impl Default for MitoConfig {
write_cache_ttl: None,
preload_index_cache: true,
index_cache_percent: DEFAULT_INDEX_CACHE_PERCENT,
enable_refill_cache_on_read: true,
manifest_cache_size: ReadableSize::mb(256),
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,


@@ -160,6 +160,8 @@ async fn test_index_build_type_flush() {
#[tokio::test]
async fn test_index_build_type_compact() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("test_index_build_type_compact_").await;
let listener = Arc::new(IndexBuildListener::default());
let engine = env


@@ -157,6 +157,8 @@ impl ManifestObjectStore {
total_manifest_size: Arc<AtomicU64>,
manifest_cache: Option<ManifestCache>,
) -> Self {
common_telemetry::info!("Creating manifest store, manifest cache enabled: {}", manifest_cache.is_some());
let path = util::normalize_dir(path);
let staging_path = {
// Convert "region_dir/manifest/" to "region_dir/staging/manifest/"


@@ -32,6 +32,7 @@ use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::index::IndexValuesCodec;
use mito_codec::row_converter::CompositeValues;
use object_store::ObjectStore;
use puffin_manager::SstPuffinManager;
use smallvec::{SmallVec, smallvec};
use snafu::{OptionExt, ResultExt};
@@ -42,7 +43,7 @@ use strum::IntoStaticStr;
use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{
@@ -76,6 +77,30 @@ pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
/// Triggers background download of an index file to the local cache.
pub(crate) fn trigger_index_background_download(
file_cache: Option<&FileCacheRef>,
file_id: &RegionIndexId,
file_size_hint: Option<u64>,
path_factory: &RegionFilePathFactory,
object_store: &ObjectStore,
) {
if let (Some(file_cache), Some(file_size)) = (file_cache, file_size_hint) {
let index_key = IndexKey::new(
file_id.region_id(),
file_id.file_id(),
FileType::Puffin(file_id.version),
);
let remote_path = path_factory.build_index_file_path(file_id.file_id);
file_cache.maybe_download_background(
index_key,
remote_path,
object_store.clone(),
file_size,
);
}
}
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
@@ -1794,6 +1819,7 @@ mod tests {
ReadableSize::mb(10),
None,
None,
true, // enable_background_worker
factory,
intm_manager,
ReadableSize::mb(10),


@@ -45,10 +45,10 @@ use crate::error::{
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
/// Metrics for tracking bloom filter index apply operations.
#[derive(Default, Clone)]
@@ -378,12 +378,20 @@ impl BloomFilterIndexApplier {
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
// Trigger background download if file cache and file size are available
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.object_store,
);
let puffin_manager = self
.puffin_manager_factory
.build(
self.object_store.clone(),
RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
)
.build(self.object_store.clone(), path_factory)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let blob_name = Self::column_blob_name(column_id);


@@ -45,12 +45,12 @@ use crate::error::{
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::index::fulltext_index::applier::builder::{FulltextRequest, FulltextTerm};
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::puffin_manager::{
PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};
use crate::sst::index::{TYPE_FULLTEXT_INDEX, trigger_index_background_download};
pub mod builder;
@@ -748,12 +748,20 @@ impl IndexSource {
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<SstPuffinReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
// Trigger background download if file cache and file size are available
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.remote_store,
);
let puffin_manager = self
.puffin_manager_factory
.build(
self.remote_store.clone(),
RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
)
.build(self.remote_store.clone(), path_factory)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let reader = puffin_manager


@@ -41,9 +41,9 @@ use crate::error::{
};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::RegionIndexId;
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::{TYPE_INVERTED_INDEX, trigger_index_background_download};
/// Metrics for tracking inverted index apply operations.
#[derive(Default, Clone)]
@@ -311,12 +311,20 @@ impl InvertedIndexApplier {
file_id: RegionIndexId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
// Trigger background download if file cache and file size are available
trigger_index_background_download(
self.file_cache.as_ref(),
&file_id,
file_size_hint,
&path_factory,
&self.store,
);
let puffin_manager = self
.puffin_manager_factory
.build(
self.store.clone(),
RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
)
.build(self.store.clone(), path_factory)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
puffin_manager


@@ -269,7 +269,7 @@ impl ParquetReaderBuilder {
let file_size = self.file_handle.meta_ref().file_size;
// Loads parquet metadata of the file.
let parquet_meta = self
let (parquet_meta, cache_miss) = self
.read_parquet_metadata(&file_path, file_size, &mut metrics.metadata_cache_metrics)
.await?;
// Decodes region metadata.
@@ -326,6 +326,22 @@ impl ParquetReaderBuilder {
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;
// Trigger background download if metadata had a cache miss and selection is not empty
if cache_miss && !selection.is_empty() {
use crate::cache::file_cache::{FileType, IndexKey};
let index_key = IndexKey::new(
self.file_handle.region_id(),
self.file_handle.file_id().file_id(),
FileType::Parquet,
);
self.cache_strategy.maybe_download_background(
index_key,
file_path.clone(),
self.object_store.clone(),
file_size,
);
}
let reader_builder = RowGroupReaderBuilder {
file_handle: self.file_handle.clone(),
file_path,
@@ -395,12 +411,13 @@ impl ParquetReaderBuilder {
}
/// Reads parquet metadata of specific file.
/// Returns (metadata, cache_miss_flag).
async fn read_parquet_metadata(
&self,
file_path: &str,
file_size: u64,
cache_metrics: &mut MetadataCacheMetrics,
) -> Result<Arc<ParquetMetaData>> {
) -> Result<(Arc<ParquetMetaData>, bool)> {
let start = Instant::now();
let _t = READ_STAGE_ELAPSED
.with_label_values(&["read_parquet_metadata"])
@@ -414,7 +431,7 @@ impl ParquetReaderBuilder {
.await
{
cache_metrics.metadata_load_cost += start.elapsed();
return Ok(metadata);
return Ok((metadata, false));
}
// Cache miss, load metadata directly.
@@ -427,7 +444,7 @@ impl ParquetReaderBuilder {
.put_parquet_meta_data(file_id, metadata.clone());
cache_metrics.metadata_load_cost += start.elapsed();
Ok(metadata)
Ok((metadata, true))
}
/// Computes row groups to read, along with their respective row selections.


@@ -655,6 +655,7 @@ impl TestEnv {
capacity,
None,
None,
true, // enable_background_worker
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
None, // manifest_cache
@@ -676,6 +677,7 @@ impl TestEnv {
capacity,
None,
None,
true, // enable_background_worker
self.puffin_manager.clone(),
self.intermediate_manager.clone(),
ReadableSize::mb(0), // manifest_cache_capacity


@@ -38,7 +38,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_error::ext::BoxedError;
use common_meta::key::SchemaMetadataManagerRef;
use common_runtime::JoinHandle;
@@ -481,10 +480,10 @@ pub async fn write_cache_from_config(
config.write_cache_size,
config.write_cache_ttl,
Some(config.index_cache_percent),
config.enable_refill_cache_on_read,
puffin_manager_factory,
intermediate_manager,
// TODO(yingwen): Enable manifest cache after removing read cache.
ReadableSize(0),
config.manifest_cache_size,
)
.await?;
Ok(Some(Arc::new(cache)))


@@ -21,7 +21,6 @@ derive_builder = { workspace = true, optional = true }
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.54", features = [
"layers-tracing",


@@ -12,11 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod lru_cache;
#[cfg(feature = "testing")]
pub mod mock;
pub use lru_cache::*;
pub use opendal::layers::*;
pub use prometheus::build_prometheus_metrics_layer;


@@ -1,134 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use opendal::Result;
use opendal::raw::oio::Reader;
use opendal::raw::{
Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite,
};
mod read_cache;
use std::time::Instant;
use common_telemetry::{error, info};
use read_cache::ReadCache;
use crate::layers::lru_cache::read_cache::CacheAwareDeleter;
/// An opendal layer with local LRU file cache supporting.
pub struct LruCacheLayer<C: Access> {
// The read cache
read_cache: ReadCache<C>,
}
impl<C: Access> Clone for LruCacheLayer<C> {
fn clone(&self) -> Self {
Self {
read_cache: self.read_cache.clone(),
}
}
}
impl<C: Access> LruCacheLayer<C> {
/// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
Ok(Self { read_cache })
}
/// Recovers cache
pub async fn recover_cache(&self, sync: bool) {
let now = Instant::now();
let moved_read_cache = self.read_cache.clone();
let handle = tokio::spawn(async move {
match moved_read_cache.recover_cache().await {
Ok((entries, bytes)) => info!(
"Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
entries,
bytes,
now.elapsed()
),
Err(err) => error!(err; "Failed to recover file cache."),
}
});
if sync {
let _ = handle.await;
}
}
/// Returns true when the local cache contains the specific file
pub async fn contains_file(&self, path: &str) -> bool {
self.read_cache.contains_file(path).await
}
/// Returns the read cache statistics info `(EntryCount, SizeInBytes)`.
pub async fn read_cache_stat(&self) -> (u64, u64) {
self.read_cache.cache_stat().await
}
}
impl<I: Access, C: Access> Layer<I> for LruCacheLayer<C> {
type LayeredAccess = LruCacheAccess<I, C>;
fn layer(&self, inner: I) -> Self::LayeredAccess {
LruCacheAccess {
inner,
read_cache: self.read_cache.clone(),
}
}
}
#[derive(Debug)]
pub struct LruCacheAccess<I, C> {
inner: I,
read_cache: ReadCache<C>,
}
impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
type Inner = I;
type Reader = Reader;
type Writer = I::Writer;
type Lister = I::Lister;
type Deleter = CacheAwareDeleter<C, I::Deleter>;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.read_cache
.read_from_cache(&self.inner, path, args)
.await
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let result = self.inner.write(path, args).await;
self.read_cache.invalidate_entries_with_prefix(path);
result
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner
.delete()
.await
.map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter)))
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.inner.list(path, args).await
}
}


@@ -1,366 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::{debug, trace};
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use moka::policy::EvictionPolicy;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead, oio};
use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Subdirectory of cached files for read.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const READ_CACHE_DIR: &str = "cache/object/read";
/// Cache value for read file
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
// Read success with size
Success(u32),
// File not found
NotFound,
}
impl ReadResult {
fn size_bytes(&self) -> u32 {
match self {
ReadResult::NotFound => 0,
ReadResult::Success(size) => *size,
}
}
}
/// Returns true when the path of the file can be cached.
fn can_cache(path: &str) -> bool {
// TODO(dennis): find a better way
!path.ends_with("_last_checkpoint")
}
/// Generate a unique cache key for the read path and range.
fn read_cache_key(path: &str, args: &OpRead) -> String {
format!(
"{READ_CACHE_DIR}/{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}
fn read_cache_root() -> String {
format!("/{READ_CACHE_DIR}/")
}
fn read_cache_key_prefix(path: &str) -> String {
format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
}
/// Local read cache for files in object storage
#[derive(Debug)]
pub(crate) struct ReadCache<C> {
/// Local file cache backend
file_cache: Arc<C>,
/// Local memory cache to track local cache files
mem_cache: Cache<String, ReadResult>,
}
impl<C> Clone for ReadCache<C> {
fn clone(&self) -> Self {
Self {
file_cache: self.file_cache.clone(),
mem_cache: self.mem_cache.clone(),
}
}
}
impl<C: Access> ReadCache<C> {
/// Create a [`ReadCache`] with capacity in bytes.
pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
let eviction_listener =
move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
// Delete the file from local file cache when it's purged from mem_cache.
OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
let file_cache_cloned = file_cache_cloned.clone();
async move {
if let ReadResult::Success(size) = read_result {
OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
let result = file_cache_cloned.delete(&read_key).await;
debug!(
"Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
read_key, result, cause
);
}
}
.boxed()
};
Self {
file_cache,
mem_cache: Cache::builder()
.max_capacity(capacity as u64)
.eviction_policy(EvictionPolicy::lru())
.weigher(|_key, value: &ReadResult| -> u32 {
// TODO(dennis): add key's length to weight?
value.size_bytes()
})
.async_eviction_listener(eviction_listener)
.support_invalidation_closures()
.build(),
}
}
/// Returns the cache's entry count and total approximate entry size in bytes.
pub(crate) async fn cache_stat(&self) -> (u64, u64) {
self.mem_cache.run_pending_tasks().await;
(self.mem_cache.entry_count(), self.mem_cache.weighted_size())
}
/// Invalidate all cache items belong to the specific path.
pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
let prefix = read_cache_key_prefix(path);
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
.ok();
}
/// Recover existing cache items from `file_cache` to `mem_cache`.
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let op = OperatorBuilder::new(self.file_cache.clone()).finish();
let cloned_op = op.clone();
let root = read_cache_root();
let mut entries = op
.lister_with(&root)
.await?
.map_ok(|entry| async {
let (path, mut meta) = entry.into_parts();
// TODO(dennis): Use a better API, see https://github.com/apache/opendal/issues/6522
if meta.content_length() == 0 {
meta = cloned_op.stat(&path).await?;
}
Ok((path, meta))
})
.try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
.try_collect::<Vec<_>>()
.await?;
while let Some((read_key, metadata)) = entries.pop() {
if !metadata.is_file() {
continue;
}
let size = metadata.content_length();
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
self.mem_cache
.insert(read_key.clone(), ReadResult::Success(size as u32))
.await;
}
Ok(self.cache_stat().await)
}
/// Returns true when the read cache contains the specific file.
pub(crate) async fn contains_file(&self, path: &str) -> bool {
self.mem_cache.run_pending_tasks().await;
self.mem_cache.contains_key(path)
&& self.file_cache.stat(path, OpStat::default()).await.is_ok()
}
/// Read from a specific path using the OpRead operation.
/// It will attempt to retrieve the data from the local cache.
/// If the data is not found in the local cache,
/// it will fall back to retrieving it from remote object storage
/// and cache the result locally.
pub(crate) async fn read_from_cache<I>(
&self,
inner: &I,
path: &str,
args: OpRead,
) -> Result<(RpRead, Reader)>
where
I: Access,
{
if !can_cache(path) {
return inner.read(path, args).await.map(to_output_reader);
}
let read_key = read_cache_key(path, &args);
let read_result = self
.mem_cache
.try_get_with(
read_key.clone(),
self.read_remote(inner, &read_key, path, args.clone()),
)
.await
.map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
match read_result {
ReadResult::Success(_) => {
// There is a concurrent issue here, the local cache may be purged
// while reading, we have to fall back to remote read
match self.file_cache.read(&read_key, OpRead::default()).await {
Ok(ret) => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["success"])
.inc();
Ok(to_output_reader(ret))
}
Err(_) => {
OBJECT_STORE_LRU_CACHE_MISS.inc();
inner.read(path, args).await.map(to_output_reader)
}
}
}
ReadResult::NotFound => {
OBJECT_STORE_LRU_CACHE_HIT
.with_label_values(&["not_found"])
.inc();
Err(OpendalError::new(
ErrorKind::NotFound,
format!("File not found: {path}"),
))
}
}
}
async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
where
I: Access,
{
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
let mut total = 0;
loop {
let bytes = reader.read().await?;
if bytes.is_empty() {
break;
}
total += bytes.len();
writer.write(bytes).await?;
}
// Call `close` to ensure data is written.
writer.close().await?;
Ok(total)
}
/// Read the file from remote storage. If success, write the content into local cache.
async fn read_remote<I>(
&self,
inner: &I,
read_key: &str,
path: &str,
args: OpRead,
) -> Result<ReadResult>
where
I: Access,
{
OBJECT_STORE_LRU_CACHE_MISS.inc();
let (_, reader) = inner.read(path, args).await?;
let result = self.try_write_cache::<I>(reader, read_key).await;
trace!(
"Read cache miss for key '{}' and fetch file '{}' from object store",
read_key, path,
);
match result {
Ok(read_bytes) => {
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
Ok(ReadResult::Success(read_bytes as u32))
}
Err(e) if e.kind() == ErrorKind::NotFound => {
OBJECT_STORE_READ_ERROR
.with_label_values(&[e.kind().to_string().as_str()])
.inc();
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
Ok(ReadResult::NotFound)
}
Err(e) => {
OBJECT_STORE_READ_ERROR
.with_label_values(&[e.kind().to_string().as_str()])
.inc();
Err(e)
}
}
}
}
pub struct CacheAwareDeleter<C, D> {
cache: ReadCache<C>,
deleter: D,
}
impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
Self { cache, deleter }
}
}
impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.cache.invalidate_entries_with_prefix(path);
self.deleter.delete(path, args)?;
Ok(())
}
async fn flush(&mut self) -> Result<usize> {
self.deleter.flush().await
}
}
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Box::new(input.1))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_can_cache() {
assert!(can_cache("test"));
assert!(can_cache("a/b/c.parquet"));
assert!(can_cache("1.json"));
assert!(can_cache("100.checkpoint"));
assert!(can_cache("test/last_checkpoint"));
assert!(!can_cache("test/__last_checkpoint"));
assert!(!can_cache("a/b/c/__last_checkpoint"));
}
}


@@ -13,38 +13,3 @@
// limitations under the License.
//! object-store metrics
use lazy_static::lazy_static;
use prometheus::*;
lazy_static! {
/// Cache hit counter, no matter what the cache result is.
pub static ref OBJECT_STORE_LRU_CACHE_HIT: IntCounterVec = register_int_counter_vec!(
"greptime_object_store_lru_cache_hit",
"object store lru cache hit",
&["result"]
)
.unwrap();
/// Cache miss counter
pub static ref OBJECT_STORE_LRU_CACHE_MISS: IntCounter =
register_int_counter!("greptime_object_store_lru_cache_miss", "object store lru cache miss")
.unwrap();
/// Object store read error counter
pub static ref OBJECT_STORE_READ_ERROR: IntCounterVec = register_int_counter_vec!(
"greptime_object_store_read_errors",
"object store read errors",
&["kind"]
)
.unwrap();
/// Cache entry number
pub static ref OBJECT_STORE_LRU_CACHE_ENTRIES: IntGauge =
register_int_gauge!("greptime_object_store_lru_cache_entries", "object store lru cache entries")
.unwrap();
/// Cache size in bytes
pub static ref OBJECT_STORE_LRU_CACHE_BYTES: IntGauge =
register_int_gauge!("greptime_object_store_lru_cache_bytes", "object store lru cache bytes")
.unwrap();
}
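// Condensed usage sketch (assumed helper; the real call sites are in the
// read-cache code earlier in this commit): a miss that successfully fills
// the cache bumps the miss counter, the entry gauge, and the byte gauge.
fn record_cache_fill(read_bytes: usize) {
    OBJECT_STORE_LRU_CACHE_MISS.inc();
    OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
    OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
}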

View File

@@ -13,22 +13,15 @@
// limitations under the License.
use std::env;
use std::sync::Arc;
use anyhow::Result;
use common_telemetry::info;
use common_test_util::temp_dir::create_temp_dir;
use object_store::layers::LruCacheLayer;
use object_store::ObjectStore;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::oio::{List, Read};
use opendal::raw::{Access, OpList, OpRead};
use opendal::EntryMode;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, OperatorBuilder};
/// Duplicate of the constant in `src/layers/lru_cache/read_cache.rs`
const READ_CACHE_DIR: &str = "cache/object/read";
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
@@ -231,249 +224,3 @@ async fn test_gcs_backend() -> Result<()> {
}
Ok(())
}
#[tokio::test]
async fn test_file_backend_with_lru_cache() -> Result<()> {
common_telemetry::init_default_ut_logging();
let data_dir = create_temp_dir("test_file_backend_with_lru_cache");
let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache");
let builder = Fs::default()
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy());
let store = builder.build().unwrap();
let cache_dir = create_temp_dir("test_file_backend_with_lru_cache");
let cache_layer = {
let builder = Fs::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());
let cache_layer = LruCacheLayer::new(file_cache, 32).unwrap();
cache_layer.recover_cache(true).await;
cache_layer
};
let store = OperatorBuilder::new(store)
.layer(cache_layer.clone())
.finish();
test_object_crud(&store).await?;
test_object_list(&store).await?;
assert_eq!(cache_layer.read_cache_stat().await, (0, 0));
Ok(())
}
async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
for file_name in file_names {
let file_path = format!("{READ_CACHE_DIR}/{file_name}");
assert!(cache_layer.contains_file(&file_path).await, "{file_path:?}");
}
}
async fn assert_cache_files<C: Access>(
store: &C,
file_names: &[&str],
file_contents: &[&str],
) -> Result<()> {
let (_, mut lister) = store.list("/", OpList::default()).await?;
let mut objects = vec![];
while let Some(e) = lister.next().await? {
if e.mode() == EntryMode::FILE {
objects.push(e);
}
}
// compare the cached files with the expected contents; order is ignored
for o in objects {
let position = file_names.iter().position(|&x| x == o.path());
assert!(position.is_some(), "file not found: {}", o.path());
let position = position.unwrap();
let (_, mut r) = store.read(o.path(), OpRead::default()).await.unwrap();
let bs = r.read_all().await.unwrap();
assert_eq!(
file_contents[position],
String::from_utf8(bs.to_vec())?,
"file content not match: {}",
o.path()
);
}
Ok(())
}
#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
common_telemetry::init_default_ut_logging();
// create file storage
let root_dir = create_temp_dir("test_object_store_cache_policy");
let store = OperatorBuilder::new(
Fs::default()
.root(&root_dir.path().to_string_lossy())
.atomic_write_dir(&root_dir.path().to_string_lossy())
.build()
.unwrap(),
)
.finish();
// create file cache layer
let cache_dir = create_temp_dir("test_object_store_cache_policy_cache");
let atomic_temp_dir = create_temp_dir("test_object_store_cache_policy_cache_tmp");
let builder = Fs::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&atomic_temp_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());
let cache_store = file_cache.clone();
// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).unwrap();
cache_layer.recover_cache(true).await;
let store = store.layer(cache_layer.clone());
// create several objects and write data into them
let p1 = "test_file1";
let p2 = "test_file2";
store.write(p1, "Hello, object1!").await.unwrap();
store.write(p2, "Hello, object2!").await.unwrap();
// Try to read p1 and p2
let _ = store.read_with(p1).range(0..).await?;
let _ = store.read(p1).await?;
let _ = store.read_with(p2).range(0..).await?;
let _ = store.read_with(p2).range(7..).await?;
let _ = store.read(p2).await?;
assert_eq!(cache_layer.read_cache_stat().await, (3, 38));
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
],
&["Hello, object1!", "object2!", "Hello, object2!"],
)
.await?;
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
],
)
.await;
// Delete p2 file
store.delete(p2).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (1, 15));
assert_cache_files(
&cache_store,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
&["Hello, object1!"],
)
.await?;
assert_lru_cache(
&cache_layer,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
)
.await;
// Reading the deleted file without a deterministic range size requires an extra `stat`.
// Therefore, it won't go into the cache.
assert!(store.read(p2).await.is_err());
let p3 = "test_file3";
store.write(p3, "Hello, object3!").await.unwrap();
// Try to read p3
let _ = store.read(p3).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
// However, the actual file open happens after the reader is created.
// The reader will throw an error during reading
// instead of returning `NotFound` when the reader is created.
// The entry count is 4 because the p2 `NotFound` result is also cached.
assert!(store.read_with(p2).range(0..4).await.is_err());
assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["Hello, object1!", "Hello, object3!", "Hello"],
)
.await?;
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
.await;
// try to read p1, p2, p3
let _ = store.read(p3).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();
assert!(store.read(p2).await.is_err());
// Read p1 with range `1..`; the existing p1 entry with range `0..` must be evicted.
let _ = store.read_with(p1).range(1..15).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["ello, object1!", "Hello, object3!", "Hello"],
)
.await?;
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
.await;
let metric_text = common_telemetry::dump_metrics().unwrap();
assert!(metric_text.contains("object_store_lru_cache_hit"));
assert!(metric_text.contains("object_store_lru_cache_miss"));
drop(cache_layer);
// Test recover
let cache_layer = LruCacheLayer::new(cache_store, 38).unwrap();
cache_layer.recover_cache(true).await;
// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
.await;
Ok(())
}
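// The cache file names asserted above imply a key of hex-encoded md5 over
// the object path plus the requested byte range, with an inclusive end
// (range 0..5 shows up as "0-4"). A hypothetical reconstruction, not the
// actual implementation in read_cache.rs:
fn read_cache_key_sketch(path: &str, start: u64, end: Option<u64>) -> String {
    let digest = md5::compute(path);
    match end {
        Some(end) => format!("{digest:x}.cache-bytes={start}-{}", end - 1),
        None => format!("{digest:x}.cache-bytes={start}-"),
    }
}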

View File

@@ -1503,6 +1503,8 @@ write_cache_path = ""
write_cache_size = "5GiB"
preload_index_cache = true
index_cache_percent = 20
enable_refill_cache_on_read = true
manifest_cache_size = "256MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32
max_concurrent_scan_files = 384