feat: allow purging a given puffin file in staging area (#5558)

* feat: purge a given puffin file in staging area

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* polish log

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* ttl set to 2d

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* feat: expose staging_ttl to index config

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* use `invalidate_entries_if` instead of maintaining map

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* run_pending_tasks after purging

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Zhenchi
2025-02-19 16:58:30 +08:00
committed by GitHub
parent aada5c1706
commit 421e38c481
17 changed files with 209 additions and 21 deletions

View File

@@ -152,6 +152,7 @@
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.index.staging_ttl` | String | `7d` | The TTL of the staging directory.<br/>Defaults to 7 days.<br/>Set it to "0s" to disable TTL. |
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
@@ -491,6 +492,7 @@
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.index.staging_ttl` | String | `7d` | The TTL of the staging directory.<br/>Defaults to 7 days.<br/>Set it to "0s" to disable TTL. |
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |

View File

@@ -497,6 +497,11 @@ aux_path = ""
## The max capacity of the staging directory.
staging_size = "2GB"
## The TTL of the staging directory.
## Defaults to 7 days.
## Set it to "0s" to disable TTL.
staging_ttl = "7d"
## Cache size for inverted index metadata.
metadata_cache_size = "64MiB"

View File

@@ -584,6 +584,11 @@ aux_path = ""
## The max capacity of the staging directory.
staging_size = "2GB"
## The TTL of the staging directory.
## Defaults to 7 days.
## Set it to "0s" to disable TTL.
staging_ttl = "7d"
## Cache size for inverted index metadata.
metadata_cache_size = "64MiB"

View File

@@ -30,7 +30,7 @@ async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
let path = staging_dir.path().to_path_buf();
(
staging_dir,
Arc::new(BoundedStager::new(path, 102400, None).await.unwrap()),
Arc::new(BoundedStager::new(path, 102400, None, None).await.unwrap()),
)
}

View File

@@ -135,6 +135,7 @@ pub async fn open_compaction_region(
&mito_config.index.aux_path,
mito_config.index.staging_size.as_bytes(),
Some(mito_config.index.write_buffer_size.as_bytes() as _),
mito_config.index.staging_ttl,
)
.await?;
let intermediate_manager =

View File

@@ -299,6 +299,11 @@ pub struct IndexConfig {
/// The max capacity of the staging directory.
pub staging_size: ReadableSize,
/// The TTL of the staging directory.
/// Defaults to 7 days.
/// Setting it to "0s" to disable TTL.
#[serde(with = "humantime_serde")]
pub staging_ttl: Option<Duration>,
/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,
@@ -316,6 +321,7 @@ impl Default for IndexConfig {
Self {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
@@ -352,6 +358,10 @@ impl IndexConfig {
);
}
if self.staging_ttl.map(|ttl| ttl.is_zero()).unwrap_or(false) {
self.staging_ttl = None;
}
Ok(())
}
}

View File

