mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-05 14:50:44 +00:00
chore: update opendal to 0.57 (#8204)
* chore: update opendal to 0.57 Signed-off-by: shuiyisong <xixing.sys@gmail.com> * chore: remove duplicate compat to use opendal's Signed-off-by: shuiyisong <xixing.sys@gmail.com> --------- Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
@@ -33,6 +33,7 @@ datatypes.workspace = true
|
||||
futures.workspace = true
|
||||
lazy_static.workspace = true
|
||||
object-store.workspace = true
|
||||
object_store_opendal.workspace = true
|
||||
orc-rust = { version = "0.8", default-features = false, features = ["async"] }
|
||||
parquet.workspace = true
|
||||
paste.workspace = true
|
||||
|
||||
@@ -316,7 +316,7 @@ pub async fn file_to_stream(
|
||||
.with_file_compression_type(df_compression)
|
||||
.build();
|
||||
|
||||
let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
|
||||
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
|
||||
let file_opener = config.file_source().create_file_opener(store, &config, 0)?;
|
||||
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?;
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ struct Test<'a> {
|
||||
|
||||
impl Test<'_> {
|
||||
async fn run(self, store: &ObjectStore) {
|
||||
let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
|
||||
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
|
||||
let file_opener = self
|
||||
.file_source
|
||||
.create_file_opener(store, &self.config, 0)
|
||||
|
||||
@@ -27,12 +27,14 @@ const ACCESS_KEY_ID: &str = "access_key_id";
|
||||
const ACCESS_KEY_SECRET: &str = "access_key_secret";
|
||||
const ROOT: &str = "root";
|
||||
const ALLOW_ANONYMOUS: &str = "allow_anonymous";
|
||||
const SKIP_SIGNATURE: &str = "skip_signature";
|
||||
|
||||
/// Check if the key is supported in OSS configuration.
|
||||
pub fn is_supported_in_oss(key: &str) -> bool {
|
||||
[
|
||||
ROOT,
|
||||
ALLOW_ANONYMOUS,
|
||||
SKIP_SIGNATURE,
|
||||
BUCKET,
|
||||
ENDPOINT,
|
||||
ACCESS_KEY_ID,
|
||||
@@ -61,18 +63,23 @@ pub fn build_oss_backend(
|
||||
builder = builder.access_key_secret(access_key_secret);
|
||||
}
|
||||
|
||||
if let Some(allow_anonymous) = connection.get(ALLOW_ANONYMOUS) {
|
||||
let allow = allow_anonymous.as_str().parse::<bool>().map_err(|e| {
|
||||
if let Some((key, value)) = connection
|
||||
.get(SKIP_SIGNATURE)
|
||||
.map(|value| (SKIP_SIGNATURE, value))
|
||||
.or_else(|| {
|
||||
connection
|
||||
.get(ALLOW_ANONYMOUS)
|
||||
.map(|value| (ALLOW_ANONYMOUS, value))
|
||||
})
|
||||
{
|
||||
let skip_signature = value.as_str().parse::<bool>().map_err(|e| {
|
||||
error::InvalidConnectionSnafu {
|
||||
msg: format!(
|
||||
"failed to parse the option {}={}, {}",
|
||||
ALLOW_ANONYMOUS, allow_anonymous, e
|
||||
),
|
||||
msg: format!("failed to parse the option {}={}, {}", key, value, e),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
if allow {
|
||||
builder = builder.allow_anonymous();
|
||||
if skip_signature {
|
||||
builder = builder.skip_signature();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,6 +100,7 @@ mod tests {
|
||||
fn test_is_supported_in_oss() {
|
||||
assert!(is_supported_in_oss(ROOT));
|
||||
assert!(is_supported_in_oss(ALLOW_ANONYMOUS));
|
||||
assert!(is_supported_in_oss(SKIP_SIGNATURE));
|
||||
assert!(is_supported_in_oss(BUCKET));
|
||||
assert!(is_supported_in_oss(ENDPOINT));
|
||||
assert!(is_supported_in_oss(ACCESS_KEY_ID));
|
||||
|
||||
@@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
|
||||
test_util::TEST_BATCH_SIZE,
|
||||
schema.clone(),
|
||||
FileCompressionType::UNCOMPRESSED,
|
||||
Arc::new(object_store::compat::OpendalStore::new(store.clone())),
|
||||
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
|
||||
true,
|
||||
);
|
||||
|
||||
@@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test(
|
||||
|
||||
let csv_opener = csv_source
|
||||
.create_file_opener(
|
||||
Arc::new(object_store::compat::OpendalStore::new(store.clone())),
|
||||
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
|
||||
&config,
|
||||
0,
|
||||
)
|
||||
|
||||
@@ -29,6 +29,7 @@ datafusion-expr.workspace = true
|
||||
datatypes.workspace = true
|
||||
futures.workspace = true
|
||||
object-store.workspace = true
|
||||
object_store_opendal.workspace = true
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json.workspace = true
|
||||
snafu.workspace = true
|
||||
|
||||
@@ -61,7 +61,7 @@ fn build_record_batch_stream(
|
||||
.with_file_group(FileGroup::new(files))
|
||||
.build();
|
||||
|
||||
let store = Arc::new(object_store::compat::OpendalStore::new(
|
||||
let store = Arc::new(object_store_opendal::OpendalStore::new(
|
||||
scan_plan_config.store.clone(),
|
||||
));
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ derive_builder = { workspace = true, optional = true }
|
||||
futures.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
lazy_static.workspace = true
|
||||
opendal = { git = "https://github.com/apache/opendal.git", rev = "4ad2d85296ffa6fdc2882f97d3c760ee243913f7", features = [
|
||||
opendal = { version = "0.57", features = [
|
||||
"layers-tracing",
|
||||
"layers-prometheus",
|
||||
"services-azblob",
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -21,7 +21,7 @@ pub use opendal::raw::{
|
||||
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
|
||||
RpWrite, oio,
|
||||
};
|
||||
use opendal::raw::{OpCopy, RpCopy};
|
||||
use opendal::raw::{OpCopier, OpCopy, RpCopy};
|
||||
pub use opendal::{Buffer, Error, ErrorKind, Metadata, Result};
|
||||
|
||||
pub type MockWriterFactory = Arc<dyn Fn(&str, OpWrite, oio::Writer) -> oio::Writer + Send + Sync>;
|
||||
@@ -146,6 +146,7 @@ impl<A: Access> LayeredAccess for MockAccessor<A> {
|
||||
type Writer = MockWriter;
|
||||
type Lister = MockLister;
|
||||
type Deleter = MockDeleter;
|
||||
type Copier = oio::Copier;
|
||||
|
||||
fn inner(&self) -> &Self::Inner {
|
||||
&self.inner
|
||||
@@ -222,15 +223,24 @@ impl<A: Access> LayeredAccess for MockAccessor<A> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
|
||||
let Some(copy_interceptor) = self.copy_interceptor.as_ref() else {
|
||||
return self.inner.copy(from, to, args).await;
|
||||
};
|
||||
async fn copy(
|
||||
&self,
|
||||
from: &str,
|
||||
to: &str,
|
||||
args: OpCopy,
|
||||
opts: OpCopier,
|
||||
) -> Result<(RpCopy, Self::Copier)> {
|
||||
if let Some(result) = self
|
||||
.copy_interceptor
|
||||
.as_ref()
|
||||
.and_then(|copy_interceptor| copy_interceptor(from, to, args.clone()))
|
||||
{
|
||||
return result.map(|rp_copy| (rp_copy, Box::new(()) as oio::Copier));
|
||||
}
|
||||
|
||||
let Some(result) = copy_interceptor(from, to, args.clone()) else {
|
||||
return self.inner.copy(from, to, args).await;
|
||||
};
|
||||
|
||||
result
|
||||
self.inner
|
||||
.copy(from, to, args, opts)
|
||||
.await
|
||||
.map(|(rp_copy, copier)| (rp_copy, Box::new(copier) as oio::Copier))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ pub use opendal::{
|
||||
FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Writer, services,
|
||||
};
|
||||
|
||||
pub mod compat;
|
||||
pub mod config;
|
||||
pub mod error;
|
||||
pub mod factory;
|
||||
|
||||
Reference in New Issue
Block a user