mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 14:00:39 +00:00
chore: update opendal version
Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -8892,7 +8892,9 @@ name = "object-store"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"common-base",
|
||||
"common-error",
|
||||
"common-macro",
|
||||
@@ -8902,9 +8904,11 @@ dependencies = [
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"lazy_static",
|
||||
"object_store 0.12.5",
|
||||
"opendal",
|
||||
"prometheus 0.14.0",
|
||||
"reqwest 0.12.28",
|
||||
"reqwest 0.13.2",
|
||||
"serde",
|
||||
"snafu 0.8.6",
|
||||
"tokio",
|
||||
|
||||
@@ -486,7 +486,8 @@ impl SnapshotStorage for OpenDalStorage {
|
||||
|
||||
async fn delete_snapshot(&self) -> Result<()> {
|
||||
self.object_store
|
||||
.remove_all("/")
|
||||
.delete_with("/")
|
||||
.recursive(true)
|
||||
.await
|
||||
.context(StorageOperationSnafu {
|
||||
operation: "delete snapshot",
|
||||
|
||||
@@ -16,7 +16,7 @@ use async_trait::async_trait;
|
||||
use clap::{Parser, Subcommand};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::snapshot::MetadataSnapshotManager;
|
||||
use object_store::{ObjectStore, Scheme};
|
||||
use object_store::{ObjectStore, services};
|
||||
|
||||
use crate::Tool;
|
||||
use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store};
|
||||
@@ -276,7 +276,7 @@ fn build_object_store_and_resolve_file_path(
|
||||
None => new_fs_object_store(fs_root)?,
|
||||
};
|
||||
|
||||
let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) {
|
||||
let file_path = if object_store.info().scheme() == services::FS_SCHEME {
|
||||
resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)?
|
||||
} else {
|
||||
file_path.to_string()
|
||||
|
||||
@@ -42,7 +42,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use object_store::ObjectStore;
|
||||
use object_store_opendal::OpendalStore;
|
||||
use snafu::ResultExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
@@ -317,7 +316,7 @@ pub async fn file_to_stream(
|
||||
.with_file_compression_type(df_compression)
|
||||
.build();
|
||||
|
||||
let store = Arc::new(OpendalStore::new(store.clone()));
|
||||
let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
|
||||
let file_opener = config.file_source().create_file_opener(store, &config, 0)?;
|
||||
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?;
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ struct Test<'a> {
|
||||
|
||||
impl Test<'_> {
|
||||
async fn run(self, store: &ObjectStore) {
|
||||
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
|
||||
let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
|
||||
let file_opener = self
|
||||
.file_source
|
||||
.create_file_opener(store, &self.config, 0)
|
||||
|
||||
@@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
|
||||
test_util::TEST_BATCH_SIZE,
|
||||
schema.clone(),
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
|
||||
Arc::new(object_store::compat::OpendalStore::new(store.clone())),
|
||||
true,
|
||||
);
|
||||
|
||||
@@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test(
|
||||
|
||||
let csv_opener = csv_source
|
||||
.create_file_opener(
|
||||
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
|
||||
Arc::new(object_store::compat::OpendalStore::new(store.clone())),
|
||||
&config,
|
||||
0,
|
||||
)
|
||||
|
||||
@@ -61,7 +61,7 @@ fn build_record_batch_stream(
|
||||
.with_file_group(FileGroup::new(files))
|
||||
.build();
|
||||
|
||||
let store = Arc::new(object_store_opendal::OpendalStore::new(
|
||||
let store = Arc::new(object_store::compat::OpendalStore::new(
|
||||
scan_plan_config.store.clone(),
|
||||
));
|
||||
|
||||
|
||||
@@ -499,7 +499,7 @@ impl AccessLayer {
|
||||
let file_size = if file_size == 0 { None } else { Some(file_size) };
|
||||
let last_modified_ms = metadata
|
||||
.last_modified()
|
||||
.map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));
|
||||
.map(|ts| Timestamp::new_millisecond(ts.into_inner().as_millisecond()));
|
||||
|
||||
let entry = StorageSstEntry {
|
||||
file_path: path.to_string(),
|
||||
|
||||
@@ -191,13 +191,15 @@ impl StagingStorage {
|
||||
pub(crate) async fn clear(&self) -> Result<()> {
|
||||
self.delta_storage
|
||||
.object_store()
|
||||
.remove_all(self.delta_storage.path())
|
||||
.delete_with(self.delta_storage.path())
|
||||
.recursive(true)
|
||||
.await
|
||||
.context(OpenDalSnafu)?;
|
||||
|
||||
self.blob_storage
|
||||
.object_store()
|
||||
.remove_all(self.blob_storage.path())
|
||||
.delete_with(self.blob_storage.path())
|
||||
.recursive(true)
|
||||
.await
|
||||
.context(OpenDalSnafu)?;
|
||||
|
||||
|
||||
@@ -1009,7 +1009,7 @@ async fn preload_parquet_meta_cache_for_files(
|
||||
return 0;
|
||||
}
|
||||
|
||||
let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);
|
||||
let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME;
|
||||
|
||||
// Sort by time range so we can prefer preloading newer files first.
|
||||
files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));
|
||||
|
||||
@@ -72,9 +72,9 @@ impl fmt::Debug for LocalFilePurger {
|
||||
/// Whether to enable GC for the file purger.
|
||||
pub fn should_enable_gc(
|
||||
global_gc_enabled: bool,
|
||||
object_store_scheme: object_store::Scheme,
|
||||
object_store_scheme: &'static str,
|
||||
) -> bool {
|
||||
global_gc_enabled && object_store_scheme != object_store::Scheme::Fs
|
||||
global_gc_enabled && object_store_scheme != object_store::services::FS_SCHEME
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
@@ -82,7 +82,7 @@ pub fn should_enable_gc(
|
||||
/// so we need to enable GC for local file system.
|
||||
pub fn should_enable_gc(
|
||||
global_gc_enabled: bool,
|
||||
_object_store_scheme: object_store::Scheme,
|
||||
_object_store_scheme: &'static str,
|
||||
) -> bool {
|
||||
global_gc_enabled
|
||||
}
|
||||
|
||||
@@ -142,10 +142,11 @@ impl InstrumentedStore {
|
||||
Ok(list)
|
||||
}
|
||||
|
||||
/// Proxies to [`ObjectStore::remove_all`].
|
||||
/// Recursively deletes all objects under the given path.
|
||||
pub async fn remove_all(&self, path: &str) -> Result<()> {
|
||||
self.object_store
|
||||
.remove_all(path)
|
||||
.delete_with(path)
|
||||
.recursive(true)
|
||||
.await
|
||||
.context(OpenDalSnafu)
|
||||
}
|
||||
|
||||
@@ -290,7 +290,8 @@ pub(crate) async fn remove_region_dir_once(
|
||||
.context(OpenDalSnafu)?;
|
||||
// then remove the marker with this dir
|
||||
object_store
|
||||
.remove_all(region_path)
|
||||
.delete_with(region_path)
|
||||
.recursive(true)
|
||||
.await
|
||||
.context(OpenDalSnafu)?;
|
||||
Ok(true)
|
||||
|
||||
@@ -12,7 +12,9 @@ services-memory = ["opendal/services-memory"]
|
||||
testing = ["derive_builder"]
|
||||
|
||||
[dependencies]
|
||||
async-trait.workspace = true
|
||||
bytes.workspace = true
|
||||
chrono.workspace = true
|
||||
common-base.workspace = true
|
||||
common-error.workspace = true
|
||||
common-macro.workspace = true
|
||||
@@ -21,6 +23,7 @@ derive_builder = { workspace = true, optional = true }
|
||||
futures.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
lazy_static.workspace = true
|
||||
object_store_012 = { package = "object_store", version = "0.12.5" }
|
||||
opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2", features = [
|
||||
"layers-tracing",
|
||||
"layers-prometheus",
|
||||
@@ -33,6 +36,10 @@ opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2",
|
||||
] }
|
||||
prometheus.workspace = true
|
||||
reqwest.workspace = true
|
||||
reqwest_013 = { package = "reqwest", version = "0.13", default-features = false, features = [
|
||||
"rustls",
|
||||
"stream",
|
||||
] }
|
||||
serde.workspace = true
|
||||
snafu.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
495
src/object-store/src/compat.rs
Normal file
495
src/object-store/src/compat.rs
Normal file
@@ -0,0 +1,495 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{self, Debug, Display, Formatter};
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::stream::BoxStream;
|
||||
use futures::{FutureExt, StreamExt, TryStreamExt};
|
||||
use object_store_012::path::Path;
|
||||
use object_store_012::{
|
||||
Attribute, Attributes, GetOptions, GetRange, GetResult, GetResultPayload, ListResult,
|
||||
MultipartUpload, ObjectMeta, ObjectStore as ArrowObjectStore, PutMode,
|
||||
PutMultipartOptions, PutPayload, PutResult, UploadPart,
|
||||
};
|
||||
use opendal::options::{CopyOptions, WriteOptions};
|
||||
use opendal::raw::percent_decode_path;
|
||||
use opendal::{Buffer, Operator, Writer};
|
||||
use tokio::sync::{Mutex, oneshot};
|
||||
|
||||
const DEFAULT_CONCURRENT: usize = 8;
|
||||
|
||||
/// Adapter from Greptime's OpenDAL operator to `object_store` 0.12 used by DataFusion.
|
||||
#[derive(Clone)]
|
||||
pub struct OpendalStore {
|
||||
inner: Operator,
|
||||
}
|
||||
|
||||
impl OpendalStore {
|
||||
pub fn new(op: Operator) -> Self {
|
||||
Self { inner: op }
|
||||
}
|
||||
|
||||
async fn copy_request(&self, from: &Path, to: &Path, if_not_exists: bool) -> object_store_012::Result<()> {
|
||||
let options = CopyOptions { if_not_exists };
|
||||
|
||||
self.inner
|
||||
.copy_options(
|
||||
&percent_decode_path(from.as_ref()),
|
||||
&percent_decode_path(to.as_ref()),
|
||||
options,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
if if_not_exists && err.kind() == opendal::ErrorKind::AlreadyExists {
|
||||
object_store_012::Error::AlreadyExists {
|
||||
path: to.to_string(),
|
||||
source: Box::new(err),
|
||||
}
|
||||
} else {
|
||||
format_object_store_error(err, from.as_ref())
|
||||
}
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for OpendalStore {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
let info = self.inner.info();
|
||||
f.debug_struct("OpendalStore")
|
||||
.field("scheme", &info.scheme())
|
||||
.field("name", &info.name())
|
||||
.field("root", &info.root())
|
||||
.field("capability", &info.full_capability())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for OpendalStore {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
let info = self.inner.info();
|
||||
write!(
|
||||
f,
|
||||
"Opendal({}, bucket={}, root={})",
|
||||
info.scheme(),
|
||||
info.name(),
|
||||
info.root()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Operator> for OpendalStore {
|
||||
fn from(value: Operator) -> Self {
|
||||
Self::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ArrowObjectStore for OpendalStore {
|
||||
async fn put_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
payload: PutPayload,
|
||||
opts: object_store_012::PutOptions,
|
||||
) -> object_store_012::Result<PutResult> {
|
||||
let decoded_location = percent_decode_path(location.as_ref());
|
||||
let mut future_write = self
|
||||
.inner
|
||||
.write_with(&decoded_location, Buffer::from_iter(payload));
|
||||
let opts_mode = opts.mode.clone();
|
||||
|
||||
match opts.mode {
|
||||
PutMode::Overwrite => {}
|
||||
PutMode::Create => {
|
||||
future_write = future_write.if_not_exists(true);
|
||||
}
|
||||
PutMode::Update(update_version) => {
|
||||
let Some(etag) = update_version.e_tag else {
|
||||
return Err(object_store_012::Error::NotSupported {
|
||||
source: Box::new(opendal::Error::new(
|
||||
opendal::ErrorKind::Unsupported,
|
||||
"etag is required for conditional put",
|
||||
)),
|
||||
});
|
||||
};
|
||||
future_write = future_write.if_match(etag.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
let rp = future_write.await.map_err(|err| match format_object_store_error(err, location.as_ref()) {
|
||||
object_store_012::Error::Precondition { path, source } if opts_mode == PutMode::Create => {
|
||||
object_store_012::Error::AlreadyExists { path, source }
|
||||
}
|
||||
err => err,
|
||||
})?;
|
||||
|
||||
Ok(PutResult {
|
||||
e_tag: rp.etag().map(|s| s.to_string()),
|
||||
version: rp.version().map(|s| s.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
async fn put_multipart_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
opts: PutMultipartOptions,
|
||||
) -> object_store_012::Result<Box<dyn MultipartUpload>> {
|
||||
let mut options = WriteOptions {
|
||||
concurrent: DEFAULT_CONCURRENT,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut user_metadata = HashMap::new();
|
||||
for (key, value) in opts.attributes.iter() {
|
||||
match key {
|
||||
Attribute::CacheControl => options.cache_control = Some(value.to_string()),
|
||||
Attribute::ContentDisposition => {
|
||||
options.content_disposition = Some(value.to_string())
|
||||
}
|
||||
Attribute::ContentEncoding => options.content_encoding = Some(value.to_string()),
|
||||
Attribute::ContentLanguage => {}
|
||||
Attribute::ContentType => options.content_type = Some(value.to_string()),
|
||||
Attribute::Metadata(key) => {
|
||||
user_metadata.insert(key.to_string(), value.to_string());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
if !user_metadata.is_empty() {
|
||||
options.user_metadata = Some(user_metadata);
|
||||
}
|
||||
|
||||
let decoded_location = percent_decode_path(location.as_ref());
|
||||
let writer = self
|
||||
.inner
|
||||
.writer_options(&decoded_location, options)
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
|
||||
|
||||
Ok(Box::new(OpendalMultipartUpload::new(writer, location.clone())))
|
||||
}
|
||||
|
||||
async fn get_opts(
|
||||
&self,
|
||||
location: &Path,
|
||||
options: GetOptions,
|
||||
) -> object_store_012::Result<GetResult> {
|
||||
let raw_location = percent_decode_path(location.as_ref());
|
||||
let meta = {
|
||||
let mut stat = self.inner.stat_with(&raw_location);
|
||||
if let Some(version) = &options.version {
|
||||
stat = stat.version(version.as_str());
|
||||
}
|
||||
if let Some(if_match) = &options.if_match {
|
||||
stat = stat.if_match(if_match.as_str());
|
||||
}
|
||||
if let Some(if_none_match) = &options.if_none_match {
|
||||
stat = stat.if_none_match(if_none_match.as_str());
|
||||
}
|
||||
if let Some(if_modified_since) = options.if_modified_since.and_then(datetime_to_timestamp) {
|
||||
stat = stat.if_modified_since(if_modified_since);
|
||||
}
|
||||
if let Some(if_unmodified_since) =
|
||||
options.if_unmodified_since.and_then(datetime_to_timestamp)
|
||||
{
|
||||
stat = stat.if_unmodified_since(if_unmodified_since);
|
||||
}
|
||||
stat.await
|
||||
.map_err(|err| format_object_store_error(err, location.as_ref()))?
|
||||
};
|
||||
|
||||
let mut attributes = Attributes::new();
|
||||
if let Some(user_meta) = meta.user_metadata() {
|
||||
for (key, value) in user_meta {
|
||||
attributes.insert(Attribute::Metadata(key.clone().into()), value.clone().into());
|
||||
}
|
||||
}
|
||||
|
||||
let meta = ObjectMeta {
|
||||
location: location.clone(),
|
||||
last_modified: meta
|
||||
.last_modified()
|
||||
.and_then(timestamp_to_datetime)
|
||||
.unwrap_or_default(),
|
||||
size: meta.content_length(),
|
||||
e_tag: meta.etag().map(|x| x.to_string()),
|
||||
version: meta.version().map(|x| x.to_string()),
|
||||
};
|
||||
|
||||
if options.head {
|
||||
return Ok(GetResult {
|
||||
payload: GetResultPayload::Stream(Box::pin(futures::stream::empty())),
|
||||
meta,
|
||||
range: 0..0,
|
||||
attributes,
|
||||
});
|
||||
}
|
||||
|
||||
let reader = {
|
||||
let mut read = self.inner.reader_with(&raw_location);
|
||||
if let Some(version) = options.version {
|
||||
read = read.version(version.as_str());
|
||||
}
|
||||
if let Some(if_match) = options.if_match {
|
||||
read = read.if_match(if_match.as_str());
|
||||
}
|
||||
if let Some(if_none_match) = options.if_none_match {
|
||||
read = read.if_none_match(if_none_match.as_str());
|
||||
}
|
||||
if let Some(if_modified_since) = options.if_modified_since.and_then(datetime_to_timestamp) {
|
||||
read = read.if_modified_since(if_modified_since);
|
||||
}
|
||||
if let Some(if_unmodified_since) =
|
||||
options.if_unmodified_since.and_then(datetime_to_timestamp)
|
||||
{
|
||||
read = read.if_unmodified_since(if_unmodified_since);
|
||||
}
|
||||
read.await
|
||||
.map_err(|err| format_object_store_error(err, location.as_ref()))?
|
||||
};
|
||||
|
||||
let read_range = match options.range {
|
||||
Some(GetRange::Bounded(range)) => {
|
||||
if range.start >= range.end || range.start >= meta.size {
|
||||
0..0
|
||||
} else {
|
||||
let end = range.end.min(meta.size);
|
||||
range.start..end
|
||||
}
|
||||
}
|
||||
Some(GetRange::Offset(offset)) => {
|
||||
if offset < meta.size {
|
||||
offset..meta.size
|
||||
} else {
|
||||
0..0
|
||||
}
|
||||
}
|
||||
Some(GetRange::Suffix(length)) if length < meta.size => (meta.size - length)..meta.size,
|
||||
_ => 0..meta.size,
|
||||
};
|
||||
|
||||
let stream = reader
|
||||
.into_bytes_stream(read_range.clone())
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, location.as_ref()))?
|
||||
.map_ok(|buf| buf)
|
||||
.map_err(|err: io::Error| object_store_012::Error::Generic {
|
||||
store: "IoError",
|
||||
source: Box::new(err),
|
||||
});
|
||||
|
||||
Ok(GetResult {
|
||||
payload: GetResultPayload::Stream(Box::pin(stream)),
|
||||
meta,
|
||||
range: read_range,
|
||||
attributes,
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete(&self, location: &Path) -> object_store_012::Result<()> {
|
||||
self.inner
|
||||
.delete(&percent_decode_path(location.as_ref()))
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, location.as_ref()))
|
||||
}
|
||||
|
||||
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store_012::Result<ObjectMeta>> {
|
||||
let path = prefix.map_or_else(String::new, |prefix| {
|
||||
format!("{}/", percent_decode_path(prefix.as_ref()))
|
||||
});
|
||||
|
||||
let this = self.clone();
|
||||
let fut = async move {
|
||||
let stream = this
|
||||
.inner
|
||||
.lister_with(&path)
|
||||
.recursive(true)
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, &path))?;
|
||||
|
||||
Ok::<_, object_store_012::Error>(stream.then(|res| async {
|
||||
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
|
||||
Ok(format_object_meta(entry.path(), entry.metadata()))
|
||||
}))
|
||||
};
|
||||
|
||||
fut.into_stream().try_flatten().boxed()
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store_012::Result<ListResult> {
|
||||
let path = prefix.map_or_else(String::new, |prefix| {
|
||||
format!("{}/", percent_decode_path(prefix.as_ref()))
|
||||
});
|
||||
let mut stream = self
|
||||
.inner
|
||||
.lister_with(&path)
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, &path))?;
|
||||
|
||||
let mut common_prefixes = Vec::new();
|
||||
let mut objects = Vec::new();
|
||||
|
||||
while let Some(res) = stream.next().await {
|
||||
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
|
||||
let meta = entry.metadata();
|
||||
|
||||
if meta.is_dir() {
|
||||
common_prefixes.push(entry.path().into());
|
||||
} else if meta.last_modified().is_some() {
|
||||
objects.push(format_object_meta(entry.path(), meta));
|
||||
} else {
|
||||
let meta = self
|
||||
.inner
|
||||
.stat(entry.path())
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, entry.path()))?;
|
||||
objects.push(format_object_meta(entry.path(), &meta));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ListResult {
|
||||
common_prefixes,
|
||||
objects,
|
||||
})
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &Path, to: &Path) -> object_store_012::Result<()> {
|
||||
self.copy_request(from, to, false).await
|
||||
}
|
||||
|
||||
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store_012::Result<()> {
|
||||
self.copy_request(from, to, true).await
|
||||
}
|
||||
}
|
||||
|
||||
struct OpendalMultipartUpload {
|
||||
writer: Arc<Mutex<Writer>>,
|
||||
location: Path,
|
||||
next_notify: Option<oneshot::Receiver<()>>,
|
||||
}
|
||||
|
||||
impl OpendalMultipartUpload {
|
||||
fn new(writer: Writer, location: Path) -> Self {
|
||||
Self {
|
||||
writer: Arc::new(Mutex::new(writer)),
|
||||
location,
|
||||
next_notify: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl MultipartUpload for OpendalMultipartUpload {
|
||||
fn put_part(&mut self, data: PutPayload) -> UploadPart {
|
||||
let writer = self.writer.clone();
|
||||
let location = self.location.clone();
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let last_rx = self.next_notify.replace(rx);
|
||||
|
||||
async move {
|
||||
if let Some(last_rx) = last_rx {
|
||||
let _ = last_rx.await;
|
||||
}
|
||||
|
||||
let mut writer = writer.lock().await;
|
||||
let result = writer
|
||||
.write(Buffer::from_iter(data))
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, location.as_ref()));
|
||||
|
||||
let _ = tx.send(());
|
||||
result
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
async fn complete(&mut self) -> object_store_012::Result<PutResult> {
|
||||
let mut writer = self.writer.lock().await;
|
||||
let metadata = writer
|
||||
.close()
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, self.location.as_ref()))?;
|
||||
|
||||
Ok(PutResult {
|
||||
e_tag: metadata.etag().map(|s| s.to_string()),
|
||||
version: metadata.version().map(|s| s.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
async fn abort(&mut self) -> object_store_012::Result<()> {
|
||||
let mut writer = self.writer.lock().await;
|
||||
writer
|
||||
.abort()
|
||||
.await
|
||||
.map_err(|err| format_object_store_error(err, self.location.as_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for OpendalMultipartUpload {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("OpendalMultipartUpload")
|
||||
.field("location", &self.location)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
fn format_object_store_error(err: opendal::Error, path: &str) -> object_store_012::Error {
|
||||
match err.kind() {
|
||||
opendal::ErrorKind::NotFound => object_store_012::Error::NotFound {
|
||||
path: path.to_string(),
|
||||
source: Box::new(err),
|
||||
},
|
||||
opendal::ErrorKind::Unsupported => object_store_012::Error::NotSupported {
|
||||
source: Box::new(err),
|
||||
},
|
||||
opendal::ErrorKind::AlreadyExists => object_store_012::Error::AlreadyExists {
|
||||
path: path.to_string(),
|
||||
source: Box::new(err),
|
||||
},
|
||||
opendal::ErrorKind::ConditionNotMatch => object_store_012::Error::Precondition {
|
||||
path: path.to_string(),
|
||||
source: Box::new(err),
|
||||
},
|
||||
kind => object_store_012::Error::Generic {
|
||||
store: kind.into_static(),
|
||||
source: Box::new(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn format_object_meta(path: &str, meta: &opendal::Metadata) -> ObjectMeta {
|
||||
ObjectMeta {
|
||||
location: path.into(),
|
||||
last_modified: meta
|
||||
.last_modified()
|
||||
.and_then(timestamp_to_datetime)
|
||||
.unwrap_or_default(),
|
||||
size: meta.content_length(),
|
||||
e_tag: meta.etag().map(|x| x.to_string()),
|
||||
version: meta.version().map(|x| x.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn timestamp_to_datetime(ts: opendal::raw::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
|
||||
let ts = ts.into_inner();
|
||||
chrono::DateTime::<chrono::Utc>::from_timestamp(ts.as_second(), ts.subsec_nanosecond() as u32)
|
||||
}
|
||||
|
||||
fn datetime_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Option<opendal::raw::Timestamp> {
|
||||
opendal::raw::Timestamp::new(dt.timestamp(), dt.timestamp_subsec_nanos() as i32).ok()
|
||||
}
|
||||
@@ -36,7 +36,7 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: reqwest::Error,
|
||||
error: reqwest_013::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to create directory {}", dir))]
|
||||
|
||||
@@ -131,12 +131,12 @@ pub struct MockDeleter {
|
||||
}
|
||||
|
||||
impl oio::Delete for MockDeleter {
|
||||
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
|
||||
self.inner.delete(path, args)
|
||||
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
|
||||
self.inner.delete(path, args).await
|
||||
}
|
||||
|
||||
async fn flush(&mut self) -> Result<usize> {
|
||||
self.inner.flush().await
|
||||
async fn close(&mut self) -> Result<()> {
|
||||
self.inner.close().await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,10 +15,11 @@
|
||||
pub use opendal::raw::{Access, HttpClient};
|
||||
pub use opendal::{
|
||||
Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader,
|
||||
FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Scheme, Writer, services,
|
||||
FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Writer, services,
|
||||
};
|
||||
|
||||
pub mod config;
|
||||
pub mod compat;
|
||||
pub mod error;
|
||||
pub mod factory;
|
||||
pub mod layers;
|
||||
|
||||
@@ -32,7 +32,7 @@ impl TempFolder {
|
||||
}
|
||||
|
||||
pub async fn remove_all(&self) -> Result<()> {
|
||||
self.store.remove_all(&self.path).await
|
||||
self.store.delete_with(&self.path).recursive(true).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -131,7 +131,7 @@ pub fn normalize_path(path: &str) -> String {
|
||||
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
|
||||
object_store
|
||||
.layer(LoggingLayer::new(DefaultLoggingInterceptor))
|
||||
.layer(TracingLayer)
|
||||
.layer(TracingLayer::new())
|
||||
.layer(crate::layers::build_prometheus_metrics_layer(path_label))
|
||||
}
|
||||
|
||||
@@ -213,7 +213,7 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> error::Result<Http
|
||||
);
|
||||
}
|
||||
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
let client = reqwest_013::ClientBuilder::new()
|
||||
.pool_max_idle_per_host(config.pool_max_idle_per_host as usize)
|
||||
.connect_timeout(config.connect_timeout)
|
||||
.pool_idle_timeout(config.pool_idle_timeout)
|
||||
|
||||
@@ -19,6 +19,7 @@ fn main() {
|
||||
|
||||
#[cfg(feature = "dashboard")]
|
||||
fn fetch_dashboard_assets() {
|
||||
use std::path::PathBuf;
|
||||
use std::process::{Command, Stdio};
|
||||
|
||||
let message = "Failed to fetch dashboard assets";
|
||||
@@ -30,7 +31,16 @@ or it's a network error, just try again or enable/disable some proxy."#;
|
||||
let mut dir = std::env::current_dir().unwrap();
|
||||
dir.pop();
|
||||
dir.pop();
|
||||
dir.push("scripts");
|
||||
let scripts_dir = dir.join("scripts");
|
||||
let dashboard_dist = dir.join(PathBuf::from("src/servers/dashboard/dist"));
|
||||
|
||||
if dashboard_dist.join("index.html").exists() {
|
||||
println!("cargo:rerun-if-changed=dashboard/VERSION");
|
||||
println!("cargo:rerun-if-changed=dashboard/dist");
|
||||
return;
|
||||
}
|
||||
|
||||
dir = scripts_dir;
|
||||
|
||||
let out_dir = std::env::var("OUT_DIR").unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user