diff --git a/Cargo.lock b/Cargo.lock index ecf7dd9eed..4e24c073f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5378,6 +5378,7 @@ dependencies = [ "common-test-util", "futures", "lru 0.9.0", + "md5", "metrics", "opendal", "pin-project", diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 7e75cfe904..79c6b2798b 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -21,8 +21,6 @@ use snafu::Location; use storage::error::Error as StorageError; use table::error::Error as TableError; -use crate::datanode::ObjectStoreConfig; - /// Business error of datanode. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -210,9 +208,8 @@ pub enum Error { #[snafu(display("Failed to storage engine, source: {}", source))] OpenStorageEngine { source: StorageError }, - #[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))] + #[snafu(display("Failed to init backend, source: {}", source))] InitBackend { - config: Box, source: object_store::Error, location: Location, }, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index e64c6cbe72..6e81e5db52 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -379,15 +379,13 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re .access_key_secret(oss_config.access_key_secret.expose_secret()); let object_store = ObjectStore::new(builder) - .with_context(|_| error::InitBackendSnafu { - config: store_config.clone(), - })? + .context(error::InitBackendSnafu)? .finish(); - create_object_store_with_cache(object_store, store_config) + create_object_store_with_cache(object_store, store_config).await } -fn create_object_store_with_cache( +async fn create_object_store_with_cache( object_store: ObjectStore, store_config: &ObjectStoreConfig, ) -> Result { @@ -410,14 +408,17 @@ fn create_object_store_with_cache( }; if let Some(path) = cache_path { - let cache_store = - FsBuilder::default() - .root(path) - .build() - .with_context(|_| error::InitBackendSnafu { - config: store_config.clone(), - })?; - let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize); + let atomic_temp_dir = format!("{path}/.tmp/"); + clean_temp_dir(&atomic_temp_dir)?; + let cache_store = FsBuilder::default() + .root(path) + .atomic_write_dir(&atomic_temp_dir) + .build() + .context(error::InitBackendSnafu)?; + + let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) + .await + .context(error::InitBackendSnafu)?; Ok(object_store.layer(cache_layer)) } else { Ok(object_store) @@ -452,12 +453,21 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res create_object_store_with_cache( ObjectStore::new(builder) - .with_context(|_| error::InitBackendSnafu { - config: store_config.clone(), - })? + .context(error::InitBackendSnafu)? .finish(), store_config, ) + .await +} + +fn clean_temp_dir(dir: &str) -> Result<()> { + if path::Path::new(&dir).exists() { + info!("Begin to clean temp storage directory: {}", dir); + fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?; + info!("Cleaned temp storage directory: {}", dir); + } + + Ok(()) } pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { @@ -471,24 +481,13 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res info!("The file storage directory is: {}", &data_dir); let atomic_write_dir = format!("{data_dir}/.tmp/"); - if path::Path::new(&atomic_write_dir).exists() { - info!( - "Begin to clean temp storage directory: {}", - &atomic_write_dir - ); - fs::remove_dir_all(&atomic_write_dir).context(error::RemoveDirSnafu { - dir: &atomic_write_dir, - })?; - info!("Cleaned temp storage directory: {}", &atomic_write_dir); - } + clean_temp_dir(&atomic_write_dir)?; let mut builder = FsBuilder::default(); builder.root(&data_dir).atomic_write_dir(&atomic_write_dir); let object_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu { - config: store_config.clone(), - })? + .context(error::InitBackendSnafu)? .finish(); Ok(object_store) diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index f68c10fd94..19806c1769 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -9,6 +9,7 @@ lru = "0.9" async-trait = "0.1" bytes = "1.4" futures = { version = "0.3" } +md5 = "0.7" metrics = "0.20" opendal = { version = "0.33", features = ["layers-tracing", "layers-metrics"] } pin-project = "1.0" diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs index 81275d7d15..e1586cbcb3 100644 --- a/src/object-store/src/cache_policy.rs +++ b/src/object-store/src/cache_policy.rs @@ -17,11 +17,10 @@ use std::ops::DerefMut; use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use lru::LruCache; use metrics::increment_counter; use opendal::ops::{OpDelete, OpList, OpRead, OpScan, OpWrite}; -use opendal::raw::oio::{Read, Reader, Write}; +use opendal::raw::oio::{Page, Read, Reader, Write}; use opendal::raw::{Accessor, Layer, LayeredAccessor, RpDelete, RpList, RpRead, RpScan, RpWrite}; use opendal::{ErrorKind, Result}; use tokio::sync::Mutex; @@ -31,19 +30,41 @@ use crate::metrics::{ OBJECT_STORE_LRU_CACHE_MISS, }; +#[derive(Clone)] pub struct LruCacheLayer { cache: Arc, lru_cache: Arc>>, } -impl LruCacheLayer { - pub fn new(cache: Arc, capacity: usize) -> Self { - Self { +impl LruCacheLayer { + pub async fn new(cache: Arc, capacity: usize) -> Result { + let layer = Self { cache, lru_cache: Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(capacity).unwrap(), ))), + }; + layer.recover_keys().await?; + + Ok(layer) + } + + /// Recover existing keys from `cache` to `lru_cache`. + async fn recover_keys(&self) -> Result<()> { + let (_, mut pager) = self.cache.list("/", OpList::default()).await?; + + let mut lru_cache = self.lru_cache.lock().await; + while let Some(entries) = pager.next().await? { + for entry in entries { + lru_cache.push(entry.path().to_string(), ()); + } } + + Ok(()) + } + + pub async fn lru_contains_key(&self, key: &str) -> bool { + self.lru_cache.lock().await.contains(key) } } @@ -68,7 +89,11 @@ pub struct LruCacheAccessor { impl LruCacheAccessor { fn cache_path(&self, path: &str, args: &OpRead) -> String { - format!("{}.cache-{}", path, args.range().to_header()) + format!( + "{:x}.cache-{}", + md5::compute(path), + args.range().to_header() + ) } } @@ -91,9 +116,10 @@ impl LayeredAccessor for LruCacheAccessor { async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let path = path.to_string(); let cache_path = self.cache_path(&path, &args); - let lru_cache = self.lru_cache.clone(); + let lru_cache = &self.lru_cache; - match self.cache.read(&cache_path, args.clone()).await { + // the args is already in the cache path, so we must create a new OpRead. + match self.cache.read(&cache_path, OpRead::default()).await { Ok((rp, r)) => { increment_counter!(OBJECT_STORE_LRU_CACHE_HIT); @@ -105,18 +131,16 @@ impl LayeredAccessor for LruCacheAccessor { Err(err) if err.kind() == ErrorKind::NotFound => { increment_counter!(OBJECT_STORE_LRU_CACHE_MISS); - let (rp, mut reader) = self.inner.read(&path, args.clone()).await?; - let size = rp.clone().into_metadata().content_length(); + let (_, mut reader) = self.inner.read(&path, args.clone()).await?; let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?; - // TODO(hl): We can use [Writer::append](https://docs.rs/opendal/0.30.4/opendal/struct.Writer.html#method.append) - // here to avoid loading whole file into memory once all our backend supports `Writer`. - let mut buf = vec![0; size as usize]; - reader.read(&mut buf).await?; - writer.write(Bytes::from(buf)).await?; + while let Some(bytes) = reader.next().await { + writer.write(bytes?).await?; + } + writer.close().await?; - match self.cache.read(&cache_path, args.clone()).await { + match self.cache.read(&cache_path, OpRead::default()).await { Ok((rp, reader)) => { let r = { // push new cache file name to lru @@ -144,15 +168,15 @@ impl LayeredAccessor for LruCacheAccessor { } async fn delete(&self, path: &str, args: OpDelete) -> Result { - let path = path.to_string(); - let lru_cache = self.lru_cache.clone(); + let cache_path = md5::compute(path); + let lru_cache = &self.lru_cache; let cache_files: Vec = { let mut guard = lru_cache.lock().await; let lru = guard.deref_mut(); let cache_files = lru .iter() - .filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str())) + .filter(|(k, _v)| k.starts_with(format!("{:x}.cache-", cache_path).as_str())) .map(|(k, _v)| k.clone()) .collect::>(); for k in &cache_files { @@ -163,7 +187,7 @@ impl LayeredAccessor for LruCacheAccessor { for file in cache_files { let _ = self.cache.delete(&file, OpDelete::new()).await; } - return self.inner.delete(&path, args).await; + self.inner.delete(path, args).await } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 9b15a8aff9..4920b3fb73 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -22,6 +22,7 @@ use object_store::cache_policy::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{util, ObjectStore, ObjectStoreBuilder}; +use opendal::raw::Accessor; use opendal::services::Oss; use opendal::{EntryMode, Operator, OperatorBuilder}; @@ -157,6 +158,15 @@ async fn test_oss_backend() -> Result<()> { Ok(()) } +async fn assert_lru_cache( + cache_layer: &LruCacheLayer, + file_names: &[&str], +) { + for file_name in file_names { + assert!(cache_layer.lru_contains_key(file_name).await); + } +} + async fn assert_cache_files( store: &Operator, file_names: &[&str], @@ -188,7 +198,7 @@ async fn test_object_store_cache_policy() -> Result<()> { common_telemetry::init_default_ut_logging(); common_telemetry::init_default_metrics_recorder(); // create file storage - let root_dir = create_temp_dir("test_fs_backend"); + let root_dir = create_temp_dir("test_object_store_cache_policy"); let store = OperatorBuilder::new( Fs::default() .root(&root_dir.path().to_string_lossy()) @@ -199,7 +209,7 @@ async fn test_object_store_cache_policy() -> Result<()> { .finish(); // create file cache layer - let cache_dir = create_temp_dir("test_fs_cache"); + let cache_dir = create_temp_dir("test_object_store_cache_policy_cache"); let mut builder = Fs::default(); builder .root(&cache_dir.path().to_string_lossy()) @@ -208,7 +218,10 @@ async fn test_object_store_cache_policy() -> Result<()> { let cache_store = OperatorBuilder::new(cache_accessor.clone()).finish(); // create operator for cache dir to verify cache file - let store = store.layer(LruCacheLayer::new(Arc::new(cache_accessor), 3)); + let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor.clone()), 3) + .await + .unwrap(); + let store = store.layer(cache_layer.clone()); // create several object handler. // write data into object; @@ -221,27 +234,41 @@ async fn test_object_store_cache_policy() -> Result<()> { store.range_read(p1, 0..).await?; store.read(p1).await?; store.range_read(p2, 0..).await?; + store.range_read(p2, 7..).await?; store.read(p2).await?; assert_cache_files( &cache_store, &[ - "test_file1.cache-bytes=0-", - "test_file2.cache-bytes=7-", - "test_file2.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", ], &["Hello, object1!", "object2!", "Hello, object2!"], ) .await?; + assert_lru_cache( + &cache_layer, + &[ + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", + ], + ) + .await; store.delete(p2).await.unwrap(); assert_cache_files( &cache_store, - &["test_file1.cache-bytes=0-"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], &["Hello, object1!"], ) .await?; + assert_lru_cache( + &cache_layer, + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], + ) + .await; let p3 = "test_file3"; store.write(p3, "Hello, object3!").await.unwrap(); @@ -252,13 +279,22 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_cache_files( &cache_store, &[ - "test_file1.cache-bytes=0-", - "test_file3.cache-bytes=0-", - "test_file3.cache-bytes=0-4", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["Hello, object1!", "Hello, object3!", "Hello"], ) .await?; + assert_lru_cache( + &cache_layer, + &[ + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", + ], + ) + .await; let handle = metric::try_handle().unwrap(); let metric_text = handle.render(); @@ -266,5 +302,20 @@ async fn test_object_store_cache_policy() -> Result<()> { assert!(metric_text.contains("object_store_lru_cache_hit")); assert!(metric_text.contains("object_store_lru_cache_miss")); + drop(cache_layer); + let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor), 3) + .await + .unwrap(); + + assert_lru_cache( + &cache_layer, + &[ + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", + ], + ) + .await; + Ok(()) }