diff --git a/Cargo.lock b/Cargo.lock index 90062abaaf..f5de82b46e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8892,7 +8892,9 @@ name = "object-store" version = "1.0.0" dependencies = [ "anyhow", + "async-trait", "bytes", + "chrono", "common-base", "common-error", "common-macro", @@ -8902,9 +8904,11 @@ dependencies = [ "futures", "humantime-serde", "lazy_static", + "object_store 0.12.5", "opendal", "prometheus 0.14.0", "reqwest 0.12.28", + "reqwest 0.13.2", "serde", "snafu 0.8.6", "tokio", diff --git a/src/cli/src/data/snapshot_storage.rs b/src/cli/src/data/snapshot_storage.rs index 6bc71153df..be94b197df 100644 --- a/src/cli/src/data/snapshot_storage.rs +++ b/src/cli/src/data/snapshot_storage.rs @@ -486,7 +486,8 @@ impl SnapshotStorage for OpenDalStorage { async fn delete_snapshot(&self) -> Result<()> { self.object_store - .remove_all("/") + .delete_with("/") + .recursive(true) .await .context(StorageOperationSnafu { operation: "delete snapshot", diff --git a/src/cli/src/metadata/snapshot.rs b/src/cli/src/metadata/snapshot.rs index 648d3a687d..59b2599ad9 100644 --- a/src/cli/src/metadata/snapshot.rs +++ b/src/cli/src/metadata/snapshot.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use clap::{Parser, Subcommand}; use common_error::ext::BoxedError; use common_meta::snapshot::MetadataSnapshotManager; -use object_store::{ObjectStore, Scheme}; +use object_store::{ObjectStore, services}; use crate::Tool; use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store}; @@ -276,7 +276,7 @@ fn build_object_store_and_resolve_file_path( None => new_fs_object_store(fs_root)?, }; - let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) { + let file_path = if object_store.info().scheme() == services::FS_SCHEME { resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)? } else { file_path.to_string() diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index 7f4a7c65b4..a6a358c9e4 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -42,7 +42,6 @@ use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use futures::{StreamExt, TryStreamExt}; use object_store::ObjectStore; -use object_store_opendal::OpendalStore; use snafu::ResultExt; use tokio::io::AsyncWriteExt; use tokio_util::compat::FuturesAsyncWriteCompatExt; @@ -317,7 +316,7 @@ pub async fn file_to_stream( .with_file_compression_type(df_compression) .build(); - let store = Arc::new(OpendalStore::new(store.clone())); + let store = Arc::new(object_store::compat::OpendalStore::new(store.clone())); let file_opener = config.file_source().create_file_opener(store, &config, 0)?; let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?; diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 75d74b53cd..6ef669ab7b 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -44,7 +44,7 @@ struct Test<'a> { impl Test<'_> { async fn run(self, store: &ObjectStore) { - let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone())); + let store = Arc::new(object_store::compat::OpendalStore::new(store.clone())); let file_opener = self .file_source .create_file_opener(store, &self.config, 0) diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index ea2b0c768c..0a13d9c6e8 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi test_util::TEST_BATCH_SIZE, schema.clone(), FileCompressionType::UNCOMPRESSED, - Arc::new(object_store_opendal::OpendalStore::new(store.clone())), + Arc::new(object_store::compat::OpendalStore::new(store.clone())), true, ); @@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test( let csv_opener = csv_source .create_file_opener( - Arc::new(object_store_opendal::OpendalStore::new(store.clone())), + Arc::new(object_store::compat::OpendalStore::new(store.clone())), &config, 0, ) diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs index 73453589dc..931dfabe62 100644 --- a/src/file-engine/src/query/file_stream.rs +++ b/src/file-engine/src/query/file_stream.rs @@ -61,7 +61,7 @@ fn build_record_batch_stream( .with_file_group(FileGroup::new(files)) .build(); - let store = Arc::new(object_store_opendal::OpendalStore::new( + let store = Arc::new(object_store::compat::OpendalStore::new( scan_plan_config.store.clone(), )); diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 33180ebf46..b6adb6eb59 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -499,7 +499,7 @@ impl AccessLayer { let file_size = if file_size == 0 { None } else { Some(file_size) }; let last_modified_ms = metadata .last_modified() - .map(|ts| Timestamp::new_millisecond(ts.timestamp_millis())); + .map(|ts| Timestamp::new_millisecond(ts.into_inner().as_millisecond())); let entry = StorageSstEntry { file_path: path.to_string(), diff --git a/src/mito2/src/manifest/storage/staging.rs b/src/mito2/src/manifest/storage/staging.rs index 17d341605c..fb172d324f 100644 --- a/src/mito2/src/manifest/storage/staging.rs +++ b/src/mito2/src/manifest/storage/staging.rs @@ -191,13 +191,15 @@ impl StagingStorage { pub(crate) async fn clear(&self) -> Result<()> { self.delta_storage .object_store() - .remove_all(self.delta_storage.path()) + .delete_with(self.delta_storage.path()) + .recursive(true) .await .context(OpenDalSnafu)?; self.blob_storage .object_store() - .remove_all(self.blob_storage.path()) + .delete_with(self.blob_storage.path()) + .recursive(true) .await .context(OpenDalSnafu)?; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 6e485e42ac..dca26185ca 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -1009,7 +1009,7 @@ async fn preload_parquet_meta_cache_for_files( return 0; } - let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs); + let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME; // Sort by time range so we can prefer preloading newer files first. files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1)); diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index af1c81b491..812c69c2f5 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -72,9 +72,9 @@ impl fmt::Debug for LocalFilePurger { /// Whether to enable GC for the file purger. pub fn should_enable_gc( global_gc_enabled: bool, - object_store_scheme: object_store::Scheme, + object_store_scheme: &'static str, ) -> bool { - global_gc_enabled && object_store_scheme != object_store::Scheme::Fs + global_gc_enabled && object_store_scheme != object_store::services::FS_SCHEME } #[cfg(debug_assertions)] @@ -82,7 +82,7 @@ pub fn should_enable_gc( /// so we need to enable GC for local file system. pub fn should_enable_gc( global_gc_enabled: bool, - _object_store_scheme: object_store::Scheme, + _object_store_scheme: &'static str, ) -> bool { global_gc_enabled } diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 1662c6d876..62a5981298 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -142,10 +142,11 @@ impl InstrumentedStore { Ok(list) } - /// Proxies to [`ObjectStore::remove_all`]. + /// Recursively deletes all objects under the given path. pub async fn remove_all(&self, path: &str) -> Result<()> { self.object_store - .remove_all(path) + .delete_with(path) + .recursive(true) .await .context(OpenDalSnafu) } diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 5d7149768c..00a96b0d16 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -290,7 +290,8 @@ pub(crate) async fn remove_region_dir_once( .context(OpenDalSnafu)?; // then remove the marker with this dir object_store - .remove_all(region_path) + .delete_with(region_path) + .recursive(true) .await .context(OpenDalSnafu)?; Ok(true) diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index cc5507675f..46e220c5fe 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -12,7 +12,9 @@ services-memory = ["opendal/services-memory"] testing = ["derive_builder"] [dependencies] +async-trait.workspace = true bytes.workspace = true +chrono.workspace = true common-base.workspace = true common-error.workspace = true common-macro.workspace = true @@ -21,6 +23,7 @@ derive_builder = { workspace = true, optional = true } futures.workspace = true humantime-serde.workspace = true lazy_static.workspace = true +object_store_012 = { package = "object_store", version = "0.12.5" } opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2", features = [ "layers-tracing", "layers-prometheus", @@ -33,6 +36,10 @@ opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2", ] } prometheus.workspace = true reqwest.workspace = true +reqwest_013 = { package = "reqwest", version = "0.13", default-features = false, features = [ + "rustls", + "stream", +] } serde.workspace = true snafu.workspace = true tokio.workspace = true diff --git a/src/object-store/src/compat.rs b/src/object-store/src/compat.rs new file mode 100644 index 0000000000..36c86143d8 --- /dev/null +++ b/src/object-store/src/compat.rs @@ -0,0 +1,495 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt::{self, Debug, Display, Formatter}; +use std::io; +use std::sync::Arc; + +use futures::stream::BoxStream; +use futures::{FutureExt, StreamExt, TryStreamExt}; +use object_store_012::path::Path; +use object_store_012::{ + Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode, + PutMultipartOptions, PutPayload, PutResult, UploadPart, +}; +use opendal::options::{CopyOptions, WriteOptions}; +use opendal::raw::percent_decode_path; +use opendal::{Buffer, Operator, Writer}; +use tokio::sync::{Mutex, oneshot}; + +const DEFAULT_CONCURRENT: usize = 8; + +/// Adapter from Greptime's OpenDAL operator to `object_store` 0.12 used by DataFusion. +#[derive(Clone)] +pub struct OpendalStore { + inner: Operator, +} + +impl OpendalStore { + pub fn new(op: Operator) -> Self { + Self { inner: op } + } + + async fn copy_request(&self, from: &Path, to: &Path, if_not_exists: bool) -> object_store_012::Result<()> { + let options = CopyOptions { if_not_exists }; + + self.inner + .copy_options( + &percent_decode_path(from.as_ref()), + &percent_decode_path(to.as_ref()), + options, + ) + .await + .map_err(|err| { + if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists { + object_store_012::Error::AlreadyExists { + path: to.to_string(), + source: Box::new(err), + } + } else { + format_object_store_error(err, from.as_ref()) + } + })?; + + Ok(()) + } +} + +impl Debug for OpendalStore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let info = self.inner.info(); + f.debug_struct("OpendalStore") + .field("scheme", &info.scheme()) + .field("name", &info.name()) + .field("root", &info.root()) + .field("capability", &info.full_capability()) + .finish() + } +} + +impl Display for OpendalStore { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let info = self.inner.info(); + write!( + f, + "Opendal({}, bucket={}, root={})", + info.scheme(), + info.name(), + info.root() + ) + } +} + +impl From for OpendalStore { + fn from(value: Operator) -> Self { + Self::new(value) + } +} + +#[async_trait::async_trait] +impl ArrowObjectStore for OpendalStore { + async fn put_opts( + &self, + location: &Path, + payload: PutPayload, + opts: object_store_012::PutOptions, + ) -> object_store_012::Result { + let decoded_location = percent_decode_path(location.as_ref()); + let mut future_write = self + .inner + .write_with(&decoded_location, Buffer::from_iter(payload)); + let opts_mode = opts.mode.clone(); + + match opts.mode { + PutMode::Overwrite => {} + PutMode::Create => { + future_write = future_write.if_not_exists(true); + } + PutMode::Update(update_version) => { + let Some(etag) = update_version.e_tag else { + return Err(object_store_012::Error::NotSupported { + source: Box::new(opendal::Error::new( + opendal::ErrorKind::Unsupported, + "etag is required for conditional put", + )), + }); + }; + future_write = future_write.if_match(etag.as_str()); + } + } + + let rp = future_write.await.map_err(|err| match format_object_store_error(err, location.as_ref()) { + object_store_012::Error::Precondition { path, source } if opts_mode == PutMode::Create => { + object_store_012::Error::AlreadyExists { path, source } + } + err => err, + })?; + + Ok(PutResult { + e_tag: rp.etag().map(|s| s.to_string()), + version: rp.version().map(|s| s.to_string()), + }) + } + + async fn put_multipart_opts( + &self, + location: &Path, + opts: PutMultipartOptions, + ) -> object_store_012::Result> { + let mut options = WriteOptions { + concurrent: DEFAULT_CONCURRENT, + ..Default::default() + }; + + let mut user_metadata = HashMap::new(); + for (key, value) in opts.attributes.iter() { + match key { + Attribute::CacheControl => options.cache_control = Some(value.to_string()), + Attribute::ContentDisposition => { + options.content_disposition = Some(value.to_string()) + } + Attribute::ContentEncoding => options.content_encoding = Some(value.to_string()), + Attribute::ContentLanguage => {} + Attribute::ContentType => options.content_type = Some(value.to_string()), + Attribute::Metadata(key) => { + user_metadata.insert(key.to_string(), value.to_string()); + } + _ => {} + } + } + if !user_metadata.is_empty() { + options.user_metadata = Some(user_metadata); + } + + let decoded_location = percent_decode_path(location.as_ref()); + let writer = self + .inner + .writer_options(&decoded_location, options) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; + + Ok(Box::new(OpendalMultipartUpload::new(writer, location.clone()))) + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store_012::Result { + let raw_location = percent_decode_path(location.as_ref()); + let meta = { + let mut stat = self.inner.stat_with(&raw_location); + if let Some(version) = &options.version { + stat = stat.version(version.as_str()); + } + if let Some(if_match) = &options.if_match { + stat = stat.if_match(if_match.as_str()); + } + if let Some(if_none_match) = &options.if_none_match { + stat = stat.if_none_match(if_none_match.as_str()); + } + if let Some(if_modified_since) = options.if_modified_since.and_then(datetime_to_timestamp) { + stat = stat.if_modified_since(if_modified_since); + } + if let Some(if_unmodified_since) = + options.if_unmodified_since.and_then(datetime_to_timestamp) + { + stat = stat.if_unmodified_since(if_unmodified_since); + } + stat.await + .map_err(|err| format_object_store_error(err, location.as_ref()))? + }; + + let mut attributes = Attributes::new(); + if let Some(user_meta) = meta.user_metadata() { + for (key, value) in user_meta { + attributes.insert(Attribute::Metadata(key.clone().into()), value.clone().into()); + } + } + + let meta = ObjectMeta { + location: location.clone(), + last_modified: meta + .last_modified() + .and_then(timestamp_to_datetime) + .unwrap_or_default(), + size: meta.content_length(), + e_tag: meta.etag().map(|x| x.to_string()), + version: meta.version().map(|x| x.to_string()), + }; + + if options.head { + return Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())), + meta, + range: 0..0, + attributes, + }); + } + + let reader = { + let mut read = self.inner.reader_with(&raw_location); + if let Some(version) = options.version { + read = read.version(version.as_str()); + } + if let Some(if_match) = options.if_match { + read = read.if_match(if_match.as_str()); + } + if let Some(if_none_match) = options.if_none_match { + read = read.if_none_match(if_none_match.as_str()); + } + if let Some(if_modified_since) = options.if_modified_since.and_then(datetime_to_timestamp) { + read = read.if_modified_since(if_modified_since); + } + if let Some(if_unmodified_since) = + options.if_unmodified_since.and_then(datetime_to_timestamp) + { + read = read.if_unmodified_since(if_unmodified_since); + } + read.await + .map_err(|err| format_object_store_error(err, location.as_ref()))? + }; + + let read_range = match options.range { + Some(GetRange::Bounded(range)) => { + if range.start >= range.end || range.start >= meta.size { + 0..0 + } else { + let end = range.end.min(meta.size); + range.start..end + } + } + Some(GetRange::Offset(offset)) => { + if offset < meta.size { + offset..meta.size + } else { + 0..0 + } + } + Some(GetRange::Suffix(length)) if length < meta.size => (meta.size - length)..meta.size, + _ => 0..meta.size, + }; + + let stream = reader + .into_bytes_stream(read_range.clone()) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))? + .map_ok(|buf| buf) + .map_err(|err: io::Error| object_store_012::Error::Generic { + store: "IoError", + source: Box::new(err), + }); + + Ok(GetResult { + payload: GetResultPayload::Stream(Box::pin(stream)), + meta, + range: read_range, + attributes, + }) + } + + async fn delete(&self, location: &Path) -> object_store_012::Result<()> { + self.inner + .delete(&percent_decode_path(location.as_ref())) + .await + .map_err(|err| format_object_store_error(err, location.as_ref())) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store_012::Result> { + let path = prefix.map_or_else(String::new, |prefix| { + format!("{}/", percent_decode_path(prefix.as_ref())) + }); + + let this = self.clone(); + let fut = async move { + let stream = this + .inner + .lister_with(&path) + .recursive(true) + .await + .map_err(|err| format_object_store_error(err, &path))?; + + Ok::<_, object_store_012::Error>(stream.then(|res| async { + let entry = res.map_err(|err| format_object_store_error(err, ""))?; + Ok(format_object_meta(entry.path(), entry.metadata())) + })) + }; + + fut.into_stream().try_flatten().boxed() + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store_012::Result { + let path = prefix.map_or_else(String::new, |prefix| { + format!("{}/", percent_decode_path(prefix.as_ref())) + }); + let mut stream = self + .inner + .lister_with(&path) + .await + .map_err(|err| format_object_store_error(err, &path))?; + + let mut common_prefixes = Vec::new(); + let mut objects = Vec::new(); + + while let Some(res) = stream.next().await { + let entry = res.map_err(|err| format_object_store_error(err, ""))?; + let meta = entry.metadata(); + + if meta.is_dir() { + common_prefixes.push(entry.path().into()); + } else if meta.last_modified().is_some() { + objects.push(format_object_meta(entry.path(), meta)); + } else { + let meta = self + .inner + .stat(entry.path()) + .await + .map_err(|err| format_object_store_error(err, entry.path()))?; + objects.push(format_object_meta(entry.path(), &meta)); + } + } + + Ok(ListResult { + common_prefixes, + objects, + }) + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store_012::Result<()> { + self.copy_request(from, to, false).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store_012::Result<()> { + self.copy_request(from, to, true).await + } +} + +struct OpendalMultipartUpload { + writer: Arc>, + location: Path, + next_notify: Option>, +} + +impl OpendalMultipartUpload { + fn new(writer: Writer, location: Path) -> Self { + Self { + writer: Arc::new(Mutex::new(writer)), + location, + next_notify: None, + } + } +} + +#[async_trait::async_trait] +impl MultipartUpload for OpendalMultipartUpload { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + let writer = self.writer.clone(); + let location = self.location.clone(); + let (tx, rx) = oneshot::channel(); + let last_rx = self.next_notify.replace(rx); + + async move { + if let Some(last_rx) = last_rx { + let _ = last_rx.await; + } + + let mut writer = writer.lock().await; + let result = writer + .write(Buffer::from_iter(data)) + .await + .map_err(|err| format_object_store_error(err, location.as_ref())); + + let _ = tx.send(()); + result + } + .boxed() + } + + async fn complete(&mut self) -> object_store_012::Result { + let mut writer = self.writer.lock().await; + let metadata = writer + .close() + .await + .map_err(|err| format_object_store_error(err, self.location.as_ref()))?; + + Ok(PutResult { + e_tag: metadata.etag().map(|s| s.to_string()), + version: metadata.version().map(|s| s.to_string()), + }) + } + + async fn abort(&mut self) -> object_store_012::Result<()> { + let mut writer = self.writer.lock().await; + writer + .abort() + .await + .map_err(|err| format_object_store_error(err, self.location.as_ref())) + } +} + +impl Debug for OpendalMultipartUpload { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("OpendalMultipartUpload") + .field("location", &self.location) + .finish() + } +} + +fn format_object_store_error(err: opendal::Error, path: &str) -> object_store_012::Error { + match err.kind() { + opendal::ErrorKind::NotFound => object_store_012::Error::NotFound { + path: path.to_string(), + source: Box::new(err), + }, + opendal::ErrorKind::Unsupported => object_store_012::Error::NotSupported { + source: Box::new(err), + }, + opendal::ErrorKind::AlreadyExists => object_store_012::Error::AlreadyExists { + path: path.to_string(), + source: Box::new(err), + }, + opendal::ErrorKind::ConditionNotMatch => object_store_012::Error::Precondition { + path: path.to_string(), + source: Box::new(err), + }, + kind => object_store_012::Error::Generic { + store: kind.into_static(), + source: Box::new(err), + }, + } +} + +fn format_object_meta(path: &str, meta: &opendal::Metadata) -> ObjectMeta { + ObjectMeta { + location: path.into(), + last_modified: meta + .last_modified() + .and_then(timestamp_to_datetime) + .unwrap_or_default(), + size: meta.content_length(), + e_tag: meta.etag().map(|x| x.to_string()), + version: meta.version().map(|x| x.to_string()), + } +} + +fn timestamp_to_datetime(ts: opendal::raw::Timestamp) -> Option> { + let ts = ts.into_inner(); + chrono::DateTime::::from_timestamp(ts.as_second(), ts.subsec_nanosecond() as u32) +} + +fn datetime_to_timestamp(dt: chrono::DateTime) -> Option { + opendal::raw::Timestamp::new(dt.timestamp(), dt.timestamp_subsec_nanos() as i32).ok() +} diff --git a/src/object-store/src/error.rs b/src/object-store/src/error.rs index cb259ef5c8..8bc8f7f895 100644 --- a/src/object-store/src/error.rs +++ b/src/object-store/src/error.rs @@ -36,7 +36,7 @@ pub enum Error { #[snafu(implicit)] location: Location, #[snafu(source)] - error: reqwest::Error, + error: reqwest_013::Error, }, #[snafu(display("Failed to create directory {}", dir))] diff --git a/src/object-store/src/layers/mock.rs b/src/object-store/src/layers/mock.rs index e55af3bfe0..3df8aae535 100644 --- a/src/object-store/src/layers/mock.rs +++ b/src/object-store/src/layers/mock.rs @@ -131,12 +131,12 @@ pub struct MockDeleter { } impl oio::Delete for MockDeleter { - fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args) + async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { + self.inner.delete(path, args).await } - async fn flush(&mut self) -> Result { - self.inner.flush().await + async fn close(&mut self) -> Result<()> { + self.inner.close().await } } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index e7ee6afcdb..686c2ae481 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -15,10 +15,11 @@ pub use opendal::raw::{Access, HttpClient}; pub use opendal::{ Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader, - FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Scheme, Writer, services, + FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Writer, services, }; pub mod config; +pub mod compat; pub mod error; pub mod factory; pub mod layers; diff --git a/src/object-store/src/test_util.rs b/src/object-store/src/test_util.rs index 3279db7b7c..af68978b0e 100644 --- a/src/object-store/src/test_util.rs +++ b/src/object-store/src/test_util.rs @@ -32,7 +32,7 @@ impl TempFolder { } pub async fn remove_all(&self) -> Result<()> { - self.store.remove_all(&self.path).await + self.store.delete_with(&self.path).recursive(true).await } } diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index a402b8237c..ede835d49e 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -131,7 +131,7 @@ pub fn normalize_path(path: &str) -> String { pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore { object_store .layer(LoggingLayer::new(DefaultLoggingInterceptor)) - .layer(TracingLayer) + .layer(TracingLayer::new()) .layer(crate::layers::build_prometheus_metrics_layer(path_label)) } @@ -213,7 +213,7 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result