refactor: Bump opendal to 0.51.1 (#5354)

* refactor: Bump opendal to 0.51.1

Signed-off-by: Xuanwo <github@xuanwo.io>

* Ignore dirs from cache

Signed-off-by: Xuanwo <github@xuanwo.io>

* Reduce extra alloc

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
This commit is contained in:
Xuanwo
2025-01-14 15:28:09 +08:00
committed by GitHub
parent c370b4b40d
commit 87bd12d6df
9 changed files with 82 additions and 42 deletions

12
Cargo.lock generated
View File

@@ -3857,12 +3857,6 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flagset"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec"
[[package]]
name = "flatbuffers"
version = "23.5.26"
@@ -7160,8 +7154,9 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
[[package]]
name = "opendal"
version = "0.50.2"
source = "git+https://github.com/GreptimeTeam/opendal.git?rev=c82605177f2feec83e49dcaa537c505639d94024#c82605177f2feec83e49dcaa537c505639d94024"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c9dcfa7a3615e3c60eb662ed6b46b6f244cf2658098f593c0c0915430b3a268"
dependencies = [
"anyhow",
"async-trait",
@@ -7170,7 +7165,6 @@ dependencies = [
"bytes",
"chrono",
"crc32c",
"flagset",
"futures",
"getrandom",
"http 1.1.0",

View File

@@ -189,7 +189,7 @@ impl StateStore for ObjectStateStore {
async fn batch_delete(&self, keys: &[String]) -> Result<()> {
self.store
.remove(keys.to_vec())
.delete_iter(keys.iter().map(String::as_str))
.await
.with_context(|_| DeleteStateSnafu {
key: format!("{:?}", keys),

View File

@@ -25,7 +25,7 @@ use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::RemovalCause;
use object_store::util::join_path;
use object_store::{ErrorKind, Metakey, ObjectStore, Reader};
use object_store::{ErrorKind, ObjectStore, Reader};
use parquet::file::metadata::ParquetMetaData;
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -195,7 +195,6 @@ impl FileCache {
let mut lister = self
.local_store
.lister_with(FILE_DIR)
.metakey(Metakey::ContentLength)
.await
.context(OpenDalSnafu)?;
// Use i64 for total_size to reduce the risk of overflow.
@@ -209,6 +208,12 @@ impl FileCache {
let Some(key) = parse_index_key(entry.name()) else {
continue;
};
let meta = self
.local_store
.stat(entry.path())
.await
.context(OpenDalSnafu)?;
let file_size = meta.content_length() as u32;
self.memory_index
.insert(key, IndexValue { file_size })

View File

@@ -343,7 +343,7 @@ impl ManifestObjectStore {
);
self.object_store
.remove(paths)
.delete_iter(paths)
.await
.context(OpenDalSnafu)?;

View File

@@ -188,7 +188,7 @@ pub(crate) async fn remove_region_dir_once(
// no parquet file found, delete the region path
// first delete all files other than the marker
object_store
.remove(files_to_remove_first)
.delete_iter(files_to_remove_first)
.await
.context(OpenDalSnafu)?;
// then remove the marker with this dir

View File

@@ -17,7 +17,7 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { git = "https://github.com/GreptimeTeam/opendal.git", rev = "c82605177f2feec83e49dcaa537c505639d94024", features = [
opendal = { version = "0.51.1", features = [
"layers-tracing",
"layers-prometheus",
"services-azblob",

View File

@@ -16,8 +16,7 @@ use std::sync::Arc;
use opendal::raw::oio::Reader;
use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite,
};
use opendal::Result;
mod read_cache;
@@ -26,6 +25,8 @@ use std::time::Instant;
use common_telemetry::{error, info};
use read_cache::ReadCache;
use crate::layers::lru_cache::read_cache::CacheAwareDeleter;
/// An opendal layer with local LRU file cache supporting.
pub struct LruCacheLayer<C: Access> {
// The read cache
@@ -103,6 +104,8 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
type BlockingWriter = I::BlockingWriter;
type Lister = I::Lister;
type BlockingLister = I::BlockingLister;
type Deleter = CacheAwareDeleter<C, I::Deleter>;
type BlockingDeleter = I::BlockingDeleter;
fn inner(&self) -> &Self::Inner {
&self.inner
@@ -122,12 +125,11 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
result
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let result = self.inner.delete(path, args).await;
self.read_cache.invalidate_entries_with_prefix(path);
result
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
self.inner
.delete()
.await
.map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter)))
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
@@ -150,4 +152,8 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
}
fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
self.inner.blocking_delete()
}
}

View File

@@ -15,12 +15,12 @@
use std::sync::Arc;
use common_telemetry::debug;
use futures::FutureExt;
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{EntryMode, Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};
use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
@@ -95,7 +95,7 @@ impl<C> Clone for ReadCache<C> {
impl<C: Access> ReadCache<C> {
/// Create a [`ReadCache`] with capacity in bytes.
pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
let file_cache_cloned = file_cache.clone();
let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
let eviction_listener =
move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
// Delete the file from local file cache when it's purged from mem_cache.
@@ -106,7 +106,7 @@ impl<C: Access> ReadCache<C> {
if let ReadResult::Success(size) = read_result {
OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await;
let result = file_cache_cloned.delete(&read_key).await;
debug!(
"Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
read_key, result, cause
@@ -150,24 +150,36 @@ impl<C: Access> ReadCache<C> {
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let op = OperatorBuilder::new(self.file_cache.clone()).finish();
let cloned_op = op.clone();
let root = read_cache_root();
let mut entries = op
.list_with(&root)
.metakey(Metakey::ContentLength | Metakey::ContentType)
.concurrent(RECOVER_CACHE_LIST_CONCURRENT)
.lister_with(&root)
.await?
.map_ok(|entry| async {
let (path, mut meta) = entry.into_parts();
if !cloned_op.info().full_capability().list_has_content_length {
meta = cloned_op.stat(&path).await?;
}
Ok((path, meta))
})
.try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
.try_collect::<Vec<_>>()
.await?;
while let Some(entry) = entries.pop() {
let read_key = entry.path();
let size = entry.metadata().content_length();
while let Some((read_key, metadata)) = entries.pop() {
if !metadata.is_file() {
continue;
}
let size = metadata.content_length();
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
// ignore root path
if entry.metadata().mode() == EntryMode::FILE {
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
Ok(self.cache_stat().await)
@@ -302,6 +314,29 @@ impl<C: Access> ReadCache<C> {
}
}
/// Deleter that pairs an inner opendal deleter with a [`ReadCache`], so that
/// matching local read-cache entries are invalidated whenever the backing
/// objects are deleted through it.
pub struct CacheAwareDeleter<C, D> {
    // Read cache whose entries are invalidated on delete (see the
    // `oio::Delete` impl, which calls `invalidate_entries_with_prefix`).
    cache: ReadCache<C>,
    // Inner deleter that the actual delete calls are forwarded to.
    deleter: D,
}
impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
    /// Creates a deleter that forwards deletes to `deleter` while also
    /// invalidating the corresponding entries in `cache`.
    pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
        Self { cache, deleter }
    }
}
impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
    /// Forwards the delete of `path` to the inner deleter, first invalidating
    /// any read-cache entries whose keys start with `path`.
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        // Drop cached reads for this path before forwarding the delete, so a
        // stale cache entry cannot outlive its deleted source object.
        self.cache.invalidate_entries_with_prefix(path);
        self.deleter.delete(path, args)?;
        Ok(())
    }

    /// Flushes the inner deleter; the returned count is whatever the inner
    /// deleter reports (presumably the number of deletes processed — per the
    /// opendal `oio::Delete` contract, to be confirmed against its docs).
    async fn flush(&mut self) -> Result<usize> {
        self.deleter.flush().await
    }
}
/// Boxes a concrete reader into the type-erased [`Reader`] form, passing the
/// [`RpRead`] reply through unchanged.
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
    (input.0, Box::new(input.1))
}

View File

@@ -15,8 +15,8 @@
pub use opendal::raw::{Access, HttpClient};
pub use opendal::{
services, Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Metakey, Operator as ObjectStore, Reader,
Result, Writer,
FuturesAsyncReader, FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result,
Writer,
};
pub mod layers;