diff --git a/Cargo.lock b/Cargo.lock index d363238d11..11981e1b5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8353,8 +8353,9 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.51.1" -source = "git+https://github.com/apache/opendal?rev=0ba8574b6d08d209056704d28a9a114beb3c1022#0ba8574b6d08d209056704d28a9a114beb3c1022" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ce697ee723fdc3eaf6c457abf4059034be15167022b18b619993802cd1443d5" dependencies = [ "async-trait", "bytes", @@ -8439,11 +8440,11 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendal" -version = "0.53.1" -source = "git+https://github.com/apache/opendal?rev=0ba8574b6d08d209056704d28a9a114beb3c1022#0ba8574b6d08d209056704d28a9a114beb3c1022" +version = "0.54.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb9838d0575c6dbaf3fcec7255af8d5771996d4af900bbb6fa9a314dec00a1a" dependencies = [ "anyhow", - "async-trait", "backon", "base64 0.22.1", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 31dbe0d4f3..f8016a83cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -161,7 +161,7 @@ nalgebra = "0.33" nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] } notify = "8.0" num_cpus = "1.16" -object_store_opendal = { git = "https://github.com/apache/opendal", rev = "0ba8574b6d08d209056704d28a9a114beb3c1022" } +object_store_opendal = "0.54" once_cell = "1.18" opentelemetry-proto = { version = "0.30", features = [ "gen-tonic", diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index b141b043d5..8a1a877567 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -21,7 +21,7 @@ humantime-serde.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { git = "https://github.com/apache/opendal", rev = "0ba8574b6d08d209056704d28a9a114beb3c1022", features = [ +opendal = { version = "0.54", features = [ "layers-tracing", "layers-prometheus", "services-azblob", diff --git a/src/object-store/src/factory.rs b/src/object-store/src/factory.rs index 22b3de263f..7614454c07 100644 --- a/src/object-store/src/factory.rs +++ b/src/object-store/src/factory.rs @@ -16,6 +16,7 @@ use std::{fs, path}; use common_base::secrets::ExposeSecret; use common_telemetry::info; +use opendal::layers::HttpClientLayer; use opendal::services::{Fs, Gcs, Oss, S3}; use snafu::prelude::*; @@ -89,8 +90,9 @@ pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result Result let operator = ObjectStore::new(builder) .context(error::InitBackendSnafu)? + .layer(HttpClientLayer::new(client)) .finish(); - operator.update_http_client(|_| client); + Ok(operator) } @@ -136,8 +139,9 @@ pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result let operator = ObjectStore::new(builder) .context(error::InitBackendSnafu)? + .layer(HttpClientLayer::new(client)) .finish(); - operator.update_http_client(|_| client); + Ok(operator) } @@ -169,7 +173,8 @@ pub async fn new_s3_object_store(s3_config: &S3Config) -> Result { let operator = ObjectStore::new(builder) .context(error::InitBackendSnafu)? + .layer(HttpClientLayer::new(client)) .finish(); - operator.update_http_client(|_| client); + Ok(operator) } diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 9175192467..3b673bb381 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -99,13 +99,9 @@ pub struct LruCacheAccess { impl LayeredAccess for LruCacheAccess { type Inner = I; type Reader = Reader; - type BlockingReader = I::BlockingReader; type Writer = I::Writer; - type BlockingWriter = I::BlockingWriter; type Lister = I::Lister; - type BlockingLister = I::BlockingLister; type Deleter = CacheAwareDeleter; - type BlockingDeleter = I::BlockingDeleter; fn inner(&self) -> &Self::Inner { &self.inner @@ -135,25 +131,4 @@ impl LayeredAccess for LruCacheAccess { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { self.inner.list(path, args).await } - - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - // TODO(dennis): support blocking read cache - self.inner.blocking_read(path, args) - } - - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let result = self.inner.blocking_write(path, args); - - self.read_cache.invalidate_entries_with_prefix(path); - - result - } - - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.inner.blocking_list(path, args) - } - - fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> { - self.inner.blocking_delete() - } } diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 741cd5cc74..b27a2801bc 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -160,7 +160,8 @@ impl ReadCache { .map_ok(|entry| async { let (path, mut meta) = entry.into_parts(); - if !cloned_op.info().full_capability().list_has_content_length { + // TODO(dennis): Use a better API, see https://github.com/apache/opendal/issues/6522 + if meta.content_length() == 0 { meta = cloned_op.stat(&path).await?; }