From e0a43f37d73f62cf77200b0fccf8189a1838aaf4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 5 Jan 2024 18:05:41 +0900 Subject: [PATCH] chore: bump opendal to 0.44 (#3058) * chore: bump opendal to 0.44 * fix: fix test_object_store_cache_policy * Revert "fix: fix test_object_store_cache_policy" This reverts commit 46c37c343f66114e0f6ee7a0a3b9ee2b79c810af. * fix: fix test_object_store_cache_policy * fix: fix test_file_backend_with_lru_cache * chore: apply suggestions from CR * fix(mito): fix mito2 cache * chore: apply suggestions from CR * chore: apply suggestions from CR --- Cargo.lock | 12 ++-- src/common/procedure/src/store/state_store.rs | 2 +- src/mito2/src/cache/file_cache.rs | 25 +++++-- src/object-store/Cargo.toml | 2 +- src/object-store/src/layers/lru_cache.rs | 8 +-- .../src/layers/lru_cache/read_cache.rs | 65 ++++++++++--------- src/object-store/src/layers/prometheus.rs | 8 +-- src/object-store/src/lib.rs | 1 - src/object-store/tests/object_store_test.rs | 44 ++++++++----- 9 files changed, 96 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1c49e58416..2c1705a262 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5502,9 +5502,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.40.0" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddba7299bab261d3ae2f37617fb7f45b19ed872752bb4e22cf93a69d979366c5" +checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66" dependencies = [ "anyhow", "async-compat", @@ -5515,15 +5515,15 @@ dependencies = [ "chrono", "flagset", "futures", + "getrandom", "http", - "hyper", "log", "md-5", "once_cell", "parking_lot 0.12.1", "percent-encoding", "pin-project", - "quick-xml 0.29.0", + "quick-xml 0.30.0", "reqsign", "reqwest", "serde", @@ -6939,9 +6939,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" dependencies = [ "memchr", "serde", diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 46f566098d..ed168c4dd7 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -87,7 +87,7 @@ impl StateStore for ObjectStateStore { let mut lister = self .store .lister_with(path) - .delimiter("") + .recursive(true) .await .map_err(|e| { BoxedError::new(PlainError::new( diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 3fd3408edd..86624a78f3 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -100,6 +100,14 @@ impl FileCache { self.memory_index.insert(key, value).await; } + async fn get_reader(&self, file_path: &str) -> object_store::Result> { + if self.local_store.is_exist(file_path).await? { + Ok(Some(self.local_store.reader(file_path).await?)) + } else { + Ok(None) + } + } + /// Reads a file from the cache. 
pub(crate) async fn reader(&self, key: IndexKey) -> Option { if !self.memory_index.contains_key(&key) { @@ -108,26 +116,29 @@ impl FileCache { } let file_path = self.cache_file_path(key); - match self.local_store.reader(&file_path).await { - Ok(reader) => { + match self.get_reader(&file_path).await { + Ok(Some(reader)) => { CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); - Some(reader) + return Some(reader); } Err(e) => { if e.kind() != ErrorKind::NotFound { warn!("Failed to get file for key {:?}, err: {}", key, e); } - // We removes the file from the index. - self.memory_index.remove(&key).await; - CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); - None } + Ok(None) => {} } + + // We removes the file from the index. + self.memory_index.remove(&key).await; + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + None } /// Removes a file from the cache explicitly. pub(crate) async fn remove(&self, key: IndexKey) { let file_path = self.cache_file_path(key); + self.memory_index.remove(&key).await; if let Err(e) = self.local_store.delete(&file_path).await { warn!(e; "Failed to delete a cached file {}", file_path); } diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 5cf792fa01..1dcd71b2d5 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -15,7 +15,7 @@ futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.40", features = [ +opendal = { version = "0.44", features = [ "layers-tracing", ] } prometheus.workspace = true diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index c0958638b9..67010f0257 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -81,8 +81,8 @@ impl LayeredAccessor for LruCacheAccessor &Self::Inner { &self.inner @@ -112,7 +112,7 @@ impl LayeredAccessor for LruCacheAccessor Result<(RpList, Self::Pager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { self.inner.list(path, args).await } @@ -130,7 +130,7 @@ impl LayeredAccessor for LruCacheAccessor Result<(RpList, Self::BlockingPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { self.inner.blocking_list(path, args) } } diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 61c1c82855..f5bbb9ff86 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -18,7 +18,7 @@ use common_telemetry::logging::debug; use futures::FutureExt; use moka::future::Cache; use moka::notification::ListenerFuture; -use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt}; +use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt}; use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; use opendal::{Error as OpendalError, ErrorKind, Result}; @@ -135,24 +135,22 @@ impl ReadCache { pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; - while let Some(entries) = pager.next().await? { - for entry in entries { - let read_key = entry.path(); + while let Some(entry) = pager.next().await? { + let read_key = entry.path(); - // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, - // because it's private field. 
- let size = { - let stat = self.file_cache.stat(read_key, OpStat::default()).await?; + // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, + // because it's private field. + let size = { + let stat = self.file_cache.stat(read_key, OpStat::default()).await?; - stat.into_metadata().content_length() - }; + stat.into_metadata().content_length() + }; - OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); - OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64); - self.mem_cache - .insert(read_key.to_string(), ReadResult::Success(size as u32)) - .await; - } + OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); + OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64); + self.mem_cache + .insert(read_key.to_string(), ReadResult::Success(size as u32)) + .await; } Ok(self.stat().await) @@ -224,6 +222,22 @@ impl ReadCache { } } + async fn try_write_cache(&self, mut reader: I::Reader, read_key: &str) -> Result + where + I: Accessor, + { + let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; + let mut total = 0; + while let Some(bytes) = reader.next().await { + let bytes = &bytes?; + total += bytes.len(); + writer.write(bytes).await?; + } + // Call `close` to ensure data is written. + writer.close().await?; + Ok(total) + } + /// Read the file from remote storage. If success, write the content into local cache. async fn read_remote( &self, @@ -237,24 +251,15 @@ impl ReadCache { { OBJECT_STORE_LRU_CACHE_MISS.inc(); - let inner_result = inner.read(path, args).await; + let (_, reader) = inner.read(path, args).await?; + let result = self.try_write_cache::(reader, read_key).await; - match inner_result { - Ok((rp, mut reader)) => { - let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; - - while let Some(bytes) = reader.next().await { - writer.write(&bytes?).await?; - } - - // Call `close` to ensure data is written. 
- writer.close().await?; - - let read_bytes = rp.metadata().content_length() as u32; + match result { + Ok(read_bytes) => { OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64); - Ok(ReadResult::Success(read_bytes)) + Ok(ReadResult::Success(read_bytes as u32)) } Err(e) if e.kind() == ErrorKind::NotFound => { diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 2917eb6a62..4773463470 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -124,8 +124,8 @@ impl LayeredAccessor for PrometheusAccessor { type BlockingReader = PrometheusMetricWrapper; type Writer = PrometheusMetricWrapper; type BlockingWriter = PrometheusMetricWrapper; - type Pager = A::Pager; - type BlockingPager = A::BlockingPager; + type Lister = A::Lister; + type BlockingLister = A::BlockingLister; fn inner(&self) -> &Self::Inner { &self.inner @@ -243,7 +243,7 @@ impl LayeredAccessor for PrometheusAccessor { }) } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::List.into_static()]) .inc(); @@ -388,7 +388,7 @@ impl LayeredAccessor for PrometheusAccessor { }) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()]) .inc(); diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 88663ee0a3..a26a9bda64 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub use opendal::raw::oio::Pager; pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient}; pub use opendal::{ services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey, diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 166be33e92..f5371a4a16 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -248,7 +248,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { test_object_crud(&store).await?; test_object_list(&store).await?; - assert_eq!(cache_layer.read_cache_stat().await, (4, 0)); + assert_eq!(cache_layer.read_cache_stat().await, (0, 0)); Ok(()) } @@ -303,10 +303,11 @@ async fn test_object_store_cache_policy() -> Result<()> { // create file cache layer let cache_dir = create_temp_dir("test_object_store_cache_policy_cache"); + let atomic_temp_dir = create_temp_dir("test_object_store_cache_policy_cache_tmp"); let mut builder = Fs::default(); let _ = builder .root(&cache_dir.path().to_string_lossy()) - .atomic_write_dir(&cache_dir.path().to_string_lossy()); + .atomic_write_dir(&atomic_temp_dir.path().to_string_lossy()); let file_cache = Arc::new(builder.build().unwrap()); let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); @@ -334,9 +335,9 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_cache_files( &cache_store, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14", ], &["Hello, object1!", "object2!", "Hello, object2!"], ) @@ -344,9 +345,9 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14", ], ) .await; @@ -357,16 +358,18 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_eq!(cache_layer.read_cache_stat().await, (1, 15)); assert_cache_files( &cache_store, - &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"], &["Hello, object1!"], ) .await?; assert_lru_cache( &cache_layer, - &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"], ) .await; + // Read the deleted file without a deterministic range size requires an extra `stat.` + // Therefore, it won't go into the cache. assert!(store.read(p2).await.is_err()); let p3 = "test_file3"; @@ -376,13 +379,20 @@ async fn test_object_store_cache_policy() -> Result<()> { let _ = store.read(p3).await.unwrap(); let _ = store.read_with(p3).range(0..5).await.unwrap(); + assert_eq!(cache_layer.read_cache_stat().await, (3, 35)); + + // However, The real open file happens after the reader is created. + // The reader will throw an error during the reading + // instead of returning `NotFound` during the reader creation. // The entry count is 4, because we have the p2 `NotFound` cache. 
+ assert!(store.read_with(p2).range(0..4).await.is_err()); assert_eq!(cache_layer.read_cache_stat().await, (4, 35)); + assert_cache_files( &cache_store, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["Hello, object1!", "Hello, object3!", "Hello"], @@ -391,8 +401,8 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], ) @@ -409,7 +419,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_store, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["ello, object1!", "Hello, object3!", "Hello"], @@ -419,7 +429,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], ) @@ -440,7 +450,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], )
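
For context on the `delimiter("")` → `recursive(true)` change in `state_store.rs`: opendal 0.44 replaced the `Pager`-based list API with a `Lister` stream, and recursive listing is now an explicit builder flag rather than an empty delimiter. A minimal sketch of the new calling convention against the high-level `Operator` API (the function name and the `futures::TryStreamExt` usage here are illustrative, not part of the patch):

```rust
use futures::TryStreamExt;
use opendal::{Operator, Result};

/// Collect every object path under `path`, walking sub-"directories" too.
async fn list_recursive(store: &Operator, path: &str) -> Result<Vec<String>> {
    // opendal 0.44: request a recursive walk via the builder flag instead of
    // passing an empty delimiter as in 0.40.
    let mut lister = store.lister_with(path).recursive(true).await?;

    let mut paths = Vec::new();
    // `Lister` implements `Stream<Item = Result<Entry>>`, so it can be
    // drained entry by entry.
    while let Some(entry) = lister.try_next().await? {
        paths.push(entry.path().to_string());
    }
    Ok(paths)
}
```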
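
Similarly, the `get_reader` helper added to `file_cache.rs` exists because, after the bump, reader creation no longer surfaces `NotFound` eagerly; the error only shows up once the reader is consumed (the test comments above describe the same behavior). The pattern is roughly the following, sketched here against a plain opendal `Operator` (the helper name is illustrative; the real code works with `object_store::Reader`):

```rust
use opendal::{Operator, Reader, Result};

/// Return a reader only if the file actually exists in the local store.
/// Relying on a `NotFound` error at reader-creation time no longer works
/// in opendal 0.44, because opening is deferred until the first read.
async fn reader_if_exists(store: &Operator, path: &str) -> Result<Option<Reader>> {
    if store.is_exist(path).await? {
        Ok(Some(store.reader(path).await?))
    } else {
        Ok(None)
    }
}
```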