refactor: migrate OpenDAL to 0.39 (#2383)

* chore: bump opendal to 7d552

* refactor: migrate OpenDAL to 0.39

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-09-13 21:43:53 +09:00
committed by GitHub
parent d08b05c963
commit 93f3048f4f
11 changed files with 57 additions and 80 deletions

27
Cargo.lock generated
View File

@@ -5991,9 +5991,8 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3555168d4cc9a83c332e1416ff00e3be36a6d78447dff472829962afbc91bb3d"
version = "0.39.0"
source = "git+https://github.com/apache/incubator-opendal.git?rev=7d5524f35f29f7eda8131e8b0873590b7cbe34ab#7d5524f35f29f7eda8131e8b0873590b7cbe34ab"
dependencies = [
"anyhow",
"async-compat",
@@ -6013,7 +6012,7 @@ dependencies = [
"parking_lot",
"percent-encoding",
"pin-project",
"quick-xml 0.27.1",
"quick-xml 0.29.0",
"reqsign",
"reqwest",
"serde",
@@ -7238,19 +7237,9 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.27.1"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quick-xml"
version = "0.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1"
checksum = "81b9228215d82c7b61490fec1de287136b5de6f5700f6e58ea9ad61a7964ca51"
dependencies = [
"memchr",
"serde",
@@ -7497,9 +7486,9 @@ dependencies = [
[[package]]
name = "reqsign"
version = "0.12.0"
version = "0.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b04f5fccb94d61c154f0d8520ec42e79afdc145f4b1a392faa269874995fda66"
checksum = "3228e570df74d69d3d3236a71371f1edd748a3e4eb728ea1f29d403bc10fc727"
dependencies = [
"anyhow",
"async-trait",
@@ -7514,7 +7503,7 @@ dependencies = [
"log",
"once_cell",
"percent-encoding",
"quick-xml 0.28.2",
"quick-xml 0.29.0",
"rand",
"reqwest",
"rsa",

View File

@@ -51,7 +51,7 @@ impl Lister {
Source::Dir => {
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?;
@@ -76,7 +76,16 @@ impl Lister {
path: &file_full_path,
},
)?;
Ok(vec![Entry::new(&file_full_path)])
Ok(self
.object_store
.list_with(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?
.into_iter()
.find(|f| f.name() == filename)
.map(|f| vec![f])
.unwrap_or_default())
}
}
}

View File

@@ -458,7 +458,7 @@ mod tests {
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::{FutureExt, TryStreamExt};
use futures_util::FutureExt;
use object_store::ObjectStore;
use super::*;
@@ -492,11 +492,7 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
.try_collect()
.await
.unwrap();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}

View File

@@ -20,7 +20,7 @@ use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use futures::{Stream, StreamExt};
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use crate::error::{DeleteStateSnafu, ListStateSnafu, PutStateSnafu, Result};
@@ -86,7 +86,8 @@ impl StateStore for ObjectStateStore {
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let mut lister = self
.store
.scan(path)
.lister_with(path)
.delimiter("")
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
@@ -110,17 +111,8 @@ impl StateStore for ObjectStateStore {
})
.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
let metadata = store
.metadata(&entry, Metakey::Mode)
.await
.map_err(|e| {
BoxedError::new(PlainError::new(
e.to_string(),
StatusCode::StorageUnavailable,
))
})
.context(ListStateSnafu { path: key })?;
if let EntryMode::FILE = metadata.mode() {
if let EntryMode::FILE = entry.metadata().mode() {
let value = store
.read(key)
.await

View File

@@ -38,7 +38,7 @@ use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use futures_util::StreamExt;
use object_store::{Entry, EntryMode, Metakey, ObjectStore};
use object_store::{Entry, EntryMode, ObjectStore};
use regex::Regex;
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -256,11 +256,7 @@ impl StatementExecutor {
let table_schema = table.schema().arrow_schema().clone();
for entry in entries.iter() {
let metadata = object_store
.metadata(entry, Metakey::Mode)
.await
.context(error::ReadObjectSnafu { path: entry.path() })?;
if metadata.mode() != EntryMode::FILE {
if entry.metadata().mode() != EntryMode::FILE {
continue;
}
let path = entry.path();

View File

@@ -168,7 +168,7 @@ impl ManifestObjectStore {
where
F: Fn(Entry) -> Option<R>,
{
let streamer = match self.object_store.list(&self.path).await {
let streamer = match self.object_store.lister_with(&self.path).await {
Ok(streamer) => streamer,
Err(e) if e.kind() == ErrorKind::NotFound => {
debug!("Manifest directory does not exists: {}", self.path);

View File

@@ -19,14 +19,14 @@ use std::time::Duration;
use common_query::Output;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use futures::StreamExt;
use futures::TryStreamExt;
use object_store::util::join_path;
use object_store::{EntryMode, Metakey, ObjectStore};
use object_store::{EntryMode, ObjectStore};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::sleep;
use crate::error::{OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::error::{self, OpenDalSnafu, RegionNotFoundSnafu, Result};
use crate::region::RegionMapRef;
use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
@@ -115,17 +115,16 @@ pub(crate) async fn remove_region_dir_once(
let mut has_parquet_file = false;
// record all paths that end with neither .parquet nor the marker file
let mut files_to_remove_first = vec![];
let mut files = object_store.scan(region_path).await.context(OpenDalSnafu)?;
while let Some(file) = files.next().await {
let file = file.context(OpenDalSnafu)?;
let mut files = object_store
.lister_with(region_path)
.await
.context(OpenDalSnafu)?;
while let Some(file) = files.try_next().await.context(error::OpenDalSnafu)? {
if file.path().ends_with(".parquet") {
has_parquet_file = true;
break;
} else if !file.path().ends_with(DROPPING_MARKER_FILE) {
let meta = object_store
.metadata(&file, Metakey::Mode)
.await
.context(OpenDalSnafu)?;
let meta = file.metadata();
if meta.mode() == EntryMode::FILE {
files_to_remove_first.push(file.path().to_string());
}

View File

@@ -11,7 +11,10 @@ futures = { version = "0.3" }
lru = "0.9"
md5 = "0.7"
metrics.workspace = true
opendal = { version = "0.36", features = ["layers-tracing", "layers-metrics"] }
opendal = { git = "https://github.com/apache/incubator-opendal.git", rev = "7d5524f35f29f7eda8131e8b0873590b7cbe34ab", features = [
"layers-tracing",
"layers-metrics",
] }
pin-project = "1.0"
tokio.workspace = true
uuid.workspace = true

View File

@@ -19,10 +19,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use lru::LruCache;
use metrics::increment_counter;
use opendal::raw::oio::{Page, Read, ReadExt, Reader, Write};
use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpAppend, OpDelete, OpList, OpRead, OpWrite, RpAppend,
RpDelete, RpList, RpRead, RpWrite,
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
@@ -114,7 +114,6 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type BlockingWriter = I::BlockingWriter;
type Pager = I::Pager;
type BlockingPager = I::BlockingPager;
type Appender = I::Appender;
fn inner(&self) -> &Self::Inner {
&self.inner
@@ -146,7 +145,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;
while let Some(bytes) = reader.next().await {
writer.write(bytes?).await?;
writer.write(&bytes?).await?;
}
writer.close().await?;
@@ -178,10 +177,6 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
self.inner.write(path, args).await
}
async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let cache_path = md5::compute(path);
let lru_cache = &self.lru_cache;

View File

@@ -21,7 +21,7 @@ use common_test_util::temp_dir::create_temp_dir;
use object_store::layers::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use object_store::{ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, Operator, OperatorBuilder};
@@ -37,7 +37,7 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
assert_eq!("Hello, World!", String::from_utf8(bs)?);
// Read range from object;
let bs = store.range_read(file_name, 1..=11).await?;
let bs = store.read_with(file_name).range(1..=11).await?;
assert_eq!("ello, World", String::from_utf8(bs)?);
// Get object's Metadata
@@ -62,8 +62,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
store.write(p3, "Hello, object3!").await?;
// List objects
let lister = store.list("/").await?;
let entries = util::collect(lister).await?;
let entries = store.list("/").await?;
assert_eq!(3, entries.len());
store.delete(p1).await?;
@@ -71,7 +70,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
// List objects again
// Only o2 exists
let entries = util::collect(store.list("/").await?).await?;
let entries = store.list("/").await?;
assert_eq!(1, entries.len());
assert_eq!(p2, entries.get(0).unwrap().path());
@@ -79,7 +78,7 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
assert_eq!("Hello, object2!", String::from_utf8(content)?);
store.delete(p2).await?;
let entries = util::collect(store.list("/").await?).await?;
let entries = store.list("/").await?;
assert!(entries.is_empty());
Ok(())
}
@@ -225,8 +224,7 @@ async fn assert_cache_files(
file_names: &[&str],
file_contents: &[&str],
) -> Result<()> {
let obs = store.list("/").await?;
let objects = util::collect(obs).await?;
let objects = store.list("/").await?;
// compare the cache file with the expected cache file; ignore orders
for o in objects {
@@ -284,10 +282,10 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p2, "Hello, object2!").await.unwrap();
// create cache by read object
let _ = store.range_read(p1, 0..).await?;
let _ = store.read_with(p1).range(0..).await?;
let _ = store.read(p1).await?;
let _ = store.range_read(p2, 0..).await?;
let _ = store.range_read(p2, 7..).await?;
let _ = store.read_with(p2).range(0..).await?;
let _ = store.read_with(p2).range(7..).await?;
let _ = store.read(p2).await?;
assert_cache_files(
@@ -327,7 +325,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p3, "Hello, object3!").await.unwrap();
let _ = store.read(p3).await.unwrap();
let _ = store.range_read(p3, 0..5).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();
assert_cache_files(
&cache_store,

View File

@@ -177,7 +177,7 @@ impl ManifestObjectStore {
{
let streamer = self
.object_store
.list(&self.path)
.lister_with(&self.path)
.await
.context(ListObjectsSnafu { path: &self.path })?;
streamer