mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-02 13:20:40 +00:00
feat: bump opendal to v0.54 (#6792)
* feat: bump opendal to v0.54.0 Signed-off-by: Dennis Zhuang <killme2008@gmail.com> * chore: update cargo Signed-off-by: Dennis Zhuang <killme2008@gmail.com> --------- Signed-off-by: Dennis Zhuang <killme2008@gmail.com>
This commit is contained in:
@@ -21,7 +21,7 @@ humantime-serde.workspace = true
|
||||
lazy_static.workspace = true
|
||||
md5 = "0.7"
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
opendal = { git = "https://github.com/apache/opendal", rev = "0ba8574b6d08d209056704d28a9a114beb3c1022", features = [
|
||||
opendal = { version = "0.54", features = [
|
||||
"layers-tracing",
|
||||
"layers-prometheus",
|
||||
"services-azblob",
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::{fs, path};
|
||||
|
||||
use common_base::secrets::ExposeSecret;
|
||||
use common_telemetry::info;
|
||||
use opendal::layers::HttpClientLayer;
|
||||
use opendal::services::{Fs, Gcs, Oss, S3};
|
||||
use snafu::prelude::*;
|
||||
|
||||
@@ -89,8 +90,9 @@ pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<Obj
|
||||
|
||||
let operator = ObjectStore::new(builder)
|
||||
.context(error::InitBackendSnafu)?
|
||||
.layer(HttpClientLayer::new(client))
|
||||
.finish();
|
||||
operator.update_http_client(|_| client);
|
||||
|
||||
Ok(operator)
|
||||
}
|
||||
|
||||
@@ -113,8 +115,9 @@ pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore>
|
||||
|
||||
let operator = ObjectStore::new(builder)
|
||||
.context(error::InitBackendSnafu)?
|
||||
.layer(HttpClientLayer::new(client))
|
||||
.finish();
|
||||
operator.update_http_client(|_| client);
|
||||
|
||||
Ok(operator)
|
||||
}
|
||||
|
||||
@@ -136,8 +139,9 @@ pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore>
|
||||
|
||||
let operator = ObjectStore::new(builder)
|
||||
.context(error::InitBackendSnafu)?
|
||||
.layer(HttpClientLayer::new(client))
|
||||
.finish();
|
||||
operator.update_http_client(|_| client);
|
||||
|
||||
Ok(operator)
|
||||
}
|
||||
|
||||
@@ -169,7 +173,8 @@ pub async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
|
||||
|
||||
let operator = ObjectStore::new(builder)
|
||||
.context(error::InitBackendSnafu)?
|
||||
.layer(HttpClientLayer::new(client))
|
||||
.finish();
|
||||
operator.update_http_client(|_| client);
|
||||
|
||||
Ok(operator)
|
||||
}
|
||||
|
||||
@@ -99,13 +99,9 @@ pub struct LruCacheAccess<I, C> {
|
||||
impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
|
||||
type Inner = I;
|
||||
type Reader = Reader;
|
||||
type BlockingReader = I::BlockingReader;
|
||||
type Writer = I::Writer;
|
||||
type BlockingWriter = I::BlockingWriter;
|
||||
type Lister = I::Lister;
|
||||
type BlockingLister = I::BlockingLister;
|
||||
type Deleter = CacheAwareDeleter<C, I::Deleter>;
|
||||
type BlockingDeleter = I::BlockingDeleter;
|
||||
|
||||
fn inner(&self) -> &Self::Inner {
|
||||
&self.inner
|
||||
@@ -135,25 +131,4 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
|
||||
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
|
||||
self.inner.list(path, args).await
|
||||
}
|
||||
|
||||
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
|
||||
// TODO(dennis): support blocking read cache
|
||||
self.inner.blocking_read(path, args)
|
||||
}
|
||||
|
||||
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
|
||||
let result = self.inner.blocking_write(path, args);
|
||||
|
||||
self.read_cache.invalidate_entries_with_prefix(path);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
|
||||
self.inner.blocking_list(path, args)
|
||||
}
|
||||
|
||||
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
|
||||
self.inner.blocking_delete()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,8 @@ impl<C: Access> ReadCache<C> {
|
||||
.map_ok(|entry| async {
|
||||
let (path, mut meta) = entry.into_parts();
|
||||
|
||||
if !cloned_op.info().full_capability().list_has_content_length {
|
||||
// TODO(dennis): Use a better API, see https://github.com/apache/opendal/issues/6522
|
||||
if meta.content_length() == 0 {
|
||||
meta = cloned_op.stat(&path).await?;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user