fix: checkpoint metadata file dirty caching (#2020)

fix: the last checkpoint metadata file becomes dirty (stale) when object store caching is enabled, #2013
This commit is contained in:
dennis zhuang
2023-07-24 16:18:19 +08:00
committed by GitHub
parent 632cb26430
commit 1f371f5e6e

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use lru::LruCache;
use metrics::increment_counter;
use opendal::raw::oio::{Page, Read, Reader, Write};
use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write};
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend,
RpDelete, RpList, RpRead, RpWrite,
@@ -89,6 +89,12 @@ pub struct LruCacheAccessor<I, C> {
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}
/// Decides whether the object at `path` is eligible for the local cache.
///
/// Any path whose text ends with `_last_checkpoint` is rejected; every other
/// path (including the empty string) is accepted.
fn can_cache(path: &str) -> bool {
    // TODO(dennis): find a better way
    // The check is purely textual: a path is cacheable exactly when the
    // `_last_checkpoint` suffix cannot be stripped from it.
    path.strip_suffix("_last_checkpoint").is_none()
}
impl<I, C> LruCacheAccessor<I, C> {
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!(
@@ -99,8 +105,6 @@ impl<I, C> LruCacheAccessor<I, C> {
}
}
use opendal::raw::oio::ReadExt;
#[async_trait]
impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
@@ -117,6 +121,10 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if !can_cache(path) {
return self.inner.read(path, args).await.map(to_output_reader);
}
let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
let lru_cache = &self.lru_cache;
@@ -218,3 +226,19 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
/// Converts a `(RpRead, R)` pair into `(RpRead, Reader)` by boxing the
/// concrete reader; the `RpRead` half is passed through untouched.
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
    let (reply, concrete_reader) = input;
    (reply, Box::new(concrete_reader))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `can_cache` against a table of accepted and rejected paths.
    #[test]
    fn test_can_cache() {
        // Paths that must be accepted for caching.
        let cacheable = [
            "test",
            "a/b/c.parquet",
            "1.json",
            "100.checkpoint",
            "test/last_checkpoint",
        ];
        for path in cacheable {
            assert!(can_cache(path));
        }

        // Paths ending with `_last_checkpoint` must be rejected.
        let uncacheable = ["test/__last_checkpoint", "a/b/c/__last_checkpoint"];
        for path in uncacheable {
            assert!(!can_cache(path));
        }
    }
}