chore: update the opendal to 0.56 rc2 (#8003)

* chore: update opendal version

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update opendal version

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: grpc init

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: dep versions

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: remove aws-lc-rs in reqwest

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: rebase main and fix compile

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: remove unused deps

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* Revert "fix: remove aws-lc-rs in reqwest"

This reverts commit 90bfafca06.

* chore: remove aws-lc-sys from blacklist

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: fix sqlness

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add tls deps

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: idemptent install in rds

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: use aws-lc-sys as possible

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: lint

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: address comments

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: address CR issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: sync opendal compat adapter with upstream

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: address compat clippy warnings

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
Co-authored-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
shuiyisong
2026-04-26 17:59:48 +08:00
committed by shuiyisong
parent 8d2f92c01a
commit 65f1a41fbb
48 changed files with 2123 additions and 266 deletions

View File

@@ -1,3 +1,2 @@
native-tls
openssl
aws-lc-sys

965
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -110,6 +110,7 @@ arrow-schema = { version = "57.3", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
arrow_object_store = { package = "object_store", version = "0.13.2" }
axum = "0.8"
axum-extra = "0.10"
axum-macros = "0.5"
@@ -141,6 +142,7 @@ datafusion-physical-expr = "=52.1"
datafusion-physical-plan = "=52.1"
datafusion-sql = "=52.1"
datafusion-substrait = "=52.1"
datafusion_object_store = { package = "object_store", version = "0.12.5" }
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -174,7 +176,7 @@ nalgebra = "0.33"
nix = { version = "0.30.1", default-features = false, features = ["event", "fs", "process"] }
notify = "8.0"
num_cpus = "1.16"
object_store_opendal = "0.54"
object_store_opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2" }
once_cell = "1.18"
opentelemetry-proto = { version = "0.31", features = [
"gen-tonic",
@@ -201,14 +203,17 @@ rand = "0.9"
ratelimit = "0.10"
regex = "1.12"
regex-automata = "0.4"
reqwest = { version = "0.12", default-features = false, features = [
reqwest = { version = "0.13", default-features = false, features = [
"form",
"json",
"rustls-tls-native-roots",
"query",
"rustls",
"stream",
"multipart",
] }
url = "2.3"
# Branch: feat/request-timeout
hostname = "0.4.0"
rskafka = { git = "https://github.com/GreptimeTeam/rskafka.git", rev = "f5688f83e7da591cda3f2674c2408b4c0ed4ed50", features = [
"transport-tls",
] }
@@ -216,8 +221,6 @@ rstest = "0.25"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
# It is worth noting that we should try to avoid using aws-lc-rs until it can be compiled on various platforms.
hostname = "0.4.0"
rustls = { version = "0.23.25", default-features = false }
sea-query = "0.32"
serde = { version = "1.0", features = ["derive"] }
@@ -232,7 +235,8 @@ sqlx = { version = "0.8", default-features = false, features = [
"any",
"macros",
"json",
"runtime-tokio-rustls",
"runtime-tokio",
"tls-rustls-aws-lc-rs",
"rust_decimal",
] }
strum = { version = "0.27", features = ["derive"] }
@@ -244,7 +248,7 @@ tokio-rustls = { version = "0.26.2", default-features = false }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.14", features = ["tls-ring", "gzip", "zstd"] }
tonic = { version = "0.14", features = ["tls-aws-lc", "gzip", "zstd"] }
tower = "0.5"
tower-http = "0.6"
tracing = "0.1"

View File

@@ -486,7 +486,8 @@ impl SnapshotStorage for OpenDalStorage {
async fn delete_snapshot(&self) -> Result<()> {
self.object_store
.remove_all("/")
.delete_with("/")
.recursive(true)
.await
.context(StorageOperationSnafu {
operation: "delete snapshot",

View File

@@ -16,7 +16,7 @@ use async_trait::async_trait;
use clap::{Parser, Subcommand};
use common_error::ext::BoxedError;
use common_meta::snapshot::MetadataSnapshotManager;
use object_store::{ObjectStore, Scheme};
use object_store::{ObjectStore, services};
use crate::Tool;
use crate::common::{ObjectStoreConfig, StoreConfig, new_fs_object_store};
@@ -276,7 +276,7 @@ fn build_object_store_and_resolve_file_path(
None => new_fs_object_store(fs_root)?,
};
let file_path = if matches!(object_store.info().scheme(), Scheme::Fs) {
let file_path = if object_store.info().scheme() == services::FS_SCHEME {
resolve_relative_path_with_current_dir(file_path).map_err(BoxedError::new)?
} else {
file_path.to_string()

View File

@@ -22,7 +22,7 @@ use cmd::options::GlobalOptions;
use cmd::{App, cli, datanode, flownode, frontend, metasrv, standalone};
use common_base::Plugins;
use common_version::{product_name, verbose_version, version};
use servers::install_ring_crypto_provider;
use servers::install_default_crypto_provider;
#[derive(Parser)]
#[command(name = product_name(), author, version, long_version = verbose_version(), about)]
@@ -98,7 +98,7 @@ async fn main() -> Result<()> {
async fn main_body() -> Result<()> {
setup_human_panic();
install_ring_crypto_provider().map_err(|msg| InitTlsProviderSnafu { msg }.build())?;
install_default_crypto_provider().map_err(|msg| InitTlsProviderSnafu { msg }.build())?;
start(Command::parse()).await
}

View File

@@ -32,7 +32,6 @@ datatypes.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
orc-rust = { version = "0.7", default-features = false, features = ["async"] }
parquet.workspace = true
paste.workspace = true

View File

@@ -42,7 +42,6 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use object_store_opendal::OpendalStore;
use snafu::ResultExt;
use tokio::io::AsyncWriteExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -317,7 +316,7 @@ pub async fn file_to_stream(
.with_file_compression_type(df_compression)
.build();
let store = Arc::new(OpendalStore::new(store.clone()));
let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
let file_opener = config.file_source().create_file_opener(store, &config, 0)?;
let stream = FileStream::new(&config, 0, file_opener, &ExecutionPlanMetricsSet::new())?;

View File

@@ -44,7 +44,7 @@ struct Test<'a> {
impl Test<'_> {
async fn run(self, store: &ObjectStore) {
let store = Arc::new(object_store_opendal::OpendalStore::new(store.clone()));
let store = Arc::new(object_store::compat::OpendalStore::new(store.clone()));
let file_opener = self
.file_source
.create_file_opener(store, &self.config, 0)

View File

@@ -103,7 +103,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
test_util::TEST_BATCH_SIZE,
schema.clone(),
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
Arc::new(object_store::compat::OpendalStore::new(store.clone())),
true,
);
@@ -157,7 +157,7 @@ pub async fn setup_stream_to_csv_test(
let csv_opener = csv_source
.create_file_opener(
Arc::new(object_store_opendal::OpendalStore::new(store.clone())),
Arc::new(object_store::compat::OpendalStore::new(store.clone())),
&config,
0,
)

View File

@@ -15,7 +15,7 @@ pg_kvbackend = [
"dep:rustls-native-certs",
"dep:rustls",
]
mysql_kvbackend = ["dep:sqlx"]
mysql_kvbackend = ["dep:sqlx", "dep:rustls"]
enterprise = ["prost-types"]
[lints]
@@ -67,7 +67,12 @@ prost-types = { workspace = true, optional = true }
rand.workspace = true
regex.workspace = true
rskafka.workspace = true
rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"], optional = true }
rustls = { workspace = true, default-features = false, features = [
"aws_lc_rs",
"logging",
"std",
"tls12",
], optional = true }
rustls-native-certs = { version = "0.7", optional = true }
rustls-pemfile = { version = "2.0", optional = true }
serde.workspace = true

View File

@@ -15,12 +15,14 @@
use std::any::Any;
use std::collections::HashMap;
use std::marker::PhantomData;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use std::sync::OnceLock;
use std::time::Duration;
use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::debug;
use common_telemetry::{debug, info};
use crate::error::{Error, RdsTransactionRetryFailedSnafu, Result};
use crate::error::{Error, RdsTransactionRetryFailedSnafu, Result, UnexpectedSnafu};
use crate::kv_backend::txn::{
Compare, Txn as KvTxn, TxnOp, TxnOpResponse, TxnResponse as KvTxnResponse,
};
@@ -51,6 +53,38 @@ pub use mysql::MySqlStore;
const RDS_STORE_TXN_RETRY_COUNT: usize = 3;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
static RUSTLS_CRYPTO_PROVIDER_INIT: OnceLock<std::result::Result<(), String>> = OnceLock::new();
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub(crate) fn ensure_rustls_crypto_provider_installed() -> Result<()> {
RUSTLS_CRYPTO_PROVIDER_INIT
.get_or_init(|| {
if rustls::crypto::CryptoProvider::get_default().is_some() {
return Ok(());
}
match rustls::crypto::CryptoProvider::install_default(
rustls::crypto::aws_lc_rs::default_provider(),
) {
Ok(()) => Ok(()),
Err(_provider) if rustls::crypto::CryptoProvider::get_default().is_some() => {
Ok(())
}
Err(provider) => Err(format!(
"Failed to install rustls CryptoProvider, existing default: {:?}, attempted provider: {:?}",
rustls::crypto::CryptoProvider::get_default(),
provider
)),
}
})
.clone()
.map_err(|err_msg| {
info!("Failed to install rustls crypto provider: {err_msg}");
UnexpectedSnafu { err_msg }.build()
})
}
/// Query executor for rds. It can execute queries or generate a transaction executor.
#[async_trait::async_trait]
pub trait Executor: Send + Sync {

View File

@@ -32,6 +32,7 @@ use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE,
RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction,
ensure_rustls_crypto_provider_installed,
};
use crate::rpc::KeyValue;
use crate::rpc::store::{
@@ -620,6 +621,7 @@ impl MySqlStore {
/// Create [MySqlStore] impl of [KvBackendRef] from url.
pub async fn with_url(url: &str, table_name: &str, max_txn_ops: usize) -> Result<KvBackendRef> {
ensure_rustls_crypto_provider_installed()?;
let pool = MySqlPool::connect(url)
.await
.context(CreateMySqlPoolSnafu)?;
@@ -687,6 +689,7 @@ mod tests {
if endpoints.is_empty() {
return None;
}
ensure_rustls_crypto_provider_installed().unwrap();
Some(MySqlPool::connect(&endpoints).await.unwrap())
}
@@ -984,6 +987,7 @@ mod tests {
async fn test_mysql_with_tls() {
common_telemetry::init_default_ut_logging();
maybe_skip_mysql_integration_test!();
ensure_rustls_crypto_provider_installed().unwrap();
let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
let opts = endpoint
@@ -998,6 +1002,7 @@ mod tests {
async fn test_mysql_with_mtls() {
common_telemetry::init_default_ut_logging();
maybe_skip_mysql_integration_test!();
ensure_rustls_crypto_provider_installed().unwrap();
let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
let certs_dir = test_certs_dir();
@@ -1015,6 +1020,7 @@ mod tests {
async fn test_mysql_with_tls_verify_ca() {
common_telemetry::init_default_ut_logging();
maybe_skip_mysql_integration_test!();
ensure_rustls_crypto_provider_installed().unwrap();
let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
let certs_dir = test_certs_dir();
@@ -1033,6 +1039,7 @@ mod tests {
async fn test_mysql_with_tls_verify_ident() {
common_telemetry::init_default_ut_logging();
maybe_skip_mysql_integration_test!();
ensure_rustls_crypto_provider_installed().unwrap();
let endpoint = std::env::var("GT_MYSQL_ENDPOINTS").unwrap();
let certs_dir = test_certs_dir();

View File

@@ -41,6 +41,7 @@ use crate::kv_backend::rds::{
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RDS_STORE_OP_BATCH_DELETE,
RDS_STORE_OP_BATCH_GET, RDS_STORE_OP_BATCH_PUT, RDS_STORE_OP_RANGE_DELETE,
RDS_STORE_OP_RANGE_QUERY, RDS_STORE_TXN_RETRY_COUNT, RdsStore, Transaction,
ensure_rustls_crypto_provider_installed,
};
use crate::rpc::KeyValue;
use crate::rpc::store::{
@@ -422,6 +423,7 @@ pub fn create_postgres_tls_connector(tls_config: &TlsOption) -> Result<MakeRustl
"Creating PostgreSQL TLS connector with mode: {:?}",
tls_config.mode
);
ensure_rustls_crypto_provider_installed()?;
let config_builder = match tls_config.mode {
TlsMode::Disable => {
@@ -516,7 +518,7 @@ impl ServerCertVerifier for AcceptAnyVerifier {
fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
// Support all signature schemes
rustls::crypto::ring::default_provider()
rustls::crypto::aws_lc_rs::default_provider()
.signature_verification_algorithms
.supported_schemes()
}
@@ -544,7 +546,7 @@ impl ServerCertVerifier for NoHostnameVerification {
&self.roots,
intermediates,
now,
rustls::crypto::ring::default_provider()
rustls::crypto::aws_lc_rs::default_provider()
.signature_verification_algorithms
.all,
)?;
@@ -562,7 +564,7 @@ impl ServerCertVerifier for NoHostnameVerification {
message,
cert,
dss,
&rustls::crypto::ring::default_provider().signature_verification_algorithms,
&rustls::crypto::aws_lc_rs::default_provider().signature_verification_algorithms,
)
}
@@ -576,13 +578,13 @@ impl ServerCertVerifier for NoHostnameVerification {
message,
cert,
dss,
&rustls::crypto::ring::default_provider().signature_verification_algorithms,
&rustls::crypto::aws_lc_rs::default_provider().signature_verification_algorithms,
)
}
fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
// Support all signature schemes
rustls::crypto::ring::default_provider()
rustls::crypto::aws_lc_rs::default_provider()
.signature_verification_algorithms
.supported_schemes()
}

View File

@@ -19,7 +19,7 @@ futures-util.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
rskafka.workspace = true
rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "logging", "std", "tls12"] }
rustls-native-certs = "0.7"
rustls-pemfile = "2.1"
serde.workspace = true

View File

@@ -30,7 +30,6 @@ datafusion-orc.workspace = true
datatypes.workspace = true
futures.workspace = true
object-store.workspace = true
object_store_opendal.workspace = true
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
snafu.workspace = true

View File

@@ -61,7 +61,7 @@ fn build_record_batch_stream(
.with_file_group(FileGroup::new(files))
.build();
let store = Arc::new(object_store_opendal::OpendalStore::new(
let store = Arc::new(object_store::compat::OpendalStore::new(
scan_plan_config.store.clone(),
));

View File

@@ -151,6 +151,8 @@ ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22,
.map(|mut e| {
let i = e.file_path.rfind('/').unwrap();
e.file_path.replace_range(i..(i + 37), "/<file_id>");
e.file_size = None;
e.last_modified_ms = None;
format!("\n{:?}", e)
})
.sorted()

View File

@@ -499,7 +499,7 @@ impl AccessLayer {
let file_size = if file_size == 0 { None } else { Some(file_size) };
let last_modified_ms = metadata
.last_modified()
.map(|ts| Timestamp::new_millisecond(ts.timestamp_millis()));
.map(|ts| Timestamp::new_millisecond(ts.into_inner().as_millisecond()));
let entry = StorageSstEntry {
file_path: path.to_string(),

View File

@@ -970,6 +970,8 @@ async fn test_list_ssts_with_format(
.map(|mut e| {
let i = e.file_path.rfind('/').unwrap();
e.file_path.replace_range(i..(i + 37), "/<file_id>");
e.file_size = None;
e.last_modified_ms = None;
format!("\n{:?}", e)
})
.sorted()

View File

@@ -26,8 +26,8 @@ use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use object_store::Buffer;
use object_store::layers::mock::{
Entry, Error as MockError, ErrorKind, List, Lister, Metadata, MockLayerBuilder,
Result as MockResult, Write, Writer,
Delete, Deleter, Entry, Error as MockError, ErrorKind, List, Lister, Metadata,
MockLayerBuilder, OpDelete, Result as MockResult, Write, Writer,
};
use partition::expr::{PartitionExpr, col};
use store_api::region_engine::{
@@ -1152,6 +1152,23 @@ impl Write for MockWriter {
}
}
struct MockDeleter {
inner: Deleter,
}
impl Delete for MockDeleter {
async fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> {
if path.contains("staging") {
return Err(MockError::new(ErrorKind::Unexpected, "mock error"));
}
self.inner.delete(path, args).await
}
async fn close(&mut self) -> MockResult<()> {
self.inner.close().await
}
}
async fn test_enter_staging_error(env: &mut TestEnv, flat_format: bool) {
let partition_expr = default_partition_expr();
let engine = env
@@ -1201,6 +1218,7 @@ async fn test_enter_staging_clean_staging_manifest_error_with_format(flat_format
inner: lister,
})
}))
.deleter_factory(Arc::new(|deleter| Box::new(MockDeleter { inner: deleter })))
.build()
.unwrap();
let mut env = TestEnv::new().await.with_mock_layer(mock_layer);

View File

@@ -191,13 +191,15 @@ impl StagingStorage {
pub(crate) async fn clear(&self) -> Result<()> {
self.delta_storage
.object_store()
.remove_all(self.delta_storage.path())
.delete_with(self.delta_storage.path())
.recursive(true)
.await
.context(OpenDalSnafu)?;
self.blob_storage
.object_store()
.remove_all(self.blob_storage.path())
.delete_with(self.blob_storage.path())
.recursive(true)
.await
.context(OpenDalSnafu)?;

View File

@@ -495,23 +495,23 @@ async fn test_checkpoint_bypass_in_staging_mode() {
assert_eq!(last_version, 16);
}
/// A deleter that fails on `flush`, simulating the S3 batch-delete failure
/// A deleter that fails on `close`, simulating the S3 batch-delete failure
/// described in issue #7986.
struct FailingDeleter {
inner: oio::Deleter,
flush_calls: Arc<AtomicUsize>,
close_calls: Arc<AtomicUsize>,
}
impl oio::Delete for FailingDeleter {
fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> {
self.inner.delete(path, args)
async fn delete(&mut self, path: &str, args: OpDelete) -> MockResult<()> {
self.inner.delete(path, args).await
}
async fn flush(&mut self) -> MockResult<usize> {
self.flush_calls.fetch_add(1, Ordering::Relaxed);
async fn close(&mut self) -> MockResult<()> {
self.close_calls.fetch_add(1, Ordering::Relaxed);
Err(MockError::new(
ErrorKind::Unexpected,
"mock manifest delete flush failure",
"mock manifest delete close failure",
))
}
}
@@ -520,13 +520,13 @@ impl oio::Delete for FailingDeleter {
async fn checkpoint_advances_and_recovery_works_when_delete_fails() {
common_telemetry::init_default_ut_logging();
let flush_calls = Arc::new(AtomicUsize::new(0));
let factory_flush_calls = flush_calls.clone();
let close_calls = Arc::new(AtomicUsize::new(0));
let factory_close_calls = close_calls.clone();
let mock_layer = MockLayerBuilder::default()
.deleter_factory(Arc::new(move |inner| {
Box::new(FailingDeleter {
inner,
flush_calls: factory_flush_calls.clone(),
close_calls: factory_close_calls.clone(),
})
}))
.build()
@@ -548,7 +548,7 @@ async fn checkpoint_advances_and_recovery_works_when_delete_fails() {
}
// The checkpointer must have attempted to delete stale files at least once.
assert!(flush_calls.load(Ordering::Relaxed) > 0);
assert!(close_calls.load(Ordering::Relaxed) > 0);
// Despite delete failures, the in-memory checkpoint marker advances so
// subsequent `maybe_do_checkpoint` calls compute correct ranges.

View File

@@ -1007,7 +1007,7 @@ async fn preload_parquet_meta_cache_for_files(
return 0;
}
let allow_direct_load = matches!(object_store.info().scheme(), object_store::Scheme::Fs);
let allow_direct_load = object_store.info().scheme() == object_store::services::FS_SCHEME;
// Sort by time range so we can prefer preloading newer files first.
files.sort_by_key(|b| std::cmp::Reverse(b.meta_ref().time_range.1));

View File

@@ -70,20 +70,14 @@ impl fmt::Debug for LocalFilePurger {
#[cfg(not(debug_assertions))]
/// Whether to enable GC for the file purger.
pub fn should_enable_gc(
global_gc_enabled: bool,
object_store_scheme: object_store::Scheme,
) -> bool {
global_gc_enabled && object_store_scheme != object_store::Scheme::Fs
pub fn should_enable_gc(global_gc_enabled: bool, object_store_scheme: &'static str) -> bool {
global_gc_enabled && object_store_scheme != object_store::services::FS_SCHEME
}
#[cfg(debug_assertions)]
/// For debug build, we may use Fs as the object store scheme,
/// so we need to enable GC for local file system.
pub fn should_enable_gc(
global_gc_enabled: bool,
_object_store_scheme: object_store::Scheme,
) -> bool {
pub fn should_enable_gc(global_gc_enabled: bool, _object_store_scheme: &'static str) -> bool {
global_gc_enabled
}

View File

@@ -142,10 +142,11 @@ impl InstrumentedStore {
Ok(list)
}
/// Proxies to [`ObjectStore::remove_all`].
/// Recursively deletes all objects under the given path.
pub async fn remove_all(&self, path: &str) -> Result<()> {
self.object_store
.remove_all(path)
.delete_with(path)
.recursive(true)
.await
.context(OpenDalSnafu)
}

View File

@@ -290,7 +290,8 @@ pub(crate) async fn remove_region_dir_once(
.context(OpenDalSnafu)?;
// then remove the marker with this dir
object_store
.remove_all(region_path)
.delete_with(region_path)
.recursive(true)
.await
.context(OpenDalSnafu)?;
Ok(true)

View File

@@ -12,16 +12,19 @@ services-memory = ["opendal/services-memory"]
testing = ["derive_builder"]
[dependencies]
async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
datafusion_object_store.workspace = true
derive_builder = { workspace = true, optional = true }
futures.workspace = true
humantime-serde.workspace = true
lazy_static.workspace = true
opendal = { version = "0.54", features = [
opendal = { git = "https://github.com/apache/opendal.git", tag = "v0.56.0-rc.2", features = [
"layers-tracing",
"layers-prometheus",
"services-azblob",
@@ -40,6 +43,10 @@ uuid.workspace = true
[dev-dependencies]
anyhow = "1.0"
arrow_object_store.workspace = true
common-telemetry.workspace = true
common-test-util.workspace = true
object_store_opendal.workspace = true
rand.workspace = true
tempfile.workspace = true
tokio.workspace = true

File diff suppressed because it is too large Load Diff

View File

@@ -131,12 +131,12 @@ pub struct MockDeleter {
}
impl oio::Delete for MockDeleter {
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args)
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args).await
}
async fn flush(&mut self) -> Result<usize> {
self.inner.flush().await
async fn close(&mut self) -> Result<()> {
self.inner.close().await
}
}

View File

@@ -15,9 +15,10 @@
pub use opendal::raw::{Access, HttpClient};
pub use opendal::{
Buffer, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, FuturesAsyncReader,
FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Scheme, Writer, services,
FuturesAsyncWriter, Lister, Operator as ObjectStore, Reader, Result, Writer, services,
};
pub mod compat;
pub mod config;
pub mod error;
pub mod factory;

View File

@@ -32,7 +32,7 @@ impl TempFolder {
}
pub async fn remove_all(&self) -> Result<()> {
self.store.remove_all(&self.path).await
self.store.delete_with(&self.path).recursive(true).await
}
}

View File

@@ -131,7 +131,7 @@ pub fn normalize_path(path: &str) -> String {
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
object_store
.layer(LoggingLayer::new(DefaultLoggingInterceptor))
.layer(TracingLayer)
.layer(TracingLayer::new())
.layer(crate::layers::build_prometheus_metrics_layer(path_label))
}

View File

@@ -13,17 +13,22 @@
// limitations under the License.
use std::env;
use std::sync::Arc;
use anyhow::Result;
use arrow_object_store::path::Path;
use arrow_object_store::{ObjectStore as ArrowObjectStore, ObjectStoreExt};
use bytes::Bytes;
use common_telemetry::info;
use common_test_util::temp_dir::create_temp_dir;
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store_opendal::OpendalStore;
use opendal::EntryMode;
use opendal::services::{Azblob, Gcs, Oss};
use opendal::services::{Azblob, Gcs, Memory, Oss};
use prometheus::{Encoder, TextEncoder};
use tempfile::TempDir;
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
@@ -220,10 +225,39 @@ fn assert_opendal_metrics() {
);
}
fn create_temp_dir(prefix: &str) -> Result<TempDir> {
Ok(tempfile::Builder::new().prefix(prefix).tempdir()?)
}
#[tokio::test]
async fn test_opendal_memory_smoke() -> Result<()> {
let op = opendal::Operator::new(Memory::default())?.finish();
let store: OpendalStore = OpendalStore::new(op);
assert_eq!("memory", store.info().scheme());
assert!(format!("{store}").contains("memory"));
let store: Arc<dyn ArrowObjectStore> = Arc::new(store);
let location = Path::from("smoke/test.txt");
store
.put(&location, Bytes::from_static(b"hello, memory").into())
.await?;
let content = store.get(&location).await?.bytes().await?;
assert_eq!(content, Bytes::from_static(b"hello, memory"));
let listed = store
.list(Some(&Path::from("smoke")))
.try_collect::<Vec<_>>()
.await?;
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].location, location);
Ok(())
}
#[tokio::test]
async fn test_fs_backend() -> Result<()> {
let data_dir = create_temp_dir("test_fs_backend");
let tmp_dir = create_temp_dir("test_fs_backend");
let data_dir = create_temp_dir("test_fs_backend")?;
let tmp_dir = create_temp_dir("test_fs_backend")?;
let builder = Fs::default()
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy());

View File

@@ -55,7 +55,6 @@ meter-core.workspace = true
meter-macros.workspace = true
moka = { workspace = true, features = ["future"] }
object-store.workspace = true
object_store_opendal.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true

View File

@@ -94,7 +94,7 @@ parking_lot.workspace = true
partition.workspace = true
pg_interval = { version = "0.5.2", package = "pg_interval_2" }
pgwire = { version = "0.38.3", default-features = false, features = [
"server-api-ring",
"server-api-aws-lc-rs",
"pg-ext-types",
] }
pin-project = "1.0"
@@ -110,7 +110,7 @@ regex.workspace = true
reqwest.workspace = true
rust-embed = { version = "6.6", optional = true, features = ["debug-embed"] }
rust_decimal = { workspace = true, features = ["db-postgres"] }
rustls = { workspace = true, default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "logging", "std", "tls12"] }
rustls-pemfile = "2.0"
rustls-pki-types = "1.0"
serde.workspace = true
@@ -153,7 +153,7 @@ common-test-util.workspace = true
criterion = "0.5"
json5 = "0.4"
mysql_async = { version = "0.35", default-features = false, features = [
"default-rustls-ring",
"default-rustls",
] }
permutation = "0.4"
rand.workspace = true

View File

@@ -19,6 +19,7 @@ fn main() {
#[cfg(feature = "dashboard")]
fn fetch_dashboard_assets() {
use std::path::PathBuf;
use std::process::{Command, Stdio};
let message = "Failed to fetch dashboard assets";
@@ -30,7 +31,16 @@ or it's a network error, just try again or enable/disable some proxy."#;
let mut dir = std::env::current_dir().unwrap();
dir.pop();
dir.pop();
dir.push("scripts");
let scripts_dir = dir.join("scripts");
let dashboard_dist = dir.join(PathBuf::from("src/servers/dashboard/dist"));
if dashboard_dist.join("index.html").exists() {
println!("cargo:rerun-if-changed=dashboard/VERSION");
println!("cargo:rerun-if-changed=dashboard/dist");
return;
}
dir = scripts_dir;
let out_dir = std::env::var("OUT_DIR").unwrap();

View File

@@ -50,6 +50,7 @@ use tonic::{Request, Response, Status};
use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer};
use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu};
use crate::install_default_crypto_provider;
use crate::metrics::MetricsMiddlewareLayer;
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
@@ -357,6 +358,11 @@ impl Server for GrpcServer {
let mut builder = tonic::transport::Server::builder().layer(metrics_layer);
if let Some(tls_config) = self.tls_config.clone() {
// tonic builds the underlying rustls server config here, which requires a
// process-level crypto provider to be installed first.
if let Err(err) = install_default_crypto_provider() {
warn!("Failed to install default rustls crypto provider: {err}");
}
builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
}

View File

@@ -64,13 +64,13 @@ pub struct SqlPlan {
schema: Option<Schema>,
}
/// Install the ring crypto provider for rustls process-wide. see:
/// Install the default crypto provider for rustls process-wide. see:
///
/// https://docs.rs/rustls/latest/rustls/crypto/struct.CryptoProvider.html#using-the-per-process-default-cryptoprovider
///
/// for more information.
pub fn install_ring_crypto_provider() -> Result<(), String> {
rustls::crypto::CryptoProvider::install_default(rustls::crypto::ring::default_provider())
pub fn install_default_crypto_provider() -> Result<(), String> {
rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider())
.map_err(|ret| {
format!(
"CryptoProvider already installed as: {:?}, but providing {:?}",

View File

@@ -239,7 +239,7 @@ pub fn maybe_watch_server_tls_config(
#[cfg(test)]
mod tests {
use super::*;
use crate::install_ring_crypto_provider;
use crate::install_default_crypto_provider;
use crate::tls::TlsMode::Disable;
#[test]
@@ -510,7 +510,7 @@ mod tests {
#[test]
fn test_tls_file_change_watch() {
common_telemetry::init_default_ut_logging();
let _ = install_ring_crypto_provider();
let _ = install_default_crypto_provider();
let dir = tempfile::tempdir().unwrap();
let cert_path = dir.path().join("server.crt");

View File

@@ -27,7 +27,7 @@ use datatypes::value::Value;
use mysql_async::prelude::*;
use mysql_async::{Conn, Row, SslOpts};
use servers::error::Result;
use servers::install_ring_crypto_provider;
use servers::install_default_crypto_provider;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::server::Server;
use servers::tls::{ReloadableTlsServerConfig, TlsOption};
@@ -45,7 +45,7 @@ struct MysqlOpts<'a> {
}
fn create_mysql_server(table: TableRef, opts: MysqlOpts<'_>) -> Result<Box<dyn Server>> {
let _ = install_ring_crypto_provider();
let _ = install_default_crypto_provider();
let query_handler = create_testing_sql_query_handler(table);
let io_runtime = RuntimeBuilder::default()
.worker_threads(4)

View File

@@ -27,7 +27,7 @@ use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
use rustls::{Error, SignatureScheme};
use rustls_pki_types::{CertificateDer, ServerName};
use servers::error::Result;
use servers::install_ring_crypto_provider;
use servers::install_default_crypto_provider;
use servers::postgres::PostgresServer;
use servers::server::Server;
use servers::tls::{ReloadableTlsServerConfig, TlsOption};
@@ -365,7 +365,7 @@ async fn test_extended_query() -> Result<()> {
async fn start_test_server(server_tls: TlsOption) -> Result<u16> {
common_telemetry::init_default_ut_logging();
let _ = install_ring_crypto_provider();
let _ = install_default_crypto_provider();
let table = MemTable::default_numbers_table();
let mut pg_server = create_postgres_server(table, false, server_tls, None)?;

View File

@@ -46,6 +46,7 @@ paste.workspace = true
rand = { workspace = true }
rand_chacha = "0.9"
reqwest = { workspace = true }
rustls = { workspace = true, default-features = false, features = ["aws_lc_rs", "std", "tls12"] }
schemars = "0.8"
serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -21,5 +21,19 @@ pub mod translator;
pub mod utils;
pub mod validator;
use std::sync::OnceLock;
#[cfg(test)]
pub mod test_utils;
static RUSTLS_CRYPTO_PROVIDER_INIT: OnceLock<()> = OnceLock::new();
pub fn install_rustls_crypto_provider() {
RUSTLS_CRYPTO_PROVIDER_INIT.get_or_init(|| {
if rustls::crypto::CryptoProvider::get_default().is_none() {
let _ = rustls::crypto::CryptoProvider::install_default(
rustls::crypto::aws_lc_rs::default_provider(),
);
}
});
}

View File

@@ -52,6 +52,8 @@ const GT_MYSQL_ADDR: &str = "GT_MYSQL_ADDR";
/// Connects to GreptimeDB via env variables.
pub async fn init_greptime_connections_via_env() -> Connections {
crate::install_rustls_crypto_provider();
let _ = dotenv::dotenv();
let mysql = if let Ok(addr) = env::var(GT_MYSQL_ADDR) {
Some(addr)

View File

@@ -129,19 +129,20 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json;
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) <DATETIME>
-- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>
SELECT * FROM information_schema.ssts_storage order by file_path;
+---------------------------------------------------------------------------------------------+-----------+------------------+---------+
| file_path | file_size | last_modified_ms | node_id |
+---------------------------------------------------------------------------------------------+-----------+------------------+---------+
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
+---------------------------------------------------------------------------------------------+-----------+------------------+---------+
+---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+
| file_path | file_size | last_modified_ms | node_id |
+---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
+---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+
INSERT INTO sst_case VALUES
(24, 'foo', 'foo', 100),
@@ -205,23 +206,24 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json;
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) <DATETIME>
-- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>
SELECT * FROM information_schema.ssts_storage order by file_path;
+---------------------------------------------------------------------------------------------+-----------+------------------+---------+
| file_path | file_size | last_modified_ms | node_id |
+---------------------------------------------------------------------------------------------+-----------+------------------+---------+
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet | | |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin | | |<NUM>|
+---------------------------------------------------------------------------------------------+-----------+------------------+---------+
+---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+
| file_path | file_size | last_modified_ms | node_id |
+---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/<UUID>.parquet |<NUM>| <DATETIME> |<NUM>|
| data/greptime/public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>/index/<UUID>.puffin |<NUM>| <DATETIME> |<NUM>|
+---------------------------------------------------------------------------------------------+-----------+-------------------------+---------+
DROP TABLE sst_case;

View File

@@ -40,6 +40,7 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json;
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) <DATETIME>
-- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>
SELECT * FROM information_schema.ssts_storage order by file_path;
@@ -67,6 +68,7 @@ SELECT * FROM information_schema.ssts_index_meta ORDER BY meta_json;
-- SQLNESS REPLACE (\s+\d+\s+) <NUM>
-- SQLNESS REPLACE ([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}) <UUID>
-- SQLNESS REPLACE (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3,9}) <DATETIME>
-- SQLNESS REPLACE (/public/\d+/\d+_\d+) /public/<TABLE_ID>/<REGION_ID>_<REGION_NUMBER>
SELECT * FROM information_schema.ssts_storage order by file_path;

View File

@@ -19,9 +19,9 @@ datatypes = { workspace = true }
flate2 = "1.0"
hex = "0.4"
local-ip-address = "0.6"
mysql = { version = "26", default-features = false, features = ["minimal", "rustls-tls-ring"] }
mysql = { version = "26", default-features = false, features = ["minimal", "rustls-tls"] }
num_cpus = "1.16"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "stream"] }
serde.workspace = true
serde_json.workspace = true
sha2 = "0.10"