chore: bump opendal to 0.44 (#3058)

* chore: bump opendal to 0.44

* fix: fix test_object_store_cache_policy

* Revert "fix: fix test_object_store_cache_policy"

This reverts commit 46c37c343f66114e0f6ee7a0a3b9ee2b79c810af.

* fix: fix test_object_store_cache_policy

* fix: fix test_file_backend_with_lru_cache

* chore: apply suggestions from CR

* fix(mito): fix mito2 cache

* chore: apply suggestions from CR

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2024-01-05 18:05:41 +09:00
committed by GitHub
parent a89840f5f9
commit e0a43f37d7
9 changed files with 96 additions and 71 deletions

12
Cargo.lock generated
View File

@@ -5502,9 +5502,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.40.0"
version = "0.44.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddba7299bab261d3ae2f37617fb7f45b19ed872752bb4e22cf93a69d979366c5"
checksum = "c32736a48ef08a5d2212864e2295c8e54f4d6b352b7f49aa0c29a12fc410ff66"
dependencies = [
"anyhow",
"async-compat",
@@ -5515,15 +5515,15 @@ dependencies = [
"chrono",
"flagset",
"futures",
"getrandom",
"http",
"hyper",
"log",
"md-5",
"once_cell",
"parking_lot 0.12.1",
"percent-encoding",
"pin-project",
"quick-xml 0.29.0",
"quick-xml 0.30.0",
"reqsign",
"reqwest",
"serde",
@@ -6939,9 +6939,9 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.29.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51"
checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
dependencies = [
"memchr",
"serde",

View File

@@ -87,7 +87,7 @@ impl StateStore for ObjectStateStore {
let mut lister = self
.store
.lister_with(path)
.delimiter("")
.recursive(true)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(

View File

@@ -100,6 +100,14 @@ impl FileCache {
self.memory_index.insert(key, value).await;
}
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.is_exist(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
} else {
Ok(None)
}
}
/// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
if !self.memory_index.contains_key(&key) {
@@ -108,26 +116,29 @@ impl FileCache {
}
let file_path = self.cache_file_path(key);
match self.local_store.reader(&file_path).await {
Ok(reader) => {
match self.get_reader(&file_path).await {
Ok(Some(reader)) => {
CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
Some(reader)
return Some(reader);
}
Err(e) => {
if e.kind() != ErrorKind::NotFound {
warn!("Failed to get file for key {:?}, err: {}", key, e);
}
// We remove the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
None
}
Ok(None) => {}
}
// We remove the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
None
}
/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
self.memory_index.remove(&key).await;
if let Err(e) = self.local_store.delete(&file_path).await {
warn!(e; "Failed to delete a cached file {}", file_path);
}

View File

@@ -15,7 +15,7 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.40", features = [
opendal = { version = "0.44", features = [
"layers-tracing",
] }
prometheus.workspace = true

View File

@@ -81,8 +81,8 @@ impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C
type BlockingReader = I::BlockingReader;
type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter;
type Pager = I::Pager;
type BlockingPager = I::BlockingPager;
type Lister = I::Lister;
type BlockingLister = I::BlockingLister;
fn inner(&self) -> &Self::Inner {
&self.inner
@@ -112,7 +112,7 @@ impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C
result
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.inner.list(path, args).await
}
@@ -130,7 +130,7 @@ impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C
result
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
self.inner.blocking_list(path, args)
}
}

View File

@@ -18,7 +18,7 @@ use common_telemetry::logging::debug;
use futures::FutureExt;
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt};
use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Result};
@@ -135,24 +135,22 @@ impl<C: Accessor + Clone> ReadCache<C> {
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;
while let Some(entries) = pager.next().await? {
for entry in entries {
let read_key = entry.path();
while let Some(entry) = pager.next().await? {
let read_key = entry.path();
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
// because it's a private field.
let size = {
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
// because it's a private field.
let size = {
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
stat.into_metadata().content_length()
};
stat.into_metadata().content_length()
};
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
Ok(self.stat().await)
@@ -224,6 +222,22 @@ impl<C: Accessor + Clone> ReadCache<C> {
}
}
async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
where
I: Accessor,
{
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
let mut total = 0;
while let Some(bytes) = reader.next().await {
let bytes = &bytes?;
total += bytes.len();
writer.write(bytes).await?;
}
// Call `close` to ensure data is written.
writer.close().await?;
Ok(total)
}
/// Read the file from remote storage. If success, write the content into local cache.
async fn read_remote<I>(
&self,
@@ -237,24 +251,15 @@ impl<C: Accessor + Clone> ReadCache<C> {
{
OBJECT_STORE_LRU_CACHE_MISS.inc();
let inner_result = inner.read(path, args).await;
let (_, reader) = inner.read(path, args).await?;
let result = self.try_write_cache::<I>(reader, read_key).await;
match inner_result {
Ok((rp, mut reader)) => {
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
while let Some(bytes) = reader.next().await {
writer.write(&bytes?).await?;
}
// Call `close` to ensure data is written.
writer.close().await?;
let read_bytes = rp.metadata().content_length() as u32;
match result {
Ok(read_bytes) => {
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
Ok(ReadResult::Success(read_bytes))
Ok(ReadResult::Success(read_bytes as u32))
}
Err(e) if e.kind() == ErrorKind::NotFound => {

View File

@@ -124,8 +124,8 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
type Writer = PrometheusMetricWrapper<A::Writer>;
type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;
type Lister = A::Lister;
type BlockingLister = A::BlockingLister;
fn inner(&self) -> &Self::Inner {
&self.inner
@@ -243,7 +243,7 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
})
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::List.into_static()])
.inc();
@@ -388,7 +388,7 @@ impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
})
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
.inc();

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::raw::oio::Pager;
pub use opendal::raw::{normalize_path as raw_normalize_path, HttpClient};
pub use opendal::{
services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Lister, Metakey,

View File

@@ -248,7 +248,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
test_object_crud(&store).await?;
test_object_list(&store).await?;
assert_eq!(cache_layer.read_cache_stat().await, (4, 0));
assert_eq!(cache_layer.read_cache_stat().await, (0, 0));
Ok(())
}
@@ -303,10 +303,11 @@ async fn test_object_store_cache_policy() -> Result<()> {
// create file cache layer
let cache_dir = create_temp_dir("test_object_store_cache_policy_cache");
let atomic_temp_dir = create_temp_dir("test_object_store_cache_policy_cache_tmp");
let mut builder = Fs::default();
let _ = builder
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy());
.atomic_write_dir(&atomic_temp_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());
let cache_store = OperatorBuilder::new(file_cache.clone()).finish();
@@ -334,9 +335,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
],
&["Hello, object1!", "object2!", "Hello, object2!"],
)
@@ -344,9 +345,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
],
)
.await;
@@ -357,16 +358,18 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_eq!(cache_layer.read_cache_stat().await, (1, 15));
assert_cache_files(
&cache_store,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"],
&["Hello, object1!"],
)
.await?;
assert_lru_cache(
&cache_layer,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"],
)
.await;
// Reading the deleted file without a deterministic range size requires an extra `stat`.
// Therefore, it won't go into the cache.
assert!(store.read(p2).await.is_err());
let p3 = "test_file3";
@@ -376,13 +379,20 @@ async fn test_object_store_cache_policy() -> Result<()> {
let _ = store.read(p3).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
// However, the real file open happens after the reader is created.
// The reader will throw an error during the reading
// instead of returning `NotFound` during the reader creation.
// The entry count is 4, because we have the p2 `NotFound` cache.
assert!(store.read_with(p2).range(0..4).await.is_err());
assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["Hello, object1!", "Hello, object3!", "Hello"],
@@ -391,8 +401,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
@@ -409,7 +419,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["ello, object1!", "Hello, object3!", "Hello"],
@@ -419,7 +429,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
@@ -440,7 +450,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)