From 98ef74bff4507b730369b5a15ba69a31d3206903 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 23 Feb 2023 11:20:45 +0800 Subject: [PATCH] chore: Bump OpenDAL to v0.27 (#1057) * Bump OpenDAL to v0.27 Signed-off-by: Xuanwo * Make cargo check happy Signed-off-by: Xuanwo * Address comments Signed-off-by: Xuanwo * Address comments Signed-off-by: Xuanwo * Format toml Signed-off-by: Xuanwo * Make taplo happy Signed-off-by: Xuanwo --------- Signed-off-by: Xuanwo --- Cargo.lock | 143 +++++++++--- src/catalog/src/system.rs | 4 +- src/common/procedure/src/local.rs | 5 +- src/common/procedure/src/store.rs | 5 +- src/common/procedure/src/store/state_store.rs | 36 +-- src/datanode/src/instance.rs | 31 ++- src/datanode/src/sql.rs | 6 +- src/datanode/src/sql/copy_table.rs | 6 +- src/mito/src/table/test_util.rs | 6 +- src/object-store/Cargo.toml | 5 +- src/object-store/src/backend.rs | 1 + src/object-store/src/backend/azblob.rs | 2 +- src/object-store/src/backend/fs.rs | 2 +- src/object-store/src/backend/memory.rs | 2 +- src/object-store/src/backend/oss.rs | 15 ++ src/object-store/src/backend/s3.rs | 2 +- src/object-store/src/cache_policy.rs | 213 ++++++++++++------ src/object-store/src/lib.rs | 4 +- src/object-store/tests/object_store_test.rs | 34 +-- src/storage/src/compaction/writer.rs | 6 +- src/storage/src/engine.rs | 3 +- src/storage/src/file_purger.rs | 8 +- src/storage/src/manifest/region.rs | 5 +- src/storage/src/manifest/storage.rs | 5 +- src/storage/src/region/tests.rs | 5 +- src/storage/src/sst/parquet.rs | 15 +- src/storage/src/test_util/config_util.rs | 6 +- tests-integration/src/test_util.rs | 9 +- 28 files changed, 373 insertions(+), 211 deletions(-) create mode 100644 src/object-store/src/backend/oss.rs diff --git a/Cargo.lock b/Cargo.lock index c2c15a6a99..b93ffab824 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -653,6 +653,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "backon" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f34fac4d7cdaefa2deded0eda2d5d59dbfd43370ff3f856209e72340ae84c294" +dependencies = [ + "futures", + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -729,25 +741,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bincode" -version = "2.0.0-rc.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bb50c5a2ef4b9b1e7ae73e3a73b52ea24b20312d629f9c4df28260b7ad2c3c4" -dependencies = [ - "bincode_derive", - "serde", -] - -[[package]] -name = "bincode_derive" -version = "2.0.0-rc.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a45a23389446d2dd25dc8e73a7a3b3c43522b630cac068927f0649d43d719d2" -dependencies = [ - "virtue", -] - [[package]] name = "bindgen" version = "0.59.2" @@ -2172,7 +2165,7 @@ dependencies = [ "axum", "axum-macros", "axum-test-helper", - "backon", + "backon 0.2.0", "catalog", "client", "common-base", @@ -2248,6 +2241,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de" dependencies = [ "const-oid", + "pem-rfc7468", + "zeroize", ] [[package]] @@ -2336,6 +2331,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -3464,6 +3460,9 @@ name = "lazy_static" version = "1.4.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +dependencies = [ + "spin", +] [[package]] name = "lazycell" @@ -3662,7 +3661,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f270b952b07995fe874b10a5ed7dd28c80aa2130e37a7de7ed667d034e0a521" dependencies = [ - "bincode 1.3.3", + "bincode", "cactus", "cfgrammar", "filetime", @@ -4321,6 +4320,23 @@ dependencies = [ "serde", ] +[[package]] +name = "num-bigint-dig" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2399c9463abc5f909349d8aa9ba080e0b88b3ce2885389b60b993f39b1a56905" +dependencies = [ + "byteorder", + "lazy_static", + "libm", + "num-integer", + "num-iter", + "num-traits", + "rand 0.8.5", + "smallvec", + "zeroize", +] + [[package]] name = "num-complex" version = "0.4.3" @@ -4480,16 +4496,15 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.25.1" +version = "0.27.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73829d3a057542556dc2c2d2b70700a44dda913cdb5483094c20ef9673ca283c" +checksum = "ef6f7b936f2f8483e19643357cb50d9ec9a49c506971ef69ca676913cf5afd91" dependencies = [ "anyhow", "async-compat", "async-trait", - "backon", + "backon 0.4.0", "base64 0.21.0", - "bincode 2.0.0-rc.2", "bytes", "flagset", "futures", @@ -4795,6 +4810,15 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "pem-rfc7468" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d159833a9105500e0398934e205e0773f0b27529557134ecfc51c27646adac" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.2.0" @@ -4996,6 +5020,28 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs1" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff33bdbdfc54cc98a2eca766ebdec3e1b8fb7387523d5c9c9a2891da856f719" +dependencies = [ + "der", + "pkcs8", + "spki", + "zeroize", +] + +[[package]] +name = "pkcs8" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.26" @@ -5763,12 +5809,12 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.8.1" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f446438814fde3785305a59a85a6d1b361ce2c9d29e58dd87c9103a242c40b6" +checksum = "ef4d5fefeaaa1e64f4aabb79da4ea68bf6d0e7935ad927728280d2a8e95735fc" dependencies = [ "anyhow", - "backon", + "backon 0.4.0", "base64 0.21.0", "bytes", "dirs", @@ -5781,6 +5827,8 @@ dependencies = [ "once_cell", "percent-encoding", "quick-xml", + "rand 0.8.5", + "rsa", "rust-ini", "serde", "serde_json", @@ -5905,6 +5953,27 @@ dependencies = [ "serde", ] +[[package]] +name = "rsa" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b3896c9b7790b70a9aa314a30e4ae114200992a19c96cbe0ca6070edd32ab8" +dependencies = [ + "byteorder", + "digest", + "num-bigint-dig", + "num-integer", + "num-iter", + "num-traits", + "pkcs1", + "pkcs8", + "rand_core 0.6.4", + "sha2", + "signature", + 
"subtle", + "zeroize", +] + [[package]] name = "rust-ini" version = "0.18.0" @@ -6076,7 +6145,7 @@ name = "rustpython-compiler-core" version = "0.1.2" source = "git+https://github.com/discord9/RustPython?rev=2e126345#2e12634569d01674724490193eb9638f056e51ca" dependencies = [ - "bincode 1.3.3", + "bincode", "bitflags", "bstr", "itertools", @@ -6826,6 +6895,10 @@ name = "signature" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fe458c98333f9c8152221191a77e2a44e8325d0193484af2e9421a53019e57d" +dependencies = [ + "digest", + "rand_core 0.6.4", +] [[package]] name = "simba" @@ -8368,12 +8441,6 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" -[[package]] -name = "virtue" -version = "0.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b60dcd6a64dd45abf9bd426970c9843726da7fc08f44cd6fcebf68c21220a63" - [[package]] name = "vob" version = "3.0.2" @@ -8756,6 +8823,12 @@ dependencies = [ "lzma-sys", ] +[[package]] +name = "zeroize" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" + [[package]] name = "zstd" version = "0.12.2+zstd.1.5.2" diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 71d96ba45e..cd55446d28 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -399,7 +399,7 @@ mod tests { use log_store::NoopLogStore; use mito::config::EngineConfig; use mito::engine::MitoEngine; - use object_store::ObjectStore; + use object_store::{ObjectStore, ObjectStoreBuilder}; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -486,7 +486,7 @@ mod tests { .root(&store_dir) .build() .unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let table_engine = Arc::new(MitoEngine::new( EngineConfig::default(), diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 468c8f6dcd..7022231351 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -411,7 +411,8 @@ impl ProcedureManager for LocalManager { /// Create a new [ProcedureMeta] for test purpose. 
#[cfg(test)] mod test_util { - use object_store::services::fs::Builder; + use object_store::services::Fs as Builder; + use object_store::ObjectStoreBuilder; use tempdir::TempDir; use super::*; @@ -423,7 +424,7 @@ mod test_util { pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore { let store_dir = dir.path().to_str().unwrap(); let accessor = Builder::default().root(store_dir).build().unwrap(); - ObjectStore::new(accessor) + ObjectStore::new(accessor).finish() } } diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 680c2ecf9d..5198e1c110 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -246,7 +246,8 @@ impl ParsedKey { #[cfg(test)] mod tests { use async_trait::async_trait; - use object_store::services::fs::Builder; + use object_store::services::Fs as Builder; + use object_store::ObjectStoreBuilder; use tempdir::TempDir; use super::*; @@ -255,7 +256,7 @@ mod tests { fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore { let store_dir = dir.path().to_str().unwrap(); let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); ProcedureStore::from(object_store) } diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index 052d26a45a..97cebdbd5a 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -20,9 +20,7 @@ use futures::{Stream, TryStreamExt}; use object_store::{ObjectMode, ObjectStore}; use snafu::ResultExt; -use crate::error::{ - DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, ReadStateSnafu, Result, -}; +use crate::error::{DeleteStateSnafu, Error, PutStateSnafu, Result}; /// Key value from state store. type KeyValue = (String, Vec<u8>); @@ -72,22 +70,23 @@ impl StateStore for ObjectStateStore { async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> { let path_string = path.to_string(); - let op = self.store.batch(); - // Note that there is no guarantee about the order between files and dirs - // at the same level. - // See https://docs.rs/opendal/0.25.2/opendal/raw/struct.TopDownWalker.html#note - let stream = op - .walk_top_down(path) - .context(ListStateSnafu { path })? - .map_err(move |e| Error::ListState { + + let lister = self + .store + .object(path) + .scan() + .await + .map_err(|e| Error::ListState { path: path_string.clone(), source: e, - }) + })?; + + let stream = lister .try_filter_map(|entry| async move { let key = entry.path(); - let key_value = match entry.mode().await.context(ReadStateSnafu { key })? { + let key_value = match entry.mode().await?
{ ObjectMode::FILE => { - let value = entry.read().await.context(ReadStateSnafu { key })?; + let value = entry.read().await?; Some((key.to_string(), value)) } @@ -95,6 +94,10 @@ impl StateStore for ObjectStateStore { }; Ok(key_value) + }) + .map_err(move |e| Error::ListState { + path: path_string.clone(), + source: e, }); Ok(Box::pin(stream)) @@ -112,7 +115,8 @@ impl StateStore for ObjectStateStore { #[cfg(test)] mod tests { - use object_store::services::fs::Builder; + use object_store::services::Fs as Builder; + use object_store::ObjectStoreBuilder; use tempdir::TempDir; use super::*; @@ -122,7 +126,7 @@ mod tests { let dir = TempDir::new("state_store").unwrap(); let store_dir = dir.path().to_str().unwrap(); let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); let state_store = ObjectStateStore::new(object_store); let data: Vec<_> = state_store diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 4caa01b269..a684dd42c2 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use std::time::Duration; use std::{fs, path}; -use backon::ExponentialBackoff; use catalog::remote::MetaKvBackend; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::readable_size::ReadableSize; @@ -29,12 +28,10 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOpts; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; -use object_store::cache_policy::LruCachePolicy; -use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; -use object_store::services::fs::Builder as FsBuilder; -use object_store::services::oss::Builder as OSSBuilder; -use object_store::services::s3::Builder as S3Builder; -use object_store::{util, ObjectStore}; +use object_store::cache_policy::LruCacheLayer; +use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; +use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder}; +use object_store::{util, ObjectStore, ObjectStoreBuilder}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; use servers::Mode; use snafu::prelude::*; @@ -227,7 +224,7 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result object_store.map(|object_store| { object_store - .layer(RetryLayer::new(ExponentialBackoff::default().with_jitter())) + .layer(RetryLayer::new().with_jitter()) .layer(MetricsLayer) .layer(LoggingLayer::default()) .layer(TracingLayer) @@ -258,7 +255,7 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re config: store_config.clone(), })?; - create_object_store_with_cache(ObjectStore::new(accessor), store_config) + create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config) } fn create_object_store_with_cache( @@ -285,13 +282,13 @@ fn create_object_store_with_cache( if let Some(path) = cache_path { let cache_store = - ObjectStore::new(FsBuilder::default().root(path).build().with_context(|_| { - error::InitBackendSnafu { + FsBuilder::default() + .root(path) + .build() + .with_context(|_| error::InitBackendSnafu { config: store_config.clone(), - } - })?); - let policy = LruCachePolicy::new(cache_capacity.0 as usize); - let cache_layer = CacheLayer::new(cache_store).with_policy(policy); + })?; + let cache_layer = 
LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize); Ok(object_store.layer(cache_layer)) } else { Ok(object_store) @@ -328,7 +325,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res config: store_config.clone(), })?; - create_object_store_with_cache(ObjectStore::new(accessor), store_config) + create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config) } pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { @@ -351,7 +348,7 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res config: store_config.clone(), })?; - Ok(ObjectStore::new(accessor)) + Ok(ObjectStore::new(accessor).finish()) } /// Create metasrv client instance and spawn heartbeat loop. diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 34667260d9..72b4342dee 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -147,8 +147,8 @@ mod tests { use log_store::NoopLogStore; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; - use object_store::services::fs::Builder; - use object_store::ObjectStore; + use object_store::services::Fs as Builder; + use object_store::{ObjectStore, ObjectStoreBuilder}; use query::parser::{QueryLanguageParser, QueryStatement}; use query::QueryEngineFactory; use session::context::QueryContext; @@ -213,7 +213,7 @@ mod tests { let dir = TempDir::new("setup_test_engine_and_table").unwrap(); let store_dir = dir.path().to_string_lossy(); let accessor = Builder::default().root(&store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let sql = r#"insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), diff --git a/src/datanode/src/sql/copy_table.rs b/src/datanode/src/sql/copy_table.rs index 6159e33f01..8131604111 100644 --- a/src/datanode/src/sql/copy_table.rs +++ b/src/datanode/src/sql/copy_table.rs @@ -22,8 +22,8 @@ use datafusion::parquet::basic::{Compression, Encoding}; use datafusion::parquet::file::properties::WriterProperties; use datafusion::physical_plan::RecordBatchStream; use futures::TryStreamExt; -use object_store::services::fs::Builder; -use object_store::ObjectStore; +use object_store::services::Fs as Builder; +use object_store::{ObjectStore, ObjectStoreBuilder}; use snafu::ResultExt; use table::engine::TableReference; use table::requests::CopyTableRequest; @@ -53,7 +53,7 @@ impl SqlHandler { let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream)); let accessor = Builder::default().build().unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); let mut parquet_writer = ParquetWriter::new(req.file_name, stream, object_store); // TODO(jiachun): diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index 4a21e520f2..78eba284c3 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -21,8 +21,8 @@ use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef}; use datatypes::vectors::VectorRef; use log_store::NoopLogStore; -use object_store::services::fs::Builder; -use object_store::ObjectStore; +use object_store::services::Fs as Builder; +use object_store::{ObjectStore, ObjectStoreBuilder}; use storage::compaction::noop::NoopCompactionScheduler; 
use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -99,7 +99,7 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { let dir = TempDir::new(prefix).unwrap(); let store_dir = dir.path().to_string_lossy(); let accessor = Builder::default().root(&store_dir).build().unwrap(); - (dir, ObjectStore::new(accessor)) + (dir, ObjectStore::new(accessor).finish()) } pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index f18e1e9f7f..9390492e03 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -8,10 +8,7 @@ license.workspace = true lru = "0.9" async-trait = "0.1" futures = { version = "0.3" } -opendal = { version = "0.25.1", features = [ - "layers-tracing", - "layers-metrics", -] } +opendal = { version = "0.27", features = ["layers-tracing", "layers-metrics"] } tokio.workspace = true [dev-dependencies] diff --git a/src/object-store/src/backend.rs b/src/object-store/src/backend.rs index c4689d79d8..fb84212dfa 100644 --- a/src/object-store/src/backend.rs +++ b/src/object-store/src/backend.rs @@ -15,4 +15,5 @@ pub mod azblob; pub mod fs; pub mod memory; +pub mod oss; pub mod s3; diff --git a/src/object-store/src/backend/azblob.rs b/src/object-store/src/backend/azblob.rs index 755c77d60a..e4545682a1 100644 --- a/src/object-store/src/backend/azblob.rs +++ b/src/object-store/src/backend/azblob.rs @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use opendal::services::azblob::Builder; +pub use opendal::services::Azblob as Builder; diff --git a/src/object-store/src/backend/fs.rs b/src/object-store/src/backend/fs.rs index bc1cebe5b2..20dadc0e46 100644 --- a/src/object-store/src/backend/fs.rs +++ b/src/object-store/src/backend/fs.rs @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use opendal::services::fs::Builder; +pub use opendal::services::Fs as Builder; diff --git a/src/object-store/src/backend/memory.rs b/src/object-store/src/backend/memory.rs index 22fd16186a..456938bc98 100644 --- a/src/object-store/src/backend/memory.rs +++ b/src/object-store/src/backend/memory.rs @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use opendal::services::memory::Builder; +pub use opendal::services::Memory as Builder; diff --git a/src/object-store/src/backend/oss.rs b/src/object-store/src/backend/oss.rs new file mode 100644 index 0000000000..c68791ad9e --- /dev/null +++ b/src/object-store/src/backend/oss.rs @@ -0,0 +1,15 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub use opendal::services::Oss as Builder; diff --git a/src/object-store/src/backend/s3.rs b/src/object-store/src/backend/s3.rs index 46faa2659d..4912b1ebc4 100644 --- a/src/object-store/src/backend/s3.rs +++ b/src/object-store/src/backend/s3.rs @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use opendal::services::s3::Builder; +pub use opendal::services::S3 as Builder; diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs index 5c60b5ad50..1ef6004c7e 100644 --- a/src/object-store/src/cache_policy.rs +++ b/src/object-store/src/cache_policy.rs @@ -12,112 +12,177 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io; use std::num::NonZeroUsize; use std::ops::DerefMut; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use async_trait::async_trait; -use futures::future::BoxFuture; +use futures::AsyncRead; use lru::LruCache; -use opendal::layers::CachePolicy; -use opendal::raw::output::Reader; -use opendal::raw::{Accessor, RpDelete, RpRead}; -use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result}; +use opendal::ops::*; +use opendal::raw::*; +use opendal::{ErrorKind, Result}; use tokio::sync::Mutex; -#[derive(Debug)] -pub struct LruCachePolicy { +pub struct LruCacheLayer<C: Accessor> { + cache: Arc<C>, lru_cache: Arc<Mutex<LruCache<String, ()>>>, } -impl LruCachePolicy { - pub fn new(capacity: usize) -> Self { +impl<C: Accessor> LruCacheLayer<C> { + pub fn new(cache: Arc<C>, capacity: usize) -> Self { Self { + cache, lru_cache: Arc::new(Mutex::new(LruCache::new( NonZeroUsize::new(capacity).unwrap(), ))), } } +} +impl<I: Accessor, C: Accessor> Layer<I> for LruCacheLayer<C> { + type LayeredAccessor = LruCacheAccessor<I, C>; + + fn layer(&self, inner: I) -> Self::LayeredAccessor { + LruCacheAccessor { + inner: Arc::new(inner), + cache: self.cache.clone(), + lru_cache: self.lru_cache.clone(), + } + } +} + +#[derive(Debug)] +pub struct LruCacheAccessor<I, C> { + inner: Arc<I>, + cache: Arc<C>, + lru_cache: Arc<Mutex<LruCache<String, ()>>>, +} + +impl<I, C> LruCacheAccessor<I, C> { fn cache_path(&self, path: &str, args: &OpRead) -> String { format!("{}.cache-{}", path, args.range().to_header()) } } #[async_trait] -impl CachePolicy for LruCachePolicy { - fn on_read( - &self, - inner: Arc<dyn Accessor>, - cache: Arc<dyn Accessor>, - path: &str, - args: OpRead, - ) -> BoxFuture<'static, Result<(RpRead, Reader)>> { +impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> { + type Inner = I; + type Reader = output::Reader; + type BlockingReader = I::BlockingReader; + type Pager = I::Pager; + type BlockingPager = I::BlockingPager; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let path = path.to_string(); let cache_path = self.cache_path(&path, &args); let lru_cache = self.lru_cache.clone(); - Box::pin(async move { - match cache.read(&cache_path, OpRead::default()).await { - Ok(v) => { - // update lru when cache hit - let mut lru_cache = lru_cache.lock().await; - lru_cache.get_or_insert(cache_path.clone(), || ()); - Ok(v) - } - Err(err) if err.kind() == ErrorKind::ObjectNotFound => { - let (rp, reader) = inner.read(&path, args.clone()).await?; - let size = rp.clone().into_metadata().content_length(); - let _ = cache - .write(&cache_path, OpWrite::new(size), Box::new(reader)) - .await?; - match cache.read(&cache_path, OpRead::default()).await { - Ok(v) => { - let r = { - // push new cache file name to lru - let mut lru_cache = lru_cache.lock().await; - lru_cache.push(cache_path.clone(), ()) -
}; - // delete the evicted cache file - if let Some((k, _v)) = r { - let _ = cache.delete(&k, OpDelete::new()).await; - } - Ok(v) - } - Err(_) => inner.read(&path, args).await, - } - } - Err(_) => inner.read(&path, args).await, + + match self.cache.read(&cache_path, OpRead::default()).await { + Ok((rp, r)) => { + // update lru when cache hit + let mut lru_cache = lru_cache.lock().await; + lru_cache.get_or_insert(cache_path.clone(), || ()); + Ok(to_output_reader((rp, r))) } - }) + Err(err) if err.kind() == ErrorKind::ObjectNotFound => { + let (rp, reader) = self.inner.read(&path, args.clone()).await?; + let size = rp.clone().into_metadata().content_length(); + let _ = self + .cache + .write( + &cache_path, + OpWrite::new(size), + Box::new(ReadWrapper(reader)), + ) + .await?; + match self.cache.read(&cache_path, OpRead::default()).await { + Ok((rp, reader)) => { + let r = { + // push new cache file name to lru + let mut lru_cache = lru_cache.lock().await; + lru_cache.push(cache_path.clone(), ()) + }; + // delete the evicted cache file + if let Some((k, _v)) = r { + let _ = self.cache.delete(&k, OpDelete::new()).await; + } + return Ok(to_output_reader((rp, reader))); + } + Err(_) => return self.inner.read(&path, args).await.map(to_output_reader), + } + } + Err(_) => return self.inner.read(&path, args).await.map(to_output_reader), + } } - fn on_delete( - &self, - inner: Arc<dyn Accessor>, - cache: Arc<dyn Accessor>, - path: &str, - args: OpDelete, - ) -> BoxFuture<'static, Result<RpDelete>> { + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.inner.blocking_read(path, args) + } + + async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> { let path = path.to_string(); let lru_cache = self.lru_cache.clone(); - Box::pin(async move { - let cache_files: Vec<String> = { - let mut guard = lru_cache.lock().await; - let lru = guard.deref_mut(); - let cache_files = lru - .iter() - .filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str())) - .map(|(k, _v)| k.clone()) - .collect::<Vec<_>>(); - for k in &cache_files { - lru.pop(k); - } - cache_files - }; - for file in cache_files { - let _ = cache.delete(&file, OpDelete::new()).await; + + let cache_files: Vec<String> = { + let mut guard = lru_cache.lock().await; + let lru = guard.deref_mut(); + let cache_files = lru + .iter() + .filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str())) + .map(|(k, _v)| k.clone()) + .collect::<Vec<_>>(); + for k in &cache_files { + lru.pop(k); } - cache_files + }; + for file in cache_files { + let _ = self.cache.delete(&file, OpDelete::new()).await; + } + return self.inner.delete(&path, args).await; + } + + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + self.inner.list(path, args).await + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.inner.scan(path, args).await + } + + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + self.inner.blocking_list(path, args) + } + + fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> { + self.inner.blocking_scan(path, args) + } } + +/// TODO: Workaround because `output::Read` doesn't implement `input::Read`. +/// +/// Should be removed once opendal fixes this.
+struct ReadWrapper<R>(R); + +impl<R: output::Read> AsyncRead for ReadWrapper<R> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll<io::Result<usize>> { + self.0.poll_read(cx, buf) + } +} + +#[inline] +fn to_output_reader<R: output::Read + 'static>(input: (RpRead, R)) -> (RpRead, output::Reader) { + (input.0, Box::new(input.1)) +} diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index cd2d3b1d79..32f315ee63 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -13,8 +13,8 @@ // limitations under the License. pub use opendal::{ - layers, services, Error, ErrorKind, Layer, Object, ObjectLister, ObjectMetadata, ObjectMode, - Operator as ObjectStore, Result, + layers, services, Builder as ObjectStoreBuilder, Error, ErrorKind, Object, ObjectLister, + ObjectMetadata, ObjectMode, Operator as ObjectStore, Result, }; pub mod backend; pub mod cache_policy; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 60a7791226..046b37cf92 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -13,15 +13,15 @@ // limitations under the License. use std::env; +use std::sync::Arc; use anyhow::Result; use common_telemetry::logging; use object_store::backend::{fs, s3}; -use object_store::cache_policy::LruCachePolicy; +use object_store::cache_policy::LruCacheLayer; use object_store::test_util::TempFolder; -use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore}; -use opendal::layers::CacheLayer; -use opendal::services::oss; +use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore, ObjectStoreBuilder}; +use opendal::services::Oss; use opendal::Operator; use tempdir::TempDir; @@ -100,7 +100,8 @@ async fn test_fs_backend() -> Result<()> { .root(&data_dir.path().to_string_lossy()) .atomic_write_dir(&tmp_dir.path().to_string_lossy()) .build()?, - ); + ) + .finish(); test_object_crud(&store).await?; test_object_list(&store).await?; @@ -124,7 +125,7 @@ async fn test_s3_backend() -> Result<()> { .bucket(&bucket) .build()?; - let store = ObjectStore::new(accessor); + let store = ObjectStore::new(accessor).finish(); let mut guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; @@ -145,14 +146,14 @@ async fn test_oss_backend() -> Result<()> { let root = uuid::Uuid::new_v4().to_string(); - let accessor = oss::Builder::default() + let accessor = Oss::default() .root(&root) .access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?) .access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?)
.bucket(&bucket) .build()?; - let store = ObjectStore::new(accessor); + let store = ObjectStore::new(accessor).finish(); let mut guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; @@ -204,16 +205,15 @@ async fn test_object_store_cache_policy() -> Result<()> { // create file cache layer let cache_dir = TempDir::new("test_fs_cache")?; - let cache_op = ObjectStore::new( - fs::Builder::default() - .root(&cache_dir.path().to_string_lossy()) - .atomic_write_dir(&cache_dir.path().to_string_lossy()) - .build()?, - ); + let cache_acc = fs::Builder::default() + .root(&cache_dir.path().to_string_lossy()) + .atomic_write_dir(&cache_dir.path().to_string_lossy()) + .build()?; + let cache_store = ObjectStore::new(cache_acc.clone()).finish(); // create operator for cache dir to verify cache file - let cache_store = ObjectStore::from(cache_op.inner()); - let policy = LruCachePolicy::new(3); - let store = store.layer(CacheLayer::new(cache_op).with_policy(policy)); + let store = store + .layer(LruCacheLayer::new(Arc::new(cache_acc), 3)) + .finish(); // create several object handler. let o1 = store.object("test_file1"); diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 61a891ff1a..43a392469f 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -92,7 +92,7 @@ mod tests { TimestampMillisecondVector, TimestampMillisecondVectorBuilder, UInt64VectorBuilder, }; use object_store::backend::fs::Builder; - use object_store::ObjectStore; + use object_store::{ObjectStore, ObjectStoreBuilder}; use store_api::storage::{ChunkReader, OpType, SequenceNumber}; use tempdir::TempDir; @@ -273,7 +273,7 @@ mod tests { let dir = TempDir::new("write_parquet").unwrap(); let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend); + let object_store = ObjectStore::new(backend).finish(); let seq = AtomicU64::new(0); let schema = schema_for_test(); @@ -350,7 +350,7 @@ mod tests { let dir = TempDir::new("write_parquet").unwrap(); let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend); + let object_store = ObjectStore::new(backend).finish(); let schema = schema_for_test(); let seq = AtomicU64::new(0); diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index b11beed5b2..068f1078cf 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -380,6 +380,7 @@ mod tests { use datatypes::type_id::LogicalTypeId; use log_store::test_util::log_store_util; use object_store::backend::fs::Builder; + use object_store::ObjectStoreBuilder; use store_api::storage::Region; use tempdir::TempDir; @@ -395,7 +396,7 @@ mod tests { let store_dir = dir.path().to_string_lossy(); let accessor = Builder::default().root(&store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); let config = EngineConfig::default(); diff --git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs index f395e37914..d5bec3c354 100644 --- a/src/storage/src/file_purger.rs +++ b/src/storage/src/file_purger.rs @@ -107,7 +107,7 @@ pub mod noop { #[cfg(test)] mod tests { use object_store::backend::fs::Builder; - use object_store::ObjectStore; + use object_store::{ObjectStore, ObjectStoreBuilder}; use store_api::storage::OpType; use tempdir::TempDir; @@ -172,7 +172,8 @@ mod tests { 
.root(dir.path().to_str().unwrap()) .build() .unwrap(), - ); + ) + .finish(); let sst_file_name = "test-file-purge-handler.parquet"; let noop_file_purger = Arc::new(LocalScheduler::new( @@ -209,7 +210,8 @@ mod tests { .root(dir.path().to_str().unwrap()) .build() .unwrap(), - ); + ) + .finish(); let sst_file_name = "test-file-purger.parquet"; let scheduler = Arc::new(LocalScheduler::new( SchedulerConfig::default(), diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 5831ac6f2f..97def6f40a 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -23,7 +23,7 @@ mod tests { use std::sync::Arc; use object_store::backend::fs; - use object_store::ObjectStore; + use object_store::{ObjectStore, ObjectStoreBuilder}; use store_api::manifest::action::ProtocolAction; use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; use tempdir::TempDir; @@ -41,7 +41,8 @@ mod tests { .root(&tmp_dir.path().to_string_lossy()) .build() .unwrap(), - ); + ) + .finish(); let manifest = RegionManifest::new("/manifest/", object_store); diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index 04634bcd57..8cf498ff3f 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -278,7 +278,7 @@ impl ManifestLogStorage for ManifestObjectStore { #[cfg(test)] mod tests { use object_store::backend::fs; - use object_store::ObjectStore; + use object_store::{ObjectStore, ObjectStoreBuilder}; use tempdir::TempDir; use super::*; @@ -292,7 +292,8 @@ mod tests { .root(&tmp_dir.path().to_string_lossy()) .build() .unwrap(), - ); + ) + .finish(); let log_store = ManifestObjectStore::new("/", object_store); diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index a48fac33e8..c6d8083d2b 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -30,7 +30,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef}; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::NoopLogStore; use object_store::backend::fs; -use object_store::ObjectStore; +use object_store::{ObjectStore, ObjectStoreBuilder}; use store_api::storage::{ consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest, }; @@ -286,7 +286,8 @@ async fn test_recover_region_manifets() { .root(&tmp_dir.path().to_string_lossy()) .build() .unwrap(), - ); + ) + .finish(); let manifest = RegionManifest::new("/manifest/", object_store.clone()); let region_meta = Arc::new(build_region_meta()); diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 0d6effa653..f79c5558fb 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -524,6 +524,7 @@ mod tests { use datatypes::types::{TimestampMillisecondType, TimestampType}; use datatypes::vectors::TimestampMillisecondVector; use object_store::backend::fs::Builder; + use object_store::ObjectStoreBuilder; use store_api::storage::OpType; use tempdir::TempDir; @@ -563,7 +564,7 @@ mod tests { let dir = TempDir::new("write_parquet").unwrap(); let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend); + let object_store = ObjectStore::new(backend).finish(); let sst_file_name = "test-flush.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), 
object_store.clone()); @@ -661,7 +662,7 @@ mod tests { let dir = TempDir::new("write_parquet").unwrap(); let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend); + let object_store = ObjectStore::new(backend).finish(); let sst_file_name = "test-read-large.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); @@ -683,7 +684,8 @@ mod tests { .root(dir.path().to_str().unwrap()) .build() .unwrap(), - ); + ) + .finish(); let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap()); let reader = ParquetReader::new( @@ -733,7 +735,7 @@ mod tests { let dir = TempDir::new("write_parquet").unwrap(); let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend); + let object_store = ObjectStore::new(backend).finish(); let sst_file_name = "test-read.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); @@ -755,7 +757,8 @@ mod tests { .root(dir.path().to_str().unwrap()) .build() .unwrap(), - ); + ) + .finish(); let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap()); let reader = ParquetReader::new( @@ -845,7 +848,7 @@ mod tests { let dir = TempDir::new("read-parquet-by-range").unwrap(); let path = dir.path().to_str().unwrap(); let backend = Builder::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend); + let object_store = ObjectStore::new(backend).finish(); let sst_file_name = "test-read.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 31459cecee..dcf7151518 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -16,8 +16,8 @@ use std::sync::Arc; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; -use object_store::backend::fs::Builder; -use object_store::ObjectStore; +use object_store::services::Fs as Builder; +use object_store::{ObjectStore, ObjectStoreBuilder}; use crate::background::JobPoolImpl; use crate::compaction::noop::NoopCompactionScheduler; @@ -44,7 +44,7 @@ pub async fn new_store_config( let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor); + let object_store = ObjectStore::new(accessor).finish(); let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); let manifest = RegionManifest::new(&manifest_dir, object_store); let job_pool = Arc::new(JobPoolImpl {}); diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 517050e1eb..e9e1589028 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -31,10 +31,9 @@ use datanode::sql::SqlHandler; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use frontend::instance::Instance as FeInstance; -use object_store::backend::s3; -use object_store::services::oss; +use object_store::backend::{oss, s3}; use 
object_store::test_util::TempFolder; -use object_store::ObjectStore; +use object_store::{ObjectStore, ObjectStoreBuilder}; use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; @@ -116,7 +115,7 @@ fn get_test_store_config( let config = ObjectStoreConfig::Oss(oss_config); - let store = ObjectStore::new(accessor); + let store = ObjectStore::new(accessor).finish(); ( config, @@ -145,7 +144,7 @@ fn get_test_store_config( let config = ObjectStoreConfig::S3(s3_config); - let store = ObjectStore::new(accessor); + let store = ObjectStore::new(accessor).finish(); (config, Some(TempDirGuard::S3(TempFolder::new(&store, "/")))) }
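
Migration recipe for this bump (a summary for reviewers, not part of the applied diff): every call-site change above follows the same OpenDAL v0.25 -> v0.27 pattern. Service builders moved from module paths like `opendal::services::fs::Builder` to re-exported names (`services::{Fs, S3, Oss, Azblob, Memory}`), `Operator::new(accessor)` now yields an operator builder that needs a terminal `.finish()`, and `RetryLayer::new()` no longer takes an explicit `ExponentialBackoff`. A minimal sketch under those assumptions — the `build_store` helper name and the `/tmp/data` root are illustrative, everything else mirrors lines from this patch:

    use object_store::layers::RetryLayer;
    use object_store::services::Fs;
    // `ObjectStoreBuilder` is the re-exported `opendal::Builder` trait; it must be
    // in scope for `.build()` to resolve, which is why the patch adds it everywhere.
    use object_store::{ObjectStore, ObjectStoreBuilder};

    fn build_store() -> object_store::Result<ObjectStore> {
        // v0.25: `ObjectStore::new(accessor)` already produced a ready Operator.
        let accessor = Fs::default().root("/tmp/data").build()?;
        // v0.27: layers stack on the builder, and `.finish()` type-erases the
        // accessor into the final Operator (aliased here as ObjectStore).
        Ok(ObjectStore::new(accessor)
            .layer(RetryLayer::new().with_jitter())
            .finish())
    }

The new `LruCacheLayer` follows the same shape: build a second accessor for the cache directory, then `store.layer(LruCacheLayer::new(Arc::new(cache_accessor), capacity)).finish()`, as exercised by `test_object_store_cache_policy` above.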