@@ -823,6 +823,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to purge puffin stager"))]
PuffinPurgeStager {
source: puffin::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build puffin reader"))]
PuffinBuildReader {
source: puffin::error::Error,
@@ -1062,7 +1069,8 @@ impl ErrorExt for Error {
PuffinReadBlob { source, .. }
| PuffinAddBlob { source, .. }
| PuffinInitStager { source, .. }
| PuffinBuildReader { source, .. } => source.status_code(),
| PuffinBuildReader { source, .. }
| PuffinPurgeStager { source, .. } => source.status_code(),
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
StaleLogEntry { .. } => StatusCode::Unexpected,

View File

@@ -154,6 +154,10 @@ pub enum IndexType {
}
impl FileMeta {
/// Returns true if the file has at least one available index, regardless of its type.
pub fn exists_index(&self) -> bool {
!self.available_indexes.is_empty()
}
/// Returns true if the file has an inverted index
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)

View File

@@ -92,8 +92,8 @@ impl FilePurger for LocalFilePurger {
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
{
// Removes the inverted index from the cache.
if file_meta.inverted_index_available() {
// Removes index file from the cache.
if file_meta.exists_index() {
write_cache
.remove(IndexKey::new(
file_meta.region_id,
@@ -111,6 +111,18 @@ impl FilePurger for LocalFilePurger {
))
.await;
}
// Purges index content in the stager.
let puffin_file_name =
crate::sst::location::index_file_path(sst_layer.region_dir(), file_meta.file_id);
if let Err(e) = sst_layer
.puffin_manager_factory()
.purge_stager(&puffin_file_name)
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
}
})) {
error!(e; "Failed to schedule the file purge request");
}
@@ -146,7 +158,7 @@ mod tests {
let path = location::sst_file_path(sst_dir, sst_file_id);
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
@@ -202,7 +214,7 @@ mod tests {
let sst_dir = "table1";
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())

View File

@@ -14,6 +14,7 @@
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use common_error::ext::BoxedError;
@@ -21,11 +22,11 @@ use object_store::{FuturesAsyncWriter, ObjectStore};
use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::BoundedStager;
use puffin::puffin_manager::stager::{BoundedStager, Stager};
use puffin::puffin_manager::{BlobGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use crate::error::{PuffinInitStagerSnafu, Result};
use crate::error::{PuffinInitStagerSnafu, PuffinPurgeStagerSnafu, Result};
use crate::metrics::{
StagerMetrics, INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
@@ -61,12 +62,14 @@ impl PuffinManagerFactory {
aux_path: impl AsRef<Path>,
staging_capacity: u64,
write_buffer_size: Option<usize>,
staging_ttl: Option<Duration>,
) -> Result<Self> {
let staging_dir = aux_path.as_ref().join(STAGING_DIR);
let stager = BoundedStager::new(
staging_dir,
staging_capacity,
Some(Arc::new(StagerMetrics::default())),
staging_ttl,
)
.await
.context(PuffinInitStagerSnafu)?;
@@ -81,6 +84,13 @@ impl PuffinManagerFactory {
let puffin_file_accessor = ObjectStorePuffinFileAccessor::new(store);
SstPuffinManager::new(self.stager.clone(), puffin_file_accessor)
}
/// Purges all staged content that belongs to the given puffin file.
///
/// Delegates to the shared stager's `purge` and wraps a failure in the
/// `PuffinPurgeStager` error.
pub(crate) async fn purge_stager(&self, puffin_file_name: &str) -> Result<()> {
    let purged = self.stager.purge(puffin_file_name).await;
    purged.context(PuffinPurgeStagerSnafu)
}
}
#[cfg(test)]
@@ -89,7 +99,7 @@ impl PuffinManagerFactory {
prefix: &str,
) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
let factory = Self::new(tempdir.path().to_path_buf(), 1024, None)
let factory = Self::new(tempdir.path().to_path_buf(), 1024, None, None)
.await
.unwrap();
(tempdir, factory)
@@ -98,7 +108,7 @@ impl PuffinManagerFactory {
pub(crate) fn new_for_test_block(prefix: &str) -> (common_test_util::temp_dir::TempDir, Self) {
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
let f = Self::new(tempdir.path().to_path_buf(), 1024, None);
let f = Self::new(tempdir.path().to_path_buf(), 1024, None, None);
let factory = common_runtime::block_on_global(f).unwrap();
(tempdir, factory)

View File

@@ -640,7 +640,7 @@ impl TestEnv {
capacity: ReadableSize,
) -> WriteCacheRef {
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())

View File

@@ -55,7 +55,7 @@ impl SchedulerEnv {
let builder = Fs::default().root(&path_str);
let index_aux_path = path.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())

View File

@@ -146,6 +146,7 @@ impl WorkerGroup {
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
@@ -295,6 +296,7 @@ impl WorkerGroup {
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)

View File

@@ -88,6 +88,9 @@ pub trait Stager: Send + Sync {
dir_path: PathBuf,
dir_size: u64,
) -> Result<()>;
/// Purges all content for the given puffin file from the staging area.
///
/// `puffin_file_name` is the name of the puffin file whose staged content
/// should be removed.
async fn purge(&self, puffin_file_name: &str) -> Result<()>;
}
/// `StagerNotifier` provides a way to notify the caller of the staging events.

View File

@@ -78,19 +78,21 @@ impl BoundedStager {
base_dir: PathBuf,
capacity: u64,
notifier: Option<Arc<dyn StagerNotifier>>,
cache_ttl: Option<Duration>,
) -> Result<Self> {
tokio::fs::create_dir_all(&base_dir)
.await
.context(CreateSnafu)?;
let recycle_bin = Cache::builder().time_to_live(RECYCLE_BIN_TTL).build();
let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
let recycle_bin_cloned = recycle_bin.clone();
let notifier_cloned = notifier.clone();
let cache = Cache::builder()
let mut cache_builder = Cache::builder()
.max_capacity(capacity)
.weigher(|_: &String, v: &CacheValue| v.weight())
.eviction_policy(EvictionPolicy::lru())
.support_invalidation_closures()
.async_eviction_listener(move |k, v, _| {
let recycle_bin = recycle_bin_cloned.clone();
if let Some(notifier) = notifier_cloned.as_ref() {
@@ -101,8 +103,13 @@ impl BoundedStager {
recycle_bin.insert(k.as_str().to_string(), v).await;
}
.boxed()
})
.build();
});
if let Some(ttl) = cache_ttl {
if !ttl.is_zero() {
cache_builder = cache_builder.time_to_live(ttl);
}
}
let cache = cache_builder.build();
let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
let notifier_cloned = notifier.clone();
@@ -162,6 +169,7 @@ impl Stager for BoundedStager {
notifier.on_load_blob(timer.elapsed());
}
let guard = Arc::new(FsBlobGuard {
puffin_file_name: puffin_file_name.to_string(),
path,
delete_queue: self.delete_queue.clone(),
size,
@@ -216,6 +224,7 @@ impl Stager for BoundedStager {
notifier.on_load_dir(timer.elapsed());
}
let guard = Arc::new(FsDirGuard {
puffin_file_name: puffin_file_name.to_string(),
path,
size,
delete_queue: self.delete_queue.clone(),
@@ -266,6 +275,7 @@ impl Stager for BoundedStager {
notifier.on_cache_insert(size);
}
let guard = Arc::new(FsDirGuard {
puffin_file_name: puffin_file_name.to_string(),
path,
size,
delete_queue: self.delete_queue.clone(),
@@ -284,6 +294,15 @@ impl Stager for BoundedStager {
Ok(())
}
/// Purges every cached entry that was staged for `puffin_file_name`.
///
/// Entries are matched by the puffin file name recorded on their guard when
/// they were staged. Entries restored by `recover` carry an empty placeholder
/// name, so a purge for a real file name does not match them; they are left
/// to TTL expiration (when a TTL is configured).
async fn purge(&self, puffin_file_name: &str) -> Result<()> {
    // The predicate is moved into the cache, so it owns a copy of the name.
    let file_name = puffin_file_name.to_string();
    self.cache
        .invalidate_entries_if(move |_k, v| v.puffin_file_name() == file_name)
        .expect("`support_invalidation_closures` is enabled on the cache builder");

    // Run the cache's pending tasks so the invalidation is applied (and the
    // eviction listener fires for matched entries) before returning.
    self.cache.run_pending_tasks().await;
    Ok(())
}
}
impl BoundedStager {
@@ -337,6 +356,9 @@ impl BoundedStager {
}
/// Recovers the staging area by iterating through the staging directory.
///
/// Note: It can't recover the mapping between puffin files and keys, so TTL
/// is configured to purge the dangling files and directories.
async fn recover(&self) -> Result<()> {
let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
@@ -376,6 +398,9 @@ impl BoundedStager {
path,
size,
delete_queue: self.delete_queue.clone(),
// placeholder
puffin_file_name: String::new(),
}));
// A duplicate dir will be moved to the delete queue.
let _dup_dir = elems.insert(key, v);
@@ -385,6 +410,9 @@ impl BoundedStager {
path,
size,
delete_queue: self.delete_queue.clone(),
// placeholder
puffin_file_name: String::new(),
}));
// A duplicate file will be moved to the delete queue.
let _dup_file = elems.insert(key, v);
@@ -506,6 +534,13 @@ impl CacheValue {
fn weight(&self) -> u32 {
self.size().try_into().unwrap_or(u32::MAX)
}
fn puffin_file_name(&self) -> &str {
match self {
CacheValue::File(guard) => &guard.puffin_file_name,
CacheValue::Dir(guard) => &guard.puffin_file_name,
}
}
}
enum DeleteTask {
@@ -518,6 +553,7 @@ enum DeleteTask {
/// automatically deleting the file on drop.
#[derive(Debug)]
pub struct FsBlobGuard {
puffin_file_name: String,
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
@@ -550,6 +586,7 @@ impl Drop for FsBlobGuard {
/// automatically deleting the directory on drop.
#[derive(Debug)]
pub struct FsDirGuard {
puffin_file_name: String,
path: PathBuf,
size: u64,
delete_queue: Sender<DeleteTask>,
@@ -754,6 +791,7 @@ mod tests {
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
None,
)
.await
.unwrap();
@@ -810,6 +848,7 @@ mod tests {
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
None,
)
.await
.unwrap();
@@ -884,6 +923,7 @@ mod tests {
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
None,
)
.await
.unwrap();
@@ -937,7 +977,7 @@ mod tests {
// recover stager
drop(stager);
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
.await
.unwrap();
@@ -997,6 +1037,7 @@ mod tests {
tempdir.path().to_path_buf(),
1, /* extremely small size */
Some(notifier.clone()),
None,
)
.await
.unwrap();
@@ -1217,7 +1258,7 @@ mod tests {
#[tokio::test]
async fn test_get_blob_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
.await
.unwrap();
@@ -1254,7 +1295,7 @@ mod tests {
#[tokio::test]
async fn test_get_dir_concurrency_on_fail() {
let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None)
let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
.await
.unwrap();
@@ -1287,4 +1328,84 @@ mod tests {
assert!(!stager.in_cache(puffin_file_name, key));
}
// Verifies that purging a puffin file evicts both the blob and the directory
// that were staged under that puffin file name.
#[tokio::test]
async fn test_purge() {
let tempdir = create_temp_dir("test_purge_");
let notifier = MockNotifier::build();
let stager = BoundedStager::new(
tempdir.path().to_path_buf(),
u64::MAX,
Some(notifier.clone()),
None,
)
.await
.unwrap();
// initialize stager
let puffin_file_name = "test_purge";
let blob_key = "blob_key";
// Stage an 11-byte blob ("hello world") and release its guard immediately.
let guard = stager
.get_blob(
puffin_file_name,
blob_key,
Box::new(|mut writer| {
Box::pin(async move {
writer.write_all(b"hello world").await.unwrap();
Ok(11)
})
}),
)
.await
.unwrap();
drop(guard);
// Five files totalling 70 bytes: 13 + 12 + 18 + 14 + 13.
let files_in_dir = [
("file_a", "Hello, world!".as_bytes()),
("file_b", "Hello, Rust!".as_bytes()),
("file_c", "你好,世界!".as_bytes()),
("subdir/file_d", "Hello, Puffin!".as_bytes()),
("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
];
let dir_key = "dir_key";
// Stage the directory under the same puffin file name and release its guard.
let guard = stager
.get_dir(
puffin_file_name,
dir_key,
Box::new(|writer_provider| {
Box::pin(async move {
let mut size = 0;
for (rel_path, content) in &files_in_dir {
size += content.len();
let mut writer = writer_provider.writer(rel_path).await.unwrap();
writer.write_all(content).await.unwrap();
}
Ok(size as _)
})
}),
)
.await
.unwrap();
drop(guard);
// purge the stager
stager.purge(puffin_file_name).await.unwrap();
// NOTE(review): `BoundedStager::purge` already runs the cache's pending tasks,
// so this extra call looks redundant.
stager.cache.run_pending_tasks().await;
// 81 = 11 bytes (blob) + 70 bytes (directory files). Both entries were cache
// misses when staged, and everything inserted is expected to be evicted and
// moved into the recycle bin, with no cache hits.
let stats = notifier.stats();
assert_eq!(
stats,
Stats {
cache_insert_size: 81,
cache_evict_size: 81,
cache_hit_count: 0,
cache_hit_size: 0,
cache_miss_count: 2,
cache_miss_size: 81,
recycle_insert_size: 81,
recycle_clear_size: 0
}
);
}
}

View File

@@ -32,7 +32,11 @@ async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<Bounde
let path = staging_dir.path().to_path_buf();
(
staging_dir,
Arc::new(BoundedStager::new(path, capacity, None).await.unwrap()),
Arc::new(
BoundedStager::new(path, capacity, None, None)
.await
.unwrap(),
),
)
}

View File

@@ -1007,6 +1007,7 @@ min_compaction_interval = "0s"
[region_engine.mito.index]
aux_path = ""
staging_size = "2GiB"
staging_ttl = "7days"
write_buffer_size = "8MiB"
content_cache_page_size = "64KiB"