From a796dbf1a0759cd0bcc62ad6122c8164cd7c5d04 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Mon, 1 Jun 2026 15:45:44 +0800 Subject: [PATCH] chore: update opendal to 0.57 (#8204) * chore: update opendal to 0.57 Signed-off-by: shuiyisong * chore: remove duplicate compat to use opendal's Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- Cargo.lock | 87 +- Cargo.toml | 2 +- src/common/datasource/Cargo.toml | 1 + src/common/datasource/src/file_format.rs | 2 +- .../datasource/src/file_format/tests.rs | 2 +- src/common/datasource/src/object_store/oss.rs | 24 +- src/common/datasource/src/test_util.rs | 4 +- src/file-engine/Cargo.toml | 1 + src/file-engine/src/query/file_stream.rs | 2 +- src/object-store/Cargo.toml | 2 +- src/object-store/src/compat.rs | 1045 ----------------- src/object-store/src/layers/mock.rs | 30 +- src/object-store/src/lib.rs | 1 - 13 files changed, 98 insertions(+), 1105 deletions(-) delete mode 100644 src/object-store/src/compat.rs diff --git a/Cargo.lock b/Cargo.lock index 63ba289947..7c6cb93c2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2278,6 +2278,7 @@ dependencies = [ "futures", "lazy_static", "object-store", + "object_store_opendal", "orc-rust", "parquet", "paste", @@ -5102,6 +5103,7 @@ dependencies = [ "datatypes", "futures", "object-store", + "object_store_opendal", "serde", "serde_json", "snafu 0.8.6", @@ -9074,8 +9076,9 @@ dependencies = [ [[package]] name = "object_store_opendal" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eb12a624a41fce745838d0ef3701ff6c47797c13cd18ad3612fd2a3134fdbd8" dependencies = [ "async-trait", "bytes", @@ -9162,8 +9165,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "opendal" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c9c85ce253ff87225e7669979d877a20c98a06604ec9d6dd5f4473e08f1ae1" dependencies = [ "ctor", "opendal-core", @@ -9183,8 +9187,9 @@ dependencies = [ [[package]] name = "opendal-core" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4f8607c90e2c963a91467f50fb49fbc7fb3d573f88cea219ca59ccd3740b309" dependencies = [ "anyhow", "base64 0.22.1", @@ -9210,8 +9215,9 @@ dependencies = [ [[package]] name = "opendal-layer-concurrent-limit" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d6f81ba6960e3fae1882f253b114b21d7e444e1534f209c7737a79f6243eb6f" dependencies = [ "futures", "http 1.3.1", @@ -9221,8 +9227,9 @@ dependencies = [ [[package]] name = "opendal-layer-logging" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58ada45c6d81d1aa4c9305d0c7d4bc317c59c85866a0908a2d75a7a978aa5ee2" dependencies = [ "log", "opendal-core", @@ -9230,8 +9237,9 @@ dependencies = [ [[package]] name = "opendal-layer-observe-metrics-common" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "628b0228fdbd13c3d9d50eee4341f2eb82ca5b44991e4c68f07c84cc823e2d12" dependencies = [ "futures", "http 1.3.1", @@ -9240,8 +9248,9 @@ dependencies = [ [[package]] name = "opendal-layer-prometheus" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0487bdb1357097ec8654781bad03ef310282517738e2864ebde69e27aaafc5ec" dependencies = [ "opendal-core", "opendal-layer-observe-metrics-common", @@ -9250,8 +9259,9 @@ dependencies = [ [[package]] name = "opendal-layer-retry" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2a25a718afb81fad81cb9a0580a1cb989221fa2317f888c6a37f8dad408eb7" dependencies = [ "backon", "log", @@ -9260,8 +9270,9 @@ dependencies = [ [[package]] name = "opendal-layer-timeout" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e91f731724c213af81e9d03517859c8fc47b4578e64ad61ae4f099f10fe36e3" dependencies = [ "opendal-core", "tokio", @@ -9269,8 +9280,9 @@ dependencies = [ [[package]] name = "opendal-layer-tracing" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90c6fc9df6da1f0dafbdf55fa48525f1643aefbe7da8f46936e869e2a5b8a34f" dependencies = [ "futures", "http 1.3.1", @@ -9280,8 +9292,9 @@ dependencies = [ [[package]] name = "opendal-service-azblob" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0030644366ef5d8cbe3a4a5822bf99a4aafddc1666e9d24b44d158d9062fc76a" dependencies = [ "base64 0.22.1", "bytes", @@ -9300,8 +9313,9 @@ dependencies = [ [[package]] name = "opendal-service-azure-common" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b489f13c42e69d69bdd72952b634356ec43a7881a20259b38b540fcecdf4051" dependencies = [ "http 1.3.1", "opendal-core", @@ -9309,8 +9323,9 @@ dependencies = [ [[package]] name = "opendal-service-fs" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22e89a665fef0e6bd249cf5ea47fc174b7ba892159bee4b9382528b1ca873a2c" dependencies = [ "bytes", "log", @@ -9322,8 +9337,9 @@ dependencies = [ [[package]] name = "opendal-service-gcs" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48de101aac565ed06af4b47903c24eafd249075553ec1fb18256751c45148d47" dependencies = [ "async-trait", "bytes", @@ -9342,8 +9358,9 @@ dependencies = [ [[package]] name = "opendal-service-http" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb6af628a0bf14075b957179444927e1df40dc7addef382b585a05ef015a077b" dependencies = [ "http 1.3.1", "log", @@ -9353,8 +9370,9 @@ dependencies = [ [[package]] name = "opendal-service-oss" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "328fa55e8888cbdfe00826bfea2a79042422b720e8369e9e021e46121dea5ace" dependencies = [ "bytes", "http 1.3.1", @@ -9369,8 +9387,9 @@ dependencies = [ [[package]] name = "opendal-service-s3" -version = "0.56.0" -source = "git+https://github.com/apache/opendal.git?rev=4ad2d85296ffa6fdc2882f97d3c760ee243913f7#4ad2d85296ffa6fdc2882f97d3c760ee243913f7" +version = "0.57.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "313d46c9f5ae70bca26b7c3e3fbb9b639292625f28af73aa016f47e788af9deb" dependencies = [ "base64 0.22.1", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 32407f31cf..56200a24d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -178,7 +178,7 @@ nalgebra = "0.33" nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] } notify = "8.0" num_cpus = "1.16" -object_store_opendal = { git = "https://github.com/apache/opendal.git", rev = "4ad2d85296ffa6fdc2882f97d3c760ee243913f7" } +object_store_opendal = "0.57" once_cell = "1.18" opentelemetry-proto = { version = "0.31", features = [ "gen-tonic", diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 470b5371f7..8b4053db2f 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -33,6 +33,7 @@ datatypes.workspace = true futures.workspace = true lazy_static.workspace = true object-store.workspace = true +object_store_opendal.workspace = true orc-rust = { version = "0.8", default-features = false, features = ["async"] } parquet.workspace = true paste.workspace = true diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index a6a358c9e4..e36f94c0d2 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -316,7 +316,7 @@ pub async fn file_to_stream( .with_file_compression_type(df_compression) .build(); - let store = Arc::new(object_store::compat::OpendalStore::new(store.clone())); + let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone())); let file_opener = config.file_source().create_file_opener(store, &config, 0)?; let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?; diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 93ab3b4409..a925f73d48 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -44,7 +44,7 @@ struct Test<'a> { impl Test<'_> { async fn run(self, store: &ObjectStore) { - let store = Arc::new(object_store::compat::OpendalStore::new(store.clone())); + let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone())); let file_opener = self .file_source .create_file_opener(store, &self.config, 0) diff --git a/src/common/datasource/src/object_store/oss.rs b/src/common/datasource/src/object_store/oss.rs index aded3eca2c..aacc17ac5e 100644 --- a/src/common/datasource/src/object_store/oss.rs +++ b/src/common/datasource/src/object_store/oss.rs @@ -27,12 +27,14 @@ const ACCESS_KEY_ID: &str = "access_key_id"; const ACCESS_KEY_SECRET: &str = "access_key_secret"; const ROOT: &str = "root"; const ALLOW_ANONYMOUS: &str = "allow_anonymous"; +const SKIP_SIGNATURE: &str = "skip_signature"; /// Check if the key is supported in OSS configuration. pub fn is_supported_in_oss(key: &str) -> bool { [ ROOT, ALLOW_ANONYMOUS, + SKIP_SIGNATURE, BUCKET, ENDPOINT, ACCESS_KEY_ID, @@ -61,18 +63,23 @@ pub fn build_oss_backend( builder = builder.access_key_secret(access_key_secret); } - if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) { - let allow = allow_anonymous.as_str().parse::().map_err(|e| { + if let Some((key, value)) = connection + .get(SKIP_SIGNATURE) + .map(|value| (SKIP_SIGNATURE, value)) + .or_else(|| { + connection + .get(ALLOW_ANONYMOUS) + .map(|value| (ALLOW_ANONYMOUS, value)) + }) + { + let skip_signature = value.as_str().parse::().map_err(|e| { error::InvalidConnectionSnafu { - msg: format!( - "failed to parse the option {}={}, {}", - ALLOW_ANONYMOUS, allow_anonymous, e - ), + msg: format!("failed to parse the option {}={}, {}", key, value, e), } .build() })?; - if allow { - builder = builder.allow_anonymous(); + if skip_signature { + builder = builder.skip_signature(); } } @@ -93,6 +100,7 @@ mod tests { fn test_is_supported_in_oss() { assert!(is_supported_in_oss(ROOT)); assert!(is_supported_in_oss(ALLOW_ANONYMOUS)); + assert!(is_supported_in_oss(SKIP_SIGNATURE)); assert!(is_supported_in_oss(BUCKET)); assert!(is_supported_in_oss(ENDPOINT)); assert!(is_supported_in_oss(ACCESS_KEY_ID)); diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index 0a13d9c6e8..ea2b0c768c 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi test_util::TEST_BATCH_SIZE, schema.clone(), FileCompressionType::UNCOMPRESSED, - Arc::new(object_store::compat::OpendalStore::new(store.clone())), + Arc::new(object_store_opendal::OpendalStore::new(store.clone())), true, ); @@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test( let csv_opener = csv_source .create_file_opener( - Arc::new(object_store::compat::OpendalStore::new(store.clone())), + Arc::new(object_store_opendal::OpendalStore::new(store.clone())), &config, 0, ) diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml index 6c8c9e887d..9d031cb279 100644 --- a/src/file-engine/Cargo.toml +++ b/src/file-engine/Cargo.toml @@ -29,6 +29,7 @@ datafusion-expr.workspace = true datatypes.workspace = true futures.workspace = true object-store.workspace = true +object_store_opendal.workspace = true serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true snafu.workspace = true diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs index eec8f8961d..a480a50374 100644 --- a/src/file-engine/src/query/file_stream.rs +++ b/src/file-engine/src/query/file_stream.rs @@ -61,7 +61,7 @@ fn build_record_batch_stream( .with_file_group(FileGroup::new(files)) .build(); - let store = Arc::new(object_store::compat::OpendalStore::new( + let store = Arc::new(object_store_opendal::OpendalStore::new( scan_plan_config.store.clone(), )); diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 0815f066ba..9ca68bb780 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -24,7 +24,7 @@ derive_builder = { workspace = true, optional = true } futures.workspace = true humantime-serde.workspace = true lazy_static.workspace = true -opendal = { git = "https://github.com/apache/opendal.git", rev = "4ad2d85296ffa6fdc2882f97d3c760ee243913f7", features = [ +opendal = { version = "0.57", features = [ "layers-tracing", "layers-prometheus", "services-azblob", diff --git a/src/object-store/src/compat.rs b/src/object-store/src/compat.rs deleted file mode 100644 index 4498f8f3be..0000000000 --- a/src/object-store/src/compat.rs +++ /dev/null @@ -1,1045 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::fmt::{self, Debug, Display, Formatter}; -use std::future::IntoFuture; -use std::io; -use std::ops::Range; -use std::sync::Arc; - -use async_trait::async_trait; -use bytes::Bytes; -use datafusion_object_store::path::Path; -use datafusion_object_store::{ - Attribute, Attributes, CopyMode, CopyOptions, GetOptions, GetRange, GetResult, - GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, - PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, UploadPart, -}; -use futures::stream::BoxStream; -use futures::{FutureExt, StreamExt, TryStreamExt}; -use opendal::options::CopyOptions as OpendalCopyOptions; -use opendal::raw::percent_decode_path; -use opendal::{Buffer, Operator, OperatorInfo, Writer}; -use tokio::sync::{Mutex, oneshot}; - -/// OpendalStore implements ObjectStore trait by using opendal. -/// -/// This allows users to use opendal as an object store without extra cost. -/// -/// Visit [`opendal::services`] for more information about supported services. -/// -/// ```no_run -/// use std::sync::Arc; -/// -/// use bytes::Bytes; -/// use object_store::path::Path; -/// use object_store::ObjectStore; -/// use object_store_opendal::OpendalStore; -/// use opendal::services::S3; -/// use opendal::{Builder, Operator}; -/// -/// #[tokio::main] -/// async fn main() { -/// let builder = S3::default() -/// .access_key_id("my_access_key") -/// .secret_access_key("my_secret_key") -/// .endpoint("my_endpoint") -/// .region("my_region"); -/// -/// // Create a new operator -/// let operator = Operator::new(builder).unwrap().finish(); -/// -/// // Create a new object store -/// let object_store = Arc::new(OpendalStore::new(operator)); -/// -/// let path = Path::from("data/nested/test.txt"); -/// let bytes = Bytes::from_static(b"hello, world! I am nested."); -/// -/// object_store.put(&path, bytes.clone().into()).await.unwrap(); -/// -/// let content = object_store -/// .get(&path) -/// .await -/// .unwrap() -/// .bytes() -/// .await -/// .unwrap(); -/// -/// assert_eq!(content, bytes); -/// } -/// ``` -#[derive(Clone)] -pub struct OpendalStore { - info: Arc, - inner: Operator, -} - -impl OpendalStore { - /// Create OpendalStore by given Operator. - pub fn new(op: Operator) -> Self { - Self { - info: op.info().into(), - inner: op, - } - } - - /// Get the Operator info. - pub fn info(&self) -> &OperatorInfo { - self.info.as_ref() - } - - /// Copy a file from one location to another. - async fn copy_request( - &self, - from: &Path, - to: &Path, - if_not_exists: bool, - ) -> datafusion_object_store::Result<()> { - let mut copy_options = OpendalCopyOptions::default(); - if if_not_exists { - copy_options.if_not_exists = true; - } - - // Perform the copy operation - self.inner - .copy_options( - &percent_decode_path(from.as_ref()), - &percent_decode_path(to.as_ref()), - copy_options, - ) - .await - .map_err(|err| { - if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists { - datafusion_object_store::Error::AlreadyExists { - path: to.to_string(), - source: Box::new(err), - } - } else { - format_object_store_error(err, from.as_ref()) - } - })?; - - Ok(()) - } -} - -impl Debug for OpendalStore { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OpendalStore") - .field("scheme", &self.info.scheme()) - .field("name", &self.info.name()) - .field("root", &self.info.root()) - .field("capability", &self.info.full_capability()) - .finish() - } -} - -impl Display for OpendalStore { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - let info = self.inner.info(); - write!( - f, - "Opendal({}, bucket={}, root={})", - info.scheme(), - info.name(), - info.root() - ) - } -} - -impl From for OpendalStore { - fn from(value: Operator) -> Self { - Self::new(value) - } -} - -#[async_trait] -impl ArrowObjectStore for OpendalStore { - async fn put_opts( - &self, - location: &Path, - bytes: PutPayload, - opts: PutOptions, - ) -> datafusion_object_store::Result { - let decoded_location = percent_decode_path(location.as_ref()); - let mut future_write = self - .inner - .write_with(&decoded_location, Buffer::from_iter(bytes)); - let opts_mode = opts.mode.clone(); - match opts.mode { - PutMode::Overwrite => {} - PutMode::Create => { - future_write = future_write.if_not_exists(true); - } - PutMode::Update(update_version) => { - let Some(etag) = update_version.e_tag else { - return Err(datafusion_object_store::Error::NotSupported { - source: Box::new(opendal::Error::new( - opendal::ErrorKind::Unsupported, - "etag is required for conditional put", - )), - }); - }; - future_write = future_write.if_match(etag.as_str()); - } - } - let rp = future_write.await.map_err(|err| { - match format_object_store_error(err, location.as_ref()) { - datafusion_object_store::Error::Precondition { path, source } - if opts_mode == PutMode::Create => - { - datafusion_object_store::Error::AlreadyExists { path, source } - } - e => e, - } - })?; - - let e_tag = rp.etag().map(|s| s.to_string()); - let version = rp.version().map(|s| s.to_string()); - - Ok(PutResult { e_tag, version }) - } - - async fn put_multipart_opts( - &self, - location: &Path, - opts: PutMultipartOptions, - ) -> datafusion_object_store::Result> { - const DEFAULT_CONCURRENT: usize = 8; - - let mut options = opendal::options::WriteOptions { - concurrent: DEFAULT_CONCURRENT, - ..Default::default() - }; - - let mut user_metadata = HashMap::new(); - - for (key, value) in opts.attributes.iter() { - match key { - Attribute::CacheControl => { - options.cache_control = Some(value.to_string()); - } - Attribute::ContentDisposition => { - options.content_disposition = Some(value.to_string()); - } - Attribute::ContentEncoding => { - options.content_encoding = Some(value.to_string()); - } - Attribute::ContentLanguage => continue, - Attribute::ContentType => { - options.content_type = Some(value.to_string()); - } - Attribute::Metadata(k) => { - user_metadata.insert(k.to_string(), value.to_string()); - } - _ => {} - } - } - - if !user_metadata.is_empty() { - options.user_metadata = Some(user_metadata); - } - - let decoded_location = percent_decode_path(location.as_ref()); - let writer = self - .inner - .writer_options(&decoded_location, options) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - let upload = OpendalMultipartUpload::new(writer, location.clone()); - - Ok(Box::new(upload)) - } - - async fn get_opts( - &self, - location: &Path, - options: GetOptions, - ) -> datafusion_object_store::Result { - let raw_location = percent_decode_path(location.as_ref()); - let meta = { - let mut s = self.inner.stat_with(&raw_location); - if let Some(version) = &options.version { - s = s.version(version.as_str()) - } - if let Some(if_match) = &options.if_match { - s = s.if_match(if_match.as_str()); - } - if let Some(if_none_match) = &options.if_none_match { - s = s.if_none_match(if_none_match.as_str()); - } - if let Some(if_modified_since) = - options.if_modified_since.and_then(datetime_to_timestamp) - { - s = s.if_modified_since(if_modified_since); - } - if let Some(if_unmodified_since) = - options.if_unmodified_since.and_then(datetime_to_timestamp) - { - s = s.if_unmodified_since(if_unmodified_since); - } - s.await - .map_err(|err| format_object_store_error(err, location.as_ref()))? - }; - - let mut attributes = Attributes::new(); - if let Some(user_meta) = meta.user_metadata() { - for (key, value) in user_meta { - attributes.insert( - Attribute::Metadata(key.clone().into()), - value.clone().into(), - ); - } - } - - let meta = ObjectMeta { - location: location.clone(), - last_modified: meta - .last_modified() - .and_then(timestamp_to_datetime) - .unwrap_or_default(), - size: meta.content_length(), - e_tag: meta.etag().map(|x| x.to_string()), - version: meta.version().map(|x| x.to_string()), - }; - - if options.head { - return Ok(GetResult { - payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())), - range: 0..0, - meta, - attributes, - }); - } - - let reader = { - let mut r = self.inner.reader_with(raw_location.as_ref()); - if let Some(version) = options.version { - r = r.version(version.as_str()); - } - if let Some(if_match) = options.if_match { - r = r.if_match(if_match.as_str()); - } - if let Some(if_none_match) = options.if_none_match { - r = r.if_none_match(if_none_match.as_str()); - } - if let Some(if_modified_since) = - options.if_modified_since.and_then(datetime_to_timestamp) - { - r = r.if_modified_since(if_modified_since); - } - if let Some(if_unmodified_since) = - options.if_unmodified_since.and_then(datetime_to_timestamp) - { - r = r.if_unmodified_since(if_unmodified_since); - } - r.await - .map_err(|err| format_object_store_error(err, location.as_ref()))? - }; - - let read_range = match options.range { - Some(GetRange::Bounded(r)) => { - if r.start >= r.end || r.start >= meta.size { - 0..0 - } else { - let end = r.end.min(meta.size); - r.start..end - } - } - Some(GetRange::Offset(r)) => { - if r < meta.size { - r..meta.size - } else { - 0..0 - } - } - Some(GetRange::Suffix(r)) if r < meta.size => (meta.size - r)..meta.size, - _ => 0..meta.size, - }; - - let stream = reader - .into_bytes_stream(read_range.start..read_range.end) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))? - .map_ok(|buf| buf) - .map_err(|err: io::Error| datafusion_object_store::Error::Generic { - store: "IoError", - source: Box::new(err), - }); - - Ok(GetResult { - payload: GetResultPayload::Stream(Box::pin(stream)), - range: read_range.start..read_range.end, - meta, - attributes, - }) - } - - async fn get_ranges( - &self, - location: &Path, - ranges: &[Range], - ) -> datafusion_object_store::Result> { - if ranges.is_empty() { - return Ok(Vec::new()); - } - - let raw_location = percent_decode_path(location.as_ref()); - let reader = self - .inner - .reader_with(raw_location.as_ref()) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - let buffers = reader - .fetch(ranges.to_vec()) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - - Ok(buffers.into_iter().map(|buf| buf.to_bytes()).collect()) - } - - fn delete_stream( - &self, - locations: BoxStream<'static, datafusion_object_store::Result>, - ) -> BoxStream<'static, datafusion_object_store::Result> { - let this = self.clone(); - locations - .map(move |location| { - let this = this.clone(); - async move { - let location = location?; - let decoded_location = percent_decode_path(location.as_ref()); - this.inner - .delete(&decoded_location) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; - Ok(location) - } - }) - .buffered(10) - .boxed() - } - - fn list( - &self, - prefix: Option<&Path>, - ) -> BoxStream<'static, datafusion_object_store::Result> { - // object_store `Path` always removes trailing slash - // need to add it back - let path = prefix.map_or("".into(), |x| { - format!("{}/", percent_decode_path(x.as_ref())) - }); - - let this = self.clone(); - let fut = async move { - let stream = this - .inner - .lister_with(&path) - .recursive(true) - .await - .map_err(|err| format_object_store_error(err, &path))?; - - let stream = stream.then(|res| async { - let entry = res.map_err(|err| format_object_store_error(err, ""))?; - let meta = entry.metadata(); - - Ok(format_object_meta(entry.path(), meta)) - }); - Ok::<_, datafusion_object_store::Error>(stream) - }; - - fut.into_stream().try_flatten().boxed() - } - - fn list_with_offset( - &self, - prefix: Option<&Path>, - offset: &Path, - ) -> BoxStream<'static, datafusion_object_store::Result> { - let path = prefix.map_or("".into(), |x| { - format!("{}/", percent_decode_path(x.as_ref())) - }); - let offset = offset.clone(); - - // clone self for 'static lifetime - // clone self is cheap - let this = self.clone(); - - let fut = async move { - let list_with_start_after = this.inner.info().full_capability().list_with_start_after; - let mut fut = this.inner.lister_with(&path).recursive(true); - - // Use native start_after support if possible. - if list_with_start_after { - fut = fut.start_after(offset.as_ref()); - } - - let lister = fut - .await - .map_err(|err| format_object_store_error(err, &path))? - .then(move |entry| { - let path = path.clone(); - let this = this.clone(); - async move { - let entry = entry.map_err(|err| format_object_store_error(err, &path))?; - let (path, metadata) = entry.into_parts(); - - // If it's a dir or last_modified is present, we can use it directly. - if metadata.is_dir() || metadata.last_modified().is_some() { - let object_meta = format_object_meta(&path, &metadata); - return Ok(object_meta); - } - - let metadata = this - .inner - .stat(&path) - .await - .map_err(|err| format_object_store_error(err, &path))?; - let object_meta = format_object_meta(&path, &metadata); - Ok::<_, datafusion_object_store::Error>(object_meta) - } - }) - .boxed(); - - let stream = if list_with_start_after { - lister - } else { - lister - .try_filter(move |entry| futures::future::ready(entry.location > offset)) - .boxed() - }; - - Ok::<_, datafusion_object_store::Error>(stream) - }; - - fut.into_stream().try_flatten().boxed() - } - - async fn list_with_delimiter( - &self, - prefix: Option<&Path>, - ) -> datafusion_object_store::Result { - let path = prefix.map_or("".into(), |x| { - format!("{}/", percent_decode_path(x.as_ref())) - }); - let mut stream = self - .inner - .lister_with(&path) - .into_future() - .await - .map_err(|err| format_object_store_error(err, &path))?; - - let mut common_prefixes = Vec::new(); - let mut objects = Vec::new(); - - while let Some(res) = stream.next().await { - let entry = res.map_err(|err| format_object_store_error(err, ""))?; - let meta = entry.metadata(); - - if meta.is_dir() { - common_prefixes.push(entry.path().into()); - } else if meta.last_modified().is_some() { - objects.push(format_object_meta(entry.path(), meta)); - } else { - let meta = self - .inner - .stat(entry.path()) - .await - .map_err(|err| format_object_store_error(err, entry.path()))?; - objects.push(format_object_meta(entry.path(), &meta)); - } - } - - Ok(ListResult { - common_prefixes, - objects, - }) - } - - async fn copy_opts( - &self, - from: &Path, - to: &Path, - options: CopyOptions, - ) -> datafusion_object_store::Result<()> { - let if_not_exists = options.mode == CopyMode::Create; - self.copy_request(from, to, if_not_exists).await - } -} - -/// `MultipartUpload` implementation based on `Writer` in opendal. -/// -/// # Notes -/// -/// OpenDAL writer can handle concurrent internally we don't generate real `UploadPart` like existing -/// implementation do. Instead, we just write the part and notify the next task to be written. -/// -/// The lock here doesn't really involve the write process, it's just for the notify mechanism. -struct OpendalMultipartUpload { - writer: Arc>, - location: Path, - next_notify: oneshot::Receiver<()>, -} - -impl OpendalMultipartUpload { - fn new(writer: Writer, location: Path) -> Self { - // an immediately dropped sender for the first part to write without waiting - let (_, rx) = oneshot::channel(); - - Self { - writer: Arc::new(Mutex::new(writer)), - location, - next_notify: rx, - } - } -} - -#[async_trait] -impl MultipartUpload for OpendalMultipartUpload { - fn put_part(&mut self, data: PutPayload) -> UploadPart { - let writer = self.writer.clone(); - let location = self.location.clone(); - - // Generate next notify which will be notified after the current part is written. - let (tx, rx) = oneshot::channel(); - // Fetch the notify for current part to wait for it to be written. - let last_rx = std::mem::replace(&mut self.next_notify, rx); - - async move { - // Wait for the previous part to be written - let _ = last_rx.await; - - let mut writer = writer.lock().await; - let result = writer - .write(Buffer::from_iter(data)) - .await - .map_err(|err| format_object_store_error(err, location.as_ref())); - - // Notify the next part to be written - drop(tx); - - result - } - .boxed() - } - - async fn complete(&mut self) -> datafusion_object_store::Result { - let mut writer = self.writer.lock().await; - let metadata = writer - .close() - .await - .map_err(|err| format_object_store_error(err, self.location.as_ref()))?; - - let e_tag = metadata.etag().map(|s| s.to_string()); - let version = metadata.version().map(|s| s.to_string()); - - Ok(PutResult { e_tag, version }) - } - - async fn abort(&mut self) -> datafusion_object_store::Result<()> { - let mut writer = self.writer.lock().await; - writer - .abort() - .await - .map_err(|err| format_object_store_error(err, self.location.as_ref())) - } -} - -impl Debug for OpendalMultipartUpload { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - f.debug_struct("OpendalMultipartUpload") - .field("location", &self.location) - .finish() - } -} - -fn format_object_store_error(err: opendal::Error, path: &str) -> datafusion_object_store::Error { - match err.kind() { - opendal::ErrorKind::NotFound => datafusion_object_store::Error::NotFound { - path: path.to_string(), - source: Box::new(err), - }, - opendal::ErrorKind::Unsupported => datafusion_object_store::Error::NotSupported { - source: Box::new(err), - }, - opendal::ErrorKind::AlreadyExists => datafusion_object_store::Error::AlreadyExists { - path: path.to_string(), - source: Box::new(err), - }, - opendal::ErrorKind::ConditionNotMatch => datafusion_object_store::Error::Precondition { - path: path.to_string(), - source: Box::new(err), - }, - kind => datafusion_object_store::Error::Generic { - store: kind.into_static(), - source: Box::new(err), - }, - } -} - -fn format_object_meta(path: &str, meta: &opendal::Metadata) -> ObjectMeta { - ObjectMeta { - location: path.into(), - last_modified: meta - .last_modified() - .and_then(timestamp_to_datetime) - .unwrap_or_default(), - size: meta.content_length(), - e_tag: meta.etag().map(|x| x.to_string()), - version: meta.version().map(|x| x.to_string()), - } -} - -fn timestamp_to_datetime(ts: opendal::raw::Timestamp) -> Option> { - let ts = ts.into_inner(); - chrono::DateTime::::from_timestamp(ts.as_second(), ts.subsec_nanosecond() as u32) -} - -fn datetime_to_timestamp(dt: chrono::DateTime) -> Option { - opendal::raw::Timestamp::new(dt.timestamp(), dt.timestamp_subsec_nanos() as i32).ok() -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use bytes::Bytes; - use datafusion_object_store::path::Path; - use datafusion_object_store::{ - ObjectStore as ArrowObjectStore, ObjectStoreExt, WriteMultipart, - }; - use opendal::{Operator, services}; - use rand::{Rng, RngCore}; - - use super::*; - - async fn create_test_object_store() -> Arc { - let op = Operator::new(services::Memory::default()).unwrap().finish(); - let object_store = Arc::new(OpendalStore::new(op)); - - let path: Path = "data/test.txt".into(); - let bytes = Bytes::from_static(b"hello, world!"); - object_store.put(&path, bytes.into()).await.unwrap(); - - let path: Path = "data/nested/test.txt".into(); - let bytes = Bytes::from_static(b"hello, world! I am nested."); - object_store.put(&path, bytes.into()).await.unwrap(); - - object_store - } - - #[tokio::test] - async fn test_basic() { - let op = Operator::new(services::Memory::default()).unwrap().finish(); - let object_store: Arc = Arc::new(OpendalStore::new(op)); - - // Retrieve a specific file - let path: Path = "data/test.txt".into(); - - let bytes = Bytes::from_static(b"hello, world!"); - object_store.put(&path, bytes.clone().into()).await.unwrap(); - - let meta = object_store.head(&path).await.unwrap(); - - assert_eq!(meta.size, 13); - - assert_eq!( - object_store - .get(&path) - .await - .unwrap() - .bytes() - .await - .unwrap(), - bytes - ); - } - - #[tokio::test] - async fn test_put_multipart() { - let op = Operator::new(services::Memory::default()).unwrap().finish(); - let object_store: Arc = Arc::new(OpendalStore::new(op)); - - let mut rng = rand::rng(); - - // Case complete - let path: Path = "data/test_complete.txt".into(); - let upload = object_store.put_multipart(&path).await.unwrap(); - - let mut write = WriteMultipart::new(upload); - - let mut all_bytes = vec![]; - let round = rng.random_range(1..=1024); - for _ in 0..round { - let size = rng.random_range(1..=1024); - let mut bytes = vec![0; size]; - rng.fill_bytes(&mut bytes); - - all_bytes.extend_from_slice(&bytes); - write.put(bytes.into()); - } - - let _ = write.finish().await.unwrap(); - - let meta = object_store.head(&path).await.unwrap(); - - assert_eq!(meta.size, all_bytes.len() as u64); - - assert_eq!( - object_store - .get(&path) - .await - .unwrap() - .bytes() - .await - .unwrap(), - Bytes::from(all_bytes) - ); - - // Case abort - let path: Path = "data/test_abort.txt".into(); - let mut upload = object_store.put_multipart(&path).await.unwrap(); - upload.put_part(vec![1; 1024].into()).await.unwrap(); - upload.abort().await.unwrap(); - - let res = object_store.head(&path).await; - let err = res.unwrap_err(); - - assert!(matches!( - err, - datafusion_object_store::Error::NotFound { .. } - )) - } - - #[tokio::test] - async fn test_list() { - let object_store = create_test_object_store().await; - let path: Path = "data/".into(); - let results = object_store.list(Some(&path)).collect::>().await; - assert_eq!(results.len(), 2); - let mut locations = results - .iter() - .map(|x| x.as_ref().unwrap().location.as_ref()) - .collect::>(); - - let expected_files = vec![ - ( - "data/nested/test.txt", - Bytes::from_static(b"hello, world! I am nested."), - ), - ("data/test.txt", Bytes::from_static(b"hello, world!")), - ]; - - let expected_locations = expected_files.iter().map(|x| x.0).collect::>(); - - locations.sort(); - assert_eq!(locations, expected_locations); - - for (location, bytes) in expected_files { - let path: Path = location.into(); - assert_eq!( - object_store - .get(&path) - .await - .unwrap() - .bytes() - .await - .unwrap(), - bytes - ); - } - } - - #[tokio::test] - async fn test_list_with_delimiter() { - let object_store = create_test_object_store().await; - let path: Path = "data/".into(); - let result = object_store.list_with_delimiter(Some(&path)).await.unwrap(); - assert_eq!(result.objects.len(), 1); - assert_eq!(result.common_prefixes.len(), 1); - assert_eq!(result.objects[0].location.as_ref(), "data/test.txt"); - assert_eq!(result.common_prefixes[0].as_ref(), "data/nested"); - } - - #[tokio::test] - async fn test_list_with_offset() { - let object_store = create_test_object_store().await; - let path: Path = "data/".into(); - let offset: Path = "data/nested/test.txt".into(); - let result = object_store - .list_with_offset(Some(&path), &offset) - .collect::>() - .await; - assert_eq!(result.len(), 1); - assert_eq!( - result[0].as_ref().unwrap().location.as_ref(), - "data/test.txt" - ); - } - - mod stat_counter { - use std::sync::atomic::{AtomicUsize, Ordering}; - - use super::*; - - #[derive(Debug, Clone)] - pub struct StatCounterLayer { - count: Arc, - } - - impl StatCounterLayer { - pub fn new(count: Arc) -> Self { - Self { count } - } - } - - impl opendal::raw::Layer for StatCounterLayer { - type LayeredAccess = StatCounterAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccess { - StatCounterAccessor { - inner, - count: self.count.clone(), - } - } - } - - #[derive(Debug, Clone)] - pub struct StatCounterAccessor { - inner: A, - count: Arc, - } - - impl opendal::raw::LayeredAccess for StatCounterAccessor { - type Inner = A; - type Reader = A::Reader; - type Writer = A::Writer; - type Lister = A::Lister; - type Deleter = A::Deleter; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - async fn stat( - &self, - path: &str, - args: opendal::raw::OpStat, - ) -> opendal::Result { - self.count.fetch_add(1, Ordering::SeqCst); - self.inner.stat(path, args).await - } - - async fn read( - &self, - path: &str, - args: opendal::raw::OpRead, - ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> { - self.inner.read(path, args).await - } - - async fn write( - &self, - path: &str, - args: opendal::raw::OpWrite, - ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> { - self.inner.write(path, args).await - } - - async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> { - self.inner.delete().await - } - - async fn list( - &self, - path: &str, - args: opendal::raw::OpList, - ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> { - self.inner.list(path, args).await - } - - async fn copy( - &self, - from: &str, - to: &str, - args: opendal::raw::OpCopy, - ) -> opendal::Result { - self.inner.copy(from, to, args).await - } - - async fn rename( - &self, - from: &str, - to: &str, - args: opendal::raw::OpRename, - ) -> opendal::Result { - self.inner.rename(from, to, args).await - } - } - } - - #[tokio::test] - async fn test_get_ranges_no_stat() { - use std::sync::atomic::{AtomicUsize, Ordering}; - - // Create a stat counter and operator with tracking layer - let stat_count = Arc::new(AtomicUsize::new(0)); - let op = Operator::new(opendal::services::Memory::default()) - .unwrap() - .layer(stat_counter::StatCounterLayer::new(stat_count.clone())) - .finish(); - let store = OpendalStore::new(op); - - // Create a test file - let location = "test_get_range.txt".into(); - let value = Bytes::from_static(b"Hello, world!"); - store.put(&location, value.clone().into()).await.unwrap(); - - // Reset counter after put - stat_count.store(0, Ordering::SeqCst); - - // Test 1: get_ranges should NOT call stat() - let range = 0..5; - let ret = store - .get_ranges(&location, std::slice::from_ref(&range)) - .await - .unwrap(); - assert_eq!(vec![Bytes::from_static(b"Hello")], ret); - assert_eq!( - stat_count.load(Ordering::SeqCst), - 0, - "get_ranges should not call stat()" - ); - - // Reset counter - stat_count.store(0, Ordering::SeqCst); - - // Test 2: get_opts SHOULD call stat() to get metadata - let opts = datafusion_object_store::GetOptions { - range: Some(datafusion_object_store::GetRange::Bounded(0..5)), - ..Default::default() - }; - let ret = store.get_opts(&location, opts).await.unwrap(); - let data = ret.bytes().await.unwrap(); - assert_eq!(Bytes::from_static(b"Hello"), data); - assert!( - stat_count.load(Ordering::SeqCst) > 0, - "get_opts should call stat() to get metadata" - ); - - // Cleanup - store.delete(&location).await.unwrap(); - } -} diff --git a/src/object-store/src/layers/mock.rs b/src/object-store/src/layers/mock.rs index 3df8aae535..f4d3df54d7 100644 --- a/src/object-store/src/layers/mock.rs +++ b/src/object-store/src/layers/mock.rs @@ -21,7 +21,7 @@ pub use opendal::raw::{ Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, oio, }; -use opendal::raw::{OpCopy, RpCopy}; +use opendal::raw::{OpCopier, OpCopy, RpCopy}; pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result}; pub type MockWriterFactory = Arc oio::Writer + Send + Sync>; @@ -146,6 +146,7 @@ impl LayeredAccess for MockAccessor { type Writer = MockWriter; type Lister = MockLister; type Deleter = MockDeleter; + type Copier = oio::Copier; fn inner(&self) -> &Self::Inner { &self.inner @@ -222,15 +223,24 @@ impl LayeredAccess for MockAccessor { } } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - let Some(copy_interceptor) = self.copy_interceptor.as_ref() else { - return self.inner.copy(from, to, args).await; - }; + async fn copy( + &self, + from: &str, + to: &str, + args: OpCopy, + opts: OpCopier, + ) -> Result<(RpCopy, Self::Copier)> { + if let Some(result) = self + .copy_interceptor + .as_ref() + .and_then(|copy_interceptor| copy_interceptor(from, to, args.clone())) + { + return result.map(|rp_copy| (rp_copy, Box::new(()) as oio::Copier)); + } - let Some(result) = copy_interceptor(from, to, args.clone()) else { - return self.inner.copy(from, to, args).await; - }; - - result + self.inner + .copy(from, to, args, opts) + .await + .map(|(rp_copy, copier)| (rp_copy, Box::new(copier) as oio::Copier)) } } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 3a5f72c5ce..f1f8b59082 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -18,7 +18,6 @@ pub use opendal::{ FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Writer, services, }; -pub mod compat; pub mod config; pub mod error; pub mod factory;