From 87bd12d6df4e0e7eeb75a8ad5996515ccd9fc049 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Tue, 14 Jan 2025 15:28:09 +0800 Subject: [PATCH] refactor: Bump opendal to 0.51.1 (#5354) * refactor: Bump opendal to 0.51.1 Signed-off-by: Xuanwo * Ignore dirs from cache Signed-off-by: Xuanwo * Reduce extra alloc Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- Cargo.lock | 12 +--- src/common/procedure/src/store/state_store.rs | 2 +- src/mito2/src/cache/file_cache.rs | 9 ++- src/mito2/src/manifest/storage.rs | 2 +- src/mito2/src/worker/handle_drop.rs | 2 +- src/object-store/Cargo.toml | 2 +- src/object-store/src/layers/lru_cache.rs | 22 +++--- .../src/layers/lru_cache/read_cache.rs | 69 ++++++++++++++----- src/object-store/src/lib.rs | 4 +- 9 files changed, 82 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 748d247626..2f36838a7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3857,12 +3857,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "flagset" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec" - [[package]] name = "flatbuffers" version = "23.5.26" @@ -7160,8 +7154,9 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" [[package]] name = "opendal" -version = "0.50.2" -source = "git+https://github.com/GreptimeTeam/opendal.git?rev=c82605177f2feec83e49dcaa537c505639d94024#c82605177f2feec83e49dcaa537c505639d94024" +version = "0.51.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c9dcfa7a3615e3c60eb662ed6b46b6f244cf2658098f593c0c0915430b3a268" dependencies = [ "anyhow", "async-trait", @@ -7170,7 +7165,6 @@ dependencies = [ "bytes", "chrono", "crc32c", - "flagset", "futures", "getrandom", "http 1.1.0", diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 4f11973954..a43d7da86f 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -189,7 +189,7 @@ impl StateStore for ObjectStateStore { async fn batch_delete(&self, keys: &[String]) -> Result<()> { self.store - .remove(keys.to_vec()) + .delete_iter(keys.iter().map(String::as_str)) .await .with_context(|_| DeleteStateSnafu { key: format!("{:?}", keys), diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 51c20f7428..be05bfc98e 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -25,7 +25,7 @@ use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::RemovalCause; use object_store::util::join_path; -use object_store::{ErrorKind, Metakey, ObjectStore, Reader}; +use object_store::{ErrorKind, ObjectStore, Reader}; use parquet::file::metadata::ParquetMetaData; use snafu::ResultExt; use store_api::storage::RegionId; @@ -195,7 +195,6 @@ impl FileCache { let mut lister = self .local_store .lister_with(FILE_DIR) - .metakey(Metakey::ContentLength) .await .context(OpenDalSnafu)?; // Use i64 for total_size to reduce the risk of overflow. @@ -209,6 +208,12 @@ impl FileCache { let Some(key) = parse_index_key(entry.name()) else { continue; }; + + let meta = self + .local_store + .stat(entry.path()) + .await + .context(OpenDalSnafu)?; let file_size = meta.content_length() as u32; self.memory_index .insert(key, IndexValue { file_size }) diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 060a84f49d..be0ead8f88 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -343,7 +343,7 @@ impl ManifestObjectStore { ); self.object_store - .remove(paths) + .delete_iter(paths) .await .context(OpenDalSnafu)?; diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index a569f22360..d0b8dc3175 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -188,7 +188,7 @@ pub(crate) async fn remove_region_dir_once( // no parquet file found, delete the region path // first delete all files other than the marker object_store - .remove(files_to_remove_first) + .delete_iter(files_to_remove_first) .await .context(OpenDalSnafu)?; // then remove the marker with this dir diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 275343fa5e..8aab3af382 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -17,7 +17,7 @@ futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { git = "https://github.com/GreptimeTeam/opendal.git", rev = "c82605177f2feec83e49dcaa537c505639d94024", features = [ +opendal = { version = "0.51.1", features = [ "layers-tracing", "layers-prometheus", "services-azblob", diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 95e9349452..9175192467 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -16,8 +16,7 @@ use std::sync::Arc; use opendal::raw::oio::Reader; use opendal::raw::{ - Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, - RpWrite, + Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; use opendal::Result; mod read_cache; @@ -26,6 +25,8 @@ use std::time::Instant; use common_telemetry::{error, info}; use read_cache::ReadCache; +use crate::layers::lru_cache::read_cache::CacheAwareDeleter; + /// An opendal layer with local LRU file cache supporting. pub struct LruCacheLayer { // The read cache @@ -103,6 +104,8 @@ impl LayeredAccess for LruCacheAccess { type BlockingWriter = I::BlockingWriter; type Lister = I::Lister; type BlockingLister = I::BlockingLister; + type Deleter = CacheAwareDeleter; + type BlockingDeleter = I::BlockingDeleter; fn inner(&self) -> &Self::Inner { &self.inner @@ -122,12 +125,11 @@ impl LayeredAccess for LruCacheAccess { result } - async fn delete(&self, path: &str, args: OpDelete) -> Result { - let result = self.inner.delete(path, args).await; - - self.read_cache.invalidate_entries_with_prefix(path); - - result + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + self.inner + .delete() + .await + .map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter))) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { @@ -150,4 +152,8 @@ impl LayeredAccess for LruCacheAccess { fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { self.inner.blocking_list(path, args) } + + fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> { + self.inner.blocking_delete() + } } diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 1e3cf61615..0f427c97ef 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use common_telemetry::debug; -use futures::FutureExt; +use futures::{FutureExt, TryStreamExt}; use moka::future::Cache; use moka::notification::ListenerFuture; use opendal::raw::oio::{Read, Reader, Write}; -use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead}; -use opendal::{EntryMode, Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result}; +use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead}; +use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -95,7 +95,7 @@ impl Clone for ReadCache { impl ReadCache { /// Create a [`ReadCache`] with capacity in bytes. pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { - let file_cache_cloned = file_cache.clone(); + let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish(); let eviction_listener = move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { // Delete the file from local file cache when it's purged from mem_cache. @@ -106,7 +106,7 @@ impl ReadCache { if let ReadResult::Success(size) = read_result { OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); - let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; + let result = file_cache_cloned.delete(&read_key).await; debug!( "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", read_key, result, cause @@ -150,24 +150,36 @@ impl ReadCache { /// Return entry count and total approximate entry size in bytes. pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { let op = OperatorBuilder::new(self.file_cache.clone()).finish(); + let cloned_op = op.clone(); let root = read_cache_root(); let mut entries = op - .list_with(&root) - .metakey(Metakey::ContentLength | Metakey::ContentType) - .concurrent(RECOVER_CACHE_LIST_CONCURRENT) + .lister_with(&root) + .await? + .map_ok(|entry| async { + let (path, mut meta) = entry.into_parts(); + + if !cloned_op.info().full_capability().list_has_content_length { + meta = cloned_op.stat(&path).await?; + } + + Ok((path, meta)) + }) + .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT) + .try_collect::>() .await?; - while let Some(entry) = entries.pop() { - let read_key = entry.path(); - let size = entry.metadata().content_length(); + while let Some((read_key, metadata)) = entries.pop() { + if !metadata.is_file() { + continue; + } + + let size = metadata.content_length(); OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64); - // ignore root path - if entry.metadata().mode() == EntryMode::FILE { - self.mem_cache - .insert(read_key.to_string(), ReadResult::Success(size as u32)) - .await; - } + + self.mem_cache + .insert(read_key.to_string(), ReadResult::Success(size as u32)) + .await; } Ok(self.cache_stat().await) @@ -302,6 +314,29 @@ impl ReadCache { } } +pub struct CacheAwareDeleter { + cache: ReadCache, + deleter: D, +} + +impl CacheAwareDeleter { + pub(crate) fn new(cache: ReadCache, deleter: D) -> Self { + Self { cache, deleter } + } +} + +impl oio::Delete for CacheAwareDeleter { + fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.cache.invalidate_entries_with_prefix(path); + self.deleter.delete(path, args)?; + Ok(()) + } + + async fn flush(&mut self) -> Result { + self.deleter.flush().await + } +} + fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { (input.0, Box::new(input.1)) } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index b727b67352..2e27822318 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -15,8 +15,8 @@ pub use opendal::raw::{Access, HttpClient}; pub use opendal::{ services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, - FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader, - Result, Writer, + FuturesAsyncReader, FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, + Writer, }; pub mod layers;