diff --git a/Cargo.lock b/Cargo.lock index 1a612a1698..c3801b32ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1608,6 +1608,7 @@ dependencies = [ name = "common-procedure" version = "0.1.1" dependencies = [ + "async-stream", "async-trait", "backon 0.4.0", "common-error", @@ -4581,11 +4582,13 @@ version = "0.1.1" dependencies = [ "anyhow", "async-trait", + "bytes", "common-telemetry", "common-test-util", "futures", "lru 0.9.0", "opendal", + "pin-project", "tokio", "uuid", ] @@ -4624,9 +4627,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.27.2" +version = "0.30.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef6f7b936f2f8483e19643357cb50d9ec9a49c506971ef69ca676913cf5afd91" +checksum = "bc08c1c75e26f33f13f22c46b0c222132df5e12519fd67046ecf061b58e8c26f" dependencies = [ "anyhow", "async-compat", diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 1b86529635..c49c26f616 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -400,7 +400,7 @@ mod tests { use log_store::NoopLogStore; use mito::config::EngineConfig; use mito::engine::MitoEngine; - use object_store::{ObjectStore, ObjectStoreBuilder}; + use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -482,11 +482,9 @@ mod tests { pub async fn prepare_table_engine() -> (TempDir, TableEngineRef) { let dir = create_temp_dir("system-table-test"); let store_dir = dir.path().to_string_lossy(); - let accessor = object_store::services::Fs::default() - .root(&store_dir) - .build() - .unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = object_store::services::Fs::default(); + builder.root(&store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let table_engine = Arc::new(MitoEngine::new( EngineConfig::default(), diff --git a/src/common/datasource/src/lister.rs b/src/common/datasource/src/lister.rs index 7102b26708..80b251fd85 100644 --- a/src/common/datasource/src/lister.rs +++ b/src/common/datasource/src/lister.rs @@ -13,7 +13,7 @@ // limitations under the License. 
use futures::{future, TryStreamExt}; -use object_store::{Object, ObjectStore}; +use object_store::{Entry, ObjectStore}; use regex::Regex; use snafu::ResultExt; @@ -46,13 +46,12 @@ impl Lister { } } - pub async fn list(&self) -> Result<Vec<Object>> { + pub async fn list(&self) -> Result<Vec<Entry>> { match &self.source { Source::Dir => { let streamer = self .object_store - .object(&self.path) - .list() + .list(&self.path) .await .context(error::ListObjectsSnafu { path: &self.path })?; @@ -70,11 +69,14 @@ impl Lister { .context(error::ListObjectsSnafu { path: &self.path }) } Source::Filename(filename) => { - let obj = self - .object_store - .object(&format!("{}{}", self.path, filename)); - - Ok(vec![obj]) + // make sure this file exists + let file_full_path = format!("{}{}", self.path, filename); + let _ = self.object_store.stat(&file_full_path).await.context( + error::ListObjectsSnafu { + path: &file_full_path, + }, + )?; + Ok(vec![Entry::new(&file_full_path)]) } } } diff --git a/src/common/datasource/src/object_store/fs.rs b/src/common/datasource/src/object_store/fs.rs index c2c50ce793..78a481b294 100644 --- a/src/common/datasource/src/object_store/fs.rs +++ b/src/common/datasource/src/object_store/fs.rs @@ -13,16 +13,16 @@ // limitations under the License. use object_store::services::Fs; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use snafu::ResultExt; -use crate::error::{self, Result}; +use crate::error::{BuildBackendSnafu, Result}; pub fn build_fs_backend(root: &str) -> Result<ObjectStore> { - let accessor = Fs::default() - .root(root) - .build() - .context(error::BuildBackendSnafu)?; - - Ok(ObjectStore::new(accessor).finish()) + let mut builder = Fs::default(); + builder.root(root); + let object_store = ObjectStore::new(builder) + .context(BuildBackendSnafu)? + .finish(); + Ok(object_store) } diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs index d501b2a1ac..482da1bcef 100644 --- a/src/common/datasource/src/object_store/s3.rs +++ b/src/common/datasource/src/object_store/s3.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use object_store::services::S3; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use snafu::ResultExt; use crate::error::{self, Result}; @@ -73,7 +73,7 @@ pub fn build_s3_backend( } } - let accessor = builder.build().context(error::BuildBackendSnafu)?; - - Ok(ObjectStore::new(accessor).finish()) + Ok(ObjectStore::new(builder) + .context(error::BuildBackendSnafu)?
+ .finish()) } diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index f1e5f66863..109bf4df77 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] async-trait.workspace = true +async-stream.workspace = true common-error = { path = "../error" } common-runtime = { path = "../runtime" } common-telemetry = { path = "../telemetry" } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 05150df4b8..df14e70b0f 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -423,7 +423,6 @@ impl ProcedureManager for LocalManager { mod test_util { use common_test_util::temp_dir::TempDir; use object_store::services::Fs as Builder; - use object_store::ObjectStoreBuilder; use super::*; @@ -433,8 +432,9 @@ mod test_util { pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore { let store_dir = dir.path().to_str().unwrap(); - let accessor = Builder::default().root(store_dir).build().unwrap(); - ObjectStore::new(accessor).finish() + let mut builder = Builder::default(); + builder.root(store_dir); + ObjectStore::new(builder).unwrap().finish() } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index d665c5b5e3..fc09accc42 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -473,8 +473,7 @@ mod tests { async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) { let dir = format!("{procedure_id}/"); - let object = object_store.object(&dir); - let lister = object.list().await.unwrap(); + let lister = object_store.list(&dir).await.unwrap(); let mut files_in_dir: Vec<_> = lister .map_ok(|de| de.name().to_string()) .try_collect() diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index fff460ed7a..07d0fb8eb9 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -248,15 +248,15 @@ mod tests { use async_trait::async_trait; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use object_store::services::Fs as Builder; - use object_store::ObjectStoreBuilder; use super::*; use crate::{Context, LockKey, Procedure, Status}; fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore { let store_dir = dir.path().to_str().unwrap(); - let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = Builder::default(); + builder.root(store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); ProcedureStore::from(object_store) } diff --git a/src/common/procedure/src/store/state_store.rs b/src/common/procedure/src/store/state_store.rs index baefb3db64..450940a1ad 100644 --- a/src/common/procedure/src/store/state_store.rs +++ b/src/common/procedure/src/store/state_store.rs @@ -15,12 +15,13 @@ use std::pin::Pin; use std::sync::Arc; +use async_stream::try_stream; use async_trait::async_trait; -use futures::{Stream, TryStreamExt}; -use object_store::{ObjectMode, ObjectStore}; +use futures::{Stream, StreamExt}; +use object_store::{EntryMode, Metakey, ObjectStore}; use snafu::ResultExt; -use crate::error::{DeleteStateSnafu, Error, PutStateSnafu, Result}; +use crate::error::{DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, Result}; /// Key value from state store. 
type KeyValue = (String, Vec<u8>); @@ -64,49 +65,49 @@ impl ObjectStateStore { #[async_trait] impl StateStore for ObjectStateStore { async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> { - let object = self.store.object(key); - object.write(value).await.context(PutStateSnafu { key }) + self.store + .write(key, value) + .await + .context(PutStateSnafu { key }) } async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> { let path_string = path.to_string(); - let lister = self - .store - .object(path) - .scan() - .await - .map_err(|e| Error::ListState { - path: path_string.clone(), - source: e, - })?; + let mut lister = self.store.scan(path).await.map_err(|e| Error::ListState { + path: path_string.clone(), + source: e, + })?; - let stream = lister - .try_filter_map(|entry| async move { + let store = self.store.clone(); + + let stream = try_stream!({ + while let Some(res) = lister.next().await { + let entry = res.context(ListStateSnafu { path: &path_string })?; let key = entry.path(); - let key_value = match entry.mode().await? { - ObjectMode::FILE => { - let value = entry.read().await?; - - Some((key.to_string(), value)) - } - ObjectMode::DIR | ObjectMode::Unknown => None, - }; - - Ok(key_value) - }) - .map_err(move |e| Error::ListState { - path: path_string.clone(), - source: e, - }); + let metadata = store + .metadata(&entry, Metakey::Mode) + .await + .context(ListStateSnafu { path: key })?; + if let EntryMode::FILE = metadata.mode() { + let value = store + .read(key) + .await + .context(ListStateSnafu { path: key })?; + yield (key.to_string(), value); + } + } + }); Ok(Box::pin(stream)) } async fn delete(&self, keys: &[String]) -> Result<()> { for key in keys { - let object = self.store.object(key); - object.delete().await.context(DeleteStateSnafu { key })?; + self.store + .delete(key) + .await + .context(DeleteStateSnafu { key })?; } Ok(()) @@ -116,8 +117,8 @@ impl StateStore for ObjectStateStore { #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; + use futures_util::TryStreamExt; use object_store::services::Fs as Builder; - use object_store::ObjectStoreBuilder; use super::*; @@ -125,8 +126,10 @@ async fn test_object_state_store() { let dir = create_temp_dir("state_store"); let store_dir = dir.path().to_str().unwrap(); - let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = Builder::default(); + builder.root(store_dir); + + let object_store = ObjectStore::new(builder).unwrap().finish(); let state_store = ObjectStateStore::new(object_store); let data: Vec<_> = state_store diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 99d1b7c998..5f448c4729 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -330,18 +330,20 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re ); let mut builder = OSSBuilder::default(); - let builder = builder + builder .root(&root) .bucket(&oss_config.bucket) .endpoint(&oss_config.endpoint) .access_key_id(&oss_config.access_key_id) .access_key_secret(&oss_config.access_key_secret); - let accessor = builder.build().with_context(|_| error::InitBackendSnafu { - config: store_config.clone(), - })?; + let object_store = ObjectStore::new(builder) + .with_context(|_| error::InitBackendSnafu { + config: store_config.clone(), + })?
+ .finish(); - create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config) + create_object_store_with_cache(object_store, store_config) } fn create_object_store_with_cache( @@ -394,24 +396,27 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res ); let mut builder = S3Builder::default(); - let mut builder = builder + builder .root(&root) .bucket(&s3_config.bucket) .access_key_id(&s3_config.access_key_id) .secret_access_key(&s3_config.secret_access_key); if s3_config.endpoint.is_some() { - builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap()); + builder.endpoint(s3_config.endpoint.as_ref().unwrap()); } if s3_config.region.is_some() { - builder = builder.region(s3_config.region.as_ref().unwrap()); + builder.region(s3_config.region.as_ref().unwrap()); } - let accessor = builder.build().with_context(|_| error::InitBackendSnafu { - config: store_config.clone(), - })?; - - create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config) + create_object_store_with_cache( + ObjectStore::new(builder) + .with_context(|_| error::InitBackendSnafu { + config: store_config.clone(), + })? + .finish(), + store_config, + ) } pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { @@ -426,15 +431,14 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res let atomic_write_dir = format!("{data_dir}/.tmp/"); - let accessor = FsBuilder::default() - .root(&data_dir) - .atomic_write_dir(&atomic_write_dir) - .build() + let mut builder = FsBuilder::default(); + builder.root(&data_dir).atomic_write_dir(&atomic_write_dir); + + Ok(ObjectStore::new(builder) .context(error::InitBackendSnafu { config: store_config.clone(), - })?; - - Ok(ObjectStore::new(accessor).finish()) + })? + .finish()) } /// Create metasrv client instance and spawn heartbeat loop. 
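Every backend constructed in this patch follows the same opendal 0.30 migration: builder setters now take `&mut self` and return `&mut Self` instead of consuming the builder, and `ObjectStore::new` accepts the builder itself rather than a pre-built accessor. A minimal sketch of the pattern, assuming only the opendal 0.30 API pinned in Cargo.lock above (the helper name and `root` argument are illustrative, not part of this change):

use object_store::services::Fs;
use object_store::{ObjectStore, Result};

// Sketch of the 0.30 construction pattern used throughout this diff.
fn sketch_fs_store(root: &str) -> Result<ObjectStore> {
    // 0.27 style was a consuming chain: Fs::default().root(root).build()?.
    // 0.30 setters mutate the builder in place, so conditional settings
    // (e.g. the optional S3 endpoint/region above) no longer need rebinding.
    let mut builder = Fs::default();
    builder.root(root);
    // `ObjectStore::new` (opendal's `Operator::new`) consumes the builder and
    // returns an `OperatorBuilder`; `finish()` yields the final operator.
    Ok(ObjectStore::new(builder)?.finish())
}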
diff --git a/src/datanode/src/sql/copy_table_from.rs b/src/datanode/src/sql/copy_table_from.rs index 1ca4a5f27a..b661036857 100644 --- a/src/datanode/src/sql/copy_table_from.rs +++ b/src/datanode/src/sql/copy_table_from.rs @@ -48,7 +48,6 @@ impl SqlHandler { build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?; let (dir, filename) = find_dir_and_filename(&path); - let regex = req .pattern .as_ref() @@ -62,16 +61,18 @@ impl SqlHandler { Source::Dir }; - let lister = Lister::new(object_store, source, dir, regex); + let lister = Lister::new(object_store.clone(), source, dir, regex); - let objects = lister.list().await.context(error::ListObjectsSnafu)?; + let entries = lister.list().await.context(error::ListObjectsSnafu)?; let mut buf: Vec = Vec::new(); - for obj in objects.iter() { - let reader = obj.reader().await.context(error::ReadObjectSnafu { - path: &obj.path().to_string(), - })?; + for entry in entries.iter() { + let path = entry.path(); + let reader = object_store + .reader(path) + .await + .context(error::ReadObjectSnafu { path })?; let buf_reader = BufReader::new(reader.compat()); diff --git a/src/datanode/src/sql/copy_table_to.rs b/src/datanode/src/sql/copy_table_to.rs index 17a4c4e08d..7c513d5967 100644 --- a/src/datanode/src/sql/copy_table_to.rs +++ b/src/datanode/src/sql/copy_table_to.rs @@ -136,10 +136,10 @@ impl ParquetWriter { // "file_name_1_1000000" (row num: 1 ~ 1000000), // "file_name_1000001_xxx" (row num: 1000001 ~ xxx) let file_name = format!("{}_{}_{}", self.file_name, start_row_num, total_rows); - let object = self.object_store.object(&file_name); - object.write(buf).await.context(error::WriteObjectSnafu { - path: object.path(), - })?; + self.object_store + .write(&file_name, buf) + .await + .context(error::WriteObjectSnafu { path: file_name })?; if end_loop { return Ok(total_rows); diff --git a/src/mito/src/table/test_util.rs b/src/mito/src/table/test_util.rs index dd4ac97459..dcd5445711 100644 --- a/src/mito/src/table/test_util.rs +++ b/src/mito/src/table/test_util.rs @@ -23,7 +23,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRe use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector, VectorRef}; use log_store::NoopLogStore; use object_store::services::Fs as Builder; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -98,8 +98,9 @@ pub fn build_test_table_info() -> TableInfo { pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) { let dir = create_temp_dir(prefix); let store_dir = dir.path().to_string_lossy(); - let accessor = Builder::default().root(&store_dir).build().unwrap(); - (dir, ObjectStore::new(accessor).finish()) + let mut builder = Builder::default(); + builder.root(&store_dir); + (dir, ObjectStore::new(builder).unwrap().finish()) } pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index e664c5842a..d38a93e85a 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -7,8 +7,10 @@ license.workspace = true [dependencies] lru = "0.9" async-trait = "0.1" +bytes = "1.4" futures = { version = "0.3" } -opendal = { version = "0.27", features = ["layers-tracing", "layers-metrics"] } +opendal = { version = "0.30", features = ["layers-tracing", "layers-metrics"] } 
+pin-project = "1.0" tokio.workspace = true [dev-dependencies] diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs index 1ef6004c7e..8965ebd53c 100644 --- a/src/object-store/src/cache_policy.rs +++ b/src/object-store/src/cache_policy.rs @@ -12,18 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io; use std::num::NonZeroUsize; use std::ops::DerefMut; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use async_trait::async_trait; -use futures::AsyncRead; +use bytes::Bytes; use lru::LruCache; -use opendal::ops::*; -use opendal::raw::*; +use opendal::ops::{OpDelete, OpList, OpRead, OpScan, OpWrite}; +use opendal::raw::oio::{Read, Reader, Write}; +use opendal::raw::{Accessor, Layer, LayeredAccessor, RpDelete, RpList, RpRead, RpScan, RpWrite}; use opendal::{ErrorKind, Result}; use tokio::sync::Mutex; @@ -68,11 +66,15 @@ impl LruCacheAccessor { } } +use opendal::raw::oio::ReadExt; + #[async_trait] impl LayeredAccessor for LruCacheAccessor { type Inner = I; - type Reader = output::Reader; + type Reader = Box; type BlockingReader = I::BlockingReader; + type Writer = I::Writer; + type BlockingWriter = I::BlockingWriter; type Pager = I::Pager; type BlockingPager = I::BlockingPager; @@ -92,17 +94,18 @@ impl LayeredAccessor for LruCacheAccessor { lru_cache.get_or_insert(cache_path.clone(), || ()); Ok(to_output_reader((rp, r))) } - Err(err) if err.kind() == ErrorKind::ObjectNotFound => { - let (rp, reader) = self.inner.read(&path, args.clone()).await?; + Err(err) if err.kind() == ErrorKind::NotFound => { + let (rp, mut reader) = self.inner.read(&path, args.clone()).await?; let size = rp.clone().into_metadata().content_length(); - let _ = self - .cache - .write( - &cache_path, - OpWrite::new(size), - Box::new(ReadWrapper(reader)), - ) - .await?; + let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?; + + // TODO(hl): We can use [Writer::append](https://docs.rs/opendal/0.30.4/opendal/struct.Writer.html#method.append) + // here to avoid loading whole file into memory once all our backend supports `Writer`. + let mut buf = vec![0; size as usize]; + reader.read(&mut buf).await?; + writer.write(Bytes::from(buf)).await?; + writer.close().await?; + match self.cache.read(&cache_path, OpRead::default()).await { Ok((rp, reader)) => { let r = { @@ -123,8 +126,8 @@ impl LayeredAccessor for LruCacheAccessor { } } - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.inner.blocking_read(path, args) + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + self.inner.write(path, args).await } async fn delete(&self, path: &str, args: OpDelete) -> Result { @@ -158,6 +161,14 @@ impl LayeredAccessor for LruCacheAccessor { self.inner.scan(path, args).await } + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.inner.blocking_read(path, args) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + self.inner.blocking_write(path, args) + } + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.inner.blocking_list(path, args) } @@ -167,22 +178,7 @@ impl LayeredAccessor for LruCacheAccessor { } } -/// TODO: Workaround for output::Read doesn't implement input::Read -/// -/// Should be remove after opendal fixed it. 
-struct ReadWrapper(R); - -impl AsyncRead for ReadWrapper { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - self.0.poll_read(cx, buf) - } -} - #[inline] -fn to_output_reader(input: (RpRead, R)) -> (RpRead, output::Reader) { +fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { (input.0, Box::new(input.1)) } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 78d475be3a..9fd9c578bb 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub use opendal::raw::oio::Pager; pub use opendal::{ - layers, services, Builder as ObjectStoreBuilder, Error, ErrorKind, Object, ObjectLister, - ObjectMetadata, ObjectMode, Operator as ObjectStore, Result, + layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey, + Operator as ObjectStore, Result, }; + pub mod cache_policy; pub mod test_util; pub mod util; diff --git a/src/object-store/src/test_util.rs b/src/object-store/src/test_util.rs index d1a22df206..2f38b4ccbc 100644 --- a/src/object-store/src/test_util.rs +++ b/src/object-store/src/test_util.rs @@ -29,7 +29,6 @@ impl TempFolder { } pub async fn remove_all(&mut self) -> Result<()> { - let batch = self.store.batch(); - batch.remove_all(&self.path).await + self.store.remove_all(&self.path).await } } diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index de9f25f8c8..38aa02256e 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -13,10 +13,9 @@ // limitations under the License. use futures::TryStreamExt; +use opendal::{Entry, Lister}; -use crate::{Object, ObjectLister}; - -pub async fn collect(stream: ObjectLister) -> Result, opendal::Error> { +pub async fn collect(stream: Lister) -> Result, opendal::Error> { stream.try_collect::>().await } diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 4154506610..820973996f 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -21,73 +21,65 @@ use common_test_util::temp_dir::create_temp_dir; use object_store::cache_policy::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; -use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore, ObjectStoreBuilder}; +use object_store::{util, ObjectStore, ObjectStoreBuilder}; use opendal::services::Oss; -use opendal::Operator; +use opendal::{EntryMode, Operator, OperatorBuilder}; async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Create object handler. 
- let object = store.object("test_file"); - // Write data info object; - assert!(object.write("Hello, World!").await.is_ok()); + let file_name = "test_file"; + store.write(file_name, "Hello, World!").await?; // Read data from object; - let bs = object.read().await?; + let bs = store.read(file_name).await?; assert_eq!("Hello, World!", String::from_utf8(bs)?); // Read range from object; - let bs = object.range_read(1..=11).await?; + let bs = store.range_read(file_name, 1..=11).await?; assert_eq!("ello, World", String::from_utf8(bs)?); // Get object's Metadata - let meta = object.metadata().await?; - assert_eq!("test_file", object.path()); - assert_eq!(ObjectMode::FILE, meta.mode()); + let meta = store.stat(file_name).await?; + assert_eq!(EntryMode::FILE, meta.mode()); assert_eq!(13, meta.content_length()); // Delete object. - assert!(object.delete().await.is_ok()); - assert!(object.read().await.is_err()); - + store.delete(file_name).await.unwrap(); + store.read(file_name).await.unwrap_err(); Ok(()) } async fn test_object_list(store: &ObjectStore) -> Result<()> { // Create some object handlers. - let o1 = store.object("test_file1"); - let o2 = store.object("test_file2"); - let o3 = store.object("test_file3"); - // Write something - assert!(o1.write("Hello, object1!").await.is_ok()); - assert!(o2.write("Hello, object2!").await.is_ok()); - assert!(o3.write("Hello, object3!").await.is_ok()); + let p1 = "test_file1"; + let p2 = "test_file2"; + let p3 = "test_file3"; + store.write(p1, "Hello, object1!").await?; + store.write(p2, "Hello, object2!").await?; + store.write(p3, "Hello, object3!").await?; // List objects - let o: Object = store.object("/"); - let obs: ObjectLister = o.list().await?; - let objects = util::collect(obs).await?; - assert_eq!(3, objects.len()); + let lister = store.list("/").await?; + let entries = util::collect(lister).await?; + assert_eq!(3, entries.len()); - // Delete o1, o3 - assert!(o1.delete().await.is_ok()); - assert!(o3.delete().await.is_ok()); - - // List obejcts again - let objects = util::collect(o.list().await?).await?; - assert_eq!(1, objects.len()); + store.delete(p1).await?; + store.delete(p3).await?; + // List objects again // Only o2 is exists - let o2 = &objects[0].clone(); - let bs = o2.read().await?; - assert_eq!("Hello, object2!", String::from_utf8(bs)?); - // Delete o2 - assert!(o2.delete().await.is_ok()); + let entries = util::collect(store.list("/").await?).await?; + assert_eq!(1, entries.len()); + assert_eq!(p2, entries.get(0).unwrap().path()); - let objects = util::collect(o.list().await?).await?; - assert!(objects.is_empty()); + let content = store.read(p2).await?; + assert_eq!("Hello, object2!", String::from_utf8(content)?); + store.delete(p2).await?; + let entries = util::collect(store.list("/").await?).await?; + assert!(entries.is_empty()); Ok(()) } @@ -95,13 +87,12 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { async fn test_fs_backend() -> Result<()> { let data_dir = create_temp_dir("test_fs_backend"); let tmp_dir = create_temp_dir("test_fs_backend"); - let store = ObjectStore::new( - Fs::default() - .root(&data_dir.path().to_string_lossy()) - .atomic_write_dir(&tmp_dir.path().to_string_lossy()) - .build()?, - ) - .finish(); + let mut builder = Fs::default(); + builder + .root(&data_dir.path().to_string_lossy()) + .atomic_write_dir(&tmp_dir.path().to_string_lossy()); + + let store = ObjectStore::new(builder).unwrap().finish(); test_object_crud(&store).await?; test_object_list(&store).await?; @@ -118,14 +109,14 @@ async fn 
test_s3_backend() -> Result<()> { let root = uuid::Uuid::new_v4().to_string(); - let accessor = S3::default() + let mut builder = S3::default(); + builder .root(&root) .access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?) .secret_access_key(&env::var("GT_S3_ACCESS_KEY")?) - .bucket(&bucket) - .build()?; + .bucket(&bucket); - let store = ObjectStore::new(accessor).finish(); + let store = ObjectStore::new(builder).unwrap().finish(); let mut guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; @@ -146,14 +137,14 @@ async fn test_oss_backend() -> Result<()> { let root = uuid::Uuid::new_v4().to_string(); - let accessor = Oss::default() + let mut builder = Oss::default(); + builder .root(&root) .access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?) .access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?) - .bucket(&bucket) - .build()?; + .bucket(&bucket); - let store = ObjectStore::new(accessor).finish(); + let store = ObjectStore::new(builder).unwrap().finish(); let mut guard = TempFolder::new(&store, "/"); test_object_crud(&store).await?; @@ -170,8 +161,7 @@ async fn assert_cache_files( file_names: &[&str], file_contents: &[&str], ) -> Result<()> { - let o = store.object("/"); - let obs = o.list().await?; + let obs = store.list("/").await?; let objects = util::collect(obs).await?; // compare the cache file with the expected cache file; ignore orders @@ -180,7 +170,7 @@ async fn assert_cache_files( assert!(position.is_some(), "file not found: {}", o.name()); let position = position.unwrap(); - let bs = o.read().await?; + let bs = store.read(o.path()).await.unwrap(); assert_eq!( file_contents[position], String::from_utf8(bs.clone())?, @@ -194,40 +184,42 @@ async fn assert_cache_files( #[tokio::test] async fn test_object_store_cache_policy() -> Result<()> { + common_telemetry::init_default_ut_logging(); // create file storage let root_dir = create_temp_dir("test_fs_backend"); - let store = ObjectStore::new( + let store = OperatorBuilder::new( Fs::default() .root(&root_dir.path().to_string_lossy()) .atomic_write_dir(&root_dir.path().to_string_lossy()) - .build()?, - ); + .build() + .unwrap(), + ) + .finish(); // create file cache layer let cache_dir = create_temp_dir("test_fs_cache"); - let cache_acc = Fs::default() + let mut builder = Fs::default(); + builder .root(&cache_dir.path().to_string_lossy()) - .atomic_write_dir(&cache_dir.path().to_string_lossy()) - .build()?; - let cache_store = ObjectStore::new(cache_acc.clone()).finish(); + .atomic_write_dir(&cache_dir.path().to_string_lossy()); + let cache_accessor = Arc::new(builder.build().unwrap()); + let cache_store = OperatorBuilder::new(cache_accessor.clone()).finish(); + // create operator for cache dir to verify cache file - let store = store - .layer(LruCacheLayer::new(Arc::new(cache_acc), 3)) - .finish(); + let store = store.layer(LruCacheLayer::new(Arc::new(cache_accessor), 3)); // create several object handler. 
- let o1 = store.object("test_file1"); - let o2 = store.object("test_file2"); - // write data into object; - assert!(o1.write("Hello, object1!").await.is_ok()); - assert!(o2.write("Hello, object2!").await.is_ok()); + let p1 = "test_file1"; + let p2 = "test_file2"; + store.write(p1, "Hello, object1!").await.unwrap(); + store.write(p2, "Hello, object2!").await.unwrap(); - // crate cache by read object - o1.range_read(7..).await?; - o1.read().await?; - o2.range_read(7..).await?; - o2.read().await?; + // create cache by read object + store.range_read(p1, 0..).await?; + store.read(p1).await?; + store.range_read(p2, 0..).await?; + store.read(p2).await?; assert_cache_files( &cache_store, @@ -240,7 +232,7 @@ async fn test_object_store_cache_policy() -> Result<()> { ) .await?; - assert!(o2.delete().await.is_ok()); + store.delete(p2).await.unwrap(); assert_cache_files( &cache_store, @@ -249,11 +241,11 @@ async fn test_object_store_cache_policy() -> Result<()> { ) .await?; - let o3 = store.object("test_file3"); - assert!(o3.write("Hello, object3!").await.is_ok()); + let p3 = "test_file3"; + store.write(p3, "Hello, object3!").await.unwrap(); - o3.read().await?; - o3.range_read(0..5).await?; + store.read(p3).await.unwrap(); + store.range_read(p3, 0..5).await.unwrap(); assert_cache_files( &cache_store, diff --git a/src/storage/src/compaction/writer.rs b/src/storage/src/compaction/writer.rs index 0b9cd12e00..f30e38d941 100644 --- a/src/storage/src/compaction/writer.rs +++ b/src/storage/src/compaction/writer.rs @@ -93,7 +93,7 @@ mod tests { TimestampMillisecondVector, TimestampMillisecondVectorBuilder, UInt64VectorBuilder, }; use object_store::services::Fs; - use object_store::{ObjectStore, ObjectStoreBuilder}; + use object_store::ObjectStore; use store_api::storage::{ChunkReader, OpType, SequenceNumber}; use super::*; @@ -277,8 +277,10 @@ mod tests { async fn test_sst_reader() { let dir = create_temp_dir("write_parquet"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + let mut builder = Fs::default(); + builder.root(path); + + let object_store = ObjectStore::new(builder).unwrap().finish(); let seq = AtomicU64::new(0); let schema = schema_for_test(); @@ -354,8 +356,9 @@ mod tests { async fn test_sst_split() { let dir = create_temp_dir("write_parquet"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + let mut builder = Fs::default(); + builder.root(path); + let object_store = ObjectStore::new(builder).unwrap().finish(); let schema = schema_for_test(); let seq = AtomicU64::new(0); diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs index 7e34b9f73c..3c773a93d7 100644 --- a/src/storage/src/engine.rs +++ b/src/storage/src/engine.rs @@ -381,7 +381,6 @@ mod tests { use datatypes::type_id::LogicalTypeId; use log_store::test_util::log_store_util; use object_store::services::Fs; - use object_store::ObjectStoreBuilder; use store_api::storage::Region; use super::*; @@ -396,8 +395,9 @@ mod tests { let dir = create_temp_dir("test_create_new_region"); let store_dir = dir.path().to_string_lossy(); - let accessor = Fs::default().root(&store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = Fs::default(); + builder.root(&store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); let config = EngineConfig::default(); diff 
--git a/src/storage/src/file_purger.rs b/src/storage/src/file_purger.rs index c0a460d916..b9c97fb5d3 100644 --- a/src/storage/src/file_purger.rs +++ b/src/storage/src/file_purger.rs @@ -107,7 +107,7 @@ pub mod noop { mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; - use object_store::{ObjectStore, ObjectStoreBuilder}; + use object_store::ObjectStore; use store_api::storage::OpType; use super::*; @@ -168,13 +168,9 @@ mod tests { #[tokio::test] async fn test_file_purger_handler() { let dir = create_temp_dir("file-purge"); - let object_store = ObjectStore::new( - Fs::default() - .root(dir.path().to_str().unwrap()) - .build() - .unwrap(), - ) - .finish(); + let mut builder = Fs::default(); + builder.root(dir.path().to_str().unwrap()); + let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_file_id = FileId::random(); @@ -198,22 +194,20 @@ mod tests { .unwrap(); notify.notified().await; - - let object = object_store.object(&format!("{}/{}", path, sst_file_id.as_parquet())); - assert!(!object.is_exist().await.unwrap()); + let exists = object_store + .is_exist(&format!("{}/{}", path, sst_file_id.as_parquet())) + .await + .unwrap(); + assert!(!exists); } #[tokio::test] async fn test_file_purge_loop() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("file-purge"); - let object_store = ObjectStore::new( - Fs::default() - .root(dir.path().to_str().unwrap()) - .build() - .unwrap(), - ) - .finish(); + let mut builder = Fs::default(); + builder.root(dir.path().to_str().unwrap()); + let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_file_id = FileId::random(); let scheduler = Arc::new(LocalScheduler::new( SchedulerConfig::default(), @@ -228,9 +222,9 @@ mod tests { drop(handle); } scheduler.stop(true).await.unwrap(); + assert!(!object_store - .object(&format!("{}/{}", path, sst_file_id.as_parquet())) - .is_exist() + .is_exist(&format!("{}/{}", path, sst_file_id.as_parquet())) .await .unwrap()); } diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 2a0167b4d6..2a2dc39efb 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -167,7 +167,7 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; - use object_store::{ObjectStore, ObjectStoreBuilder}; + use object_store::ObjectStore; use store_api::manifest::action::ProtocolAction; use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION}; @@ -180,13 +180,9 @@ mod tests { async fn test_region_manifest() { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_region_manifest"); - let object_store = ObjectStore::new( - Fs::default() - .root(&tmp_dir.path().to_string_lossy()) - .build() - .unwrap(), - ) - .finish(); + let mut builder = Fs::default(); + builder.root(&tmp_dir.path().to_string_lossy()); + let object_store = ObjectStore::new(builder).unwrap().finish(); let manifest = RegionManifest::with_checkpointer("/manifest/", object_store); @@ -294,13 +290,9 @@ mod tests { async fn test_region_manifest_checkpoint() { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_region_manifest_checkpoint"); - let object_store = ObjectStore::new( - Fs::default() - .root(&tmp_dir.path().to_string_lossy()) - .build() - .unwrap(), - ) - .finish(); + let mut builder = Fs::default(); + builder.root(&tmp_dir.path().to_string_lossy()); + let object_store = 
ObjectStore::new(builder).unwrap().finish(); let manifest = RegionManifest::with_checkpointer("/manifest/", object_store); diff --git a/src/storage/src/manifest/storage.rs b/src/storage/src/manifest/storage.rs index 209503bb44..ac2736c513 100644 --- a/src/storage/src/manifest/storage.rs +++ b/src/storage/src/manifest/storage.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use common_telemetry::logging; use futures::TryStreamExt; use lazy_static::lazy_static; -use object_store::{util, Object, ObjectStore}; +use object_store::{util, Entry, ErrorKind, ObjectStore}; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; @@ -63,7 +63,8 @@ pub fn is_delta_file(file_name: &str) -> bool { } pub struct ObjectStoreLogIterator { - iter: Box<dyn Iterator<Item = (ManifestVersion, Object)> + Send + Sync>, + object_store: ObjectStore, + iter: Box<dyn Iterator<Item = (ManifestVersion, Entry)> + Send + Sync>, } #[async_trait] @@ -72,11 +73,12 @@ impl LogIterator for ObjectStoreLogIterator { async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> { match self.iter.next() { - Some((v, object)) => { - let bytes = object.read().await.context(ReadObjectSnafu { - path: object.path(), - })?; - + Some((v, entry)) => { + let bytes = self + .object_store + .read(entry.path()) + .await + .context(ReadObjectSnafu { path: entry.path() })?; Ok(Some((v, bytes))) } None => Ok(None), @@ -150,23 +152,13 @@ impl ManifestLogStorage for ManifestObjectStore { ) -> Result<ObjectStoreLogIterator> { ensure!(start <= end, InvalidScanIndexSnafu { start, end }); - let dir = self.object_store.object(&self.path); - let dir_exists = dir - .is_exist() - .await - .context(ReadObjectSnafu { path: &self.path })?; - if !dir_exists { - return Ok(ObjectStoreLogIterator { - iter: Box::new(Vec::default().into_iter()), - }); - } - - let streamer = dir - .list() + let streamer = self + .object_store + .list(&self.path) .await .context(ListObjectsSnafu { path: &self.path })?; - let mut entries: Vec<(ManifestVersion, Object)> = streamer + let mut entries: Vec<(ManifestVersion, Entry)> = streamer .try_filter_map(|e| async move { let file_name = e.name(); if is_delta_file(file_name) { @@ -187,40 +179,39 @@ impl ManifestLogStorage for ManifestObjectStore { entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2)); Ok(ObjectStoreLogIterator { + object_store: self.object_store.clone(), iter: Box::new(entries.into_iter()), }) } async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { - let object = self.object_store.object(&self.delta_file_path(version)); - object.write(bytes).await.context(WriteObjectSnafu { - path: object.path(), - })?; - - Ok(()) + let path = self.delta_file_path(version); + self.object_store + .write(&path, bytes.to_vec()) + .await + .context(WriteObjectSnafu { path }) } async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> { //TODO(dennis): delete in batch or concurrently? for v in start..end { - let object = self.object_store.object(&self.delta_file_path(v)); - object.delete().await.context(DeleteObjectSnafu { - path: object.path(), - })?; + let path = self.delta_file_path(v); + self.object_store + .delete(&path) + .await + .context(DeleteObjectSnafu { path })?; } - Ok(()) } async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { - let object = self - .object_store - .object(&self.checkpoint_file_path(version)); - object.write(bytes).await.context(WriteObjectSnafu { - path: object.path(), - })?; + let path = self.checkpoint_file_path(version); + self.object_store + .write(&path, bytes.to_vec()) + .await + .context(WriteObjectSnafu { path })?; - let last_checkpoint = self.object_store.object(&self.last_checkpoint_path()); + let last_checkpoint_path = self.last_checkpoint_path(); let checkpoint_metadata = CheckpointMetadata { size: bytes.len(), @@ -231,16 +222,17 @@ impl ManifestLogStorage for ManifestObjectStore { logging::debug!( "Save checkpoint in path: {}, metadata: {:?}", - last_checkpoint.path(), + last_checkpoint_path, checkpoint_metadata ); let bs = checkpoint_metadata.encode()?; - last_checkpoint - .write(bs.as_ref()) + + self.object_store + .write(&last_checkpoint_path, bs.as_ref().to_vec()) .await .context(WriteObjectSnafu { - path: last_checkpoint.path(), + path: last_checkpoint_path, })?; Ok(()) @@ -250,53 +242,49 @@ impl ManifestLogStorage for ManifestObjectStore { &self, version: ManifestVersion, ) -> Result<Option<(ManifestVersion, Vec<u8>)>> { + let path = self.checkpoint_file_path(version); let checkpoint = self .object_store - .object(&self.checkpoint_file_path(version)); + .read(&path) + .await + .context(ReadObjectSnafu { path })?; - Ok(Some(( - version, - checkpoint.read().await.context(ReadObjectSnafu { - path: checkpoint.path(), - })?, - ))) + Ok(Some((version, checkpoint))) } async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> { - let checkpoint = self - .object_store - .object(&self.checkpoint_file_path(version)); - checkpoint.delete().await.context(DeleteObjectSnafu { - path: checkpoint.path(), - })?; - + let path = self.checkpoint_file_path(version); + self.object_store + .delete(&path) + .await + .context(DeleteObjectSnafu { path })?; Ok(()) } async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> { - let last_checkpoint = self.object_store.object(&self.last_checkpoint_path()); + let last_checkpoint_path = self.last_checkpoint_path(); - let checkpoint_exists = last_checkpoint.is_exist().await.context(ReadObjectSnafu { - path: last_checkpoint.path(), - })?; + let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await { + Ok(last_checkpoint_data) => last_checkpoint_data, + Err(e) if e.kind() == ErrorKind::NotFound => { + return Ok(None); + } + Err(e) => { + return Err(e).context(ReadObjectSnafu { + path: last_checkpoint_path, + }); + } + }; - if checkpoint_exists { - let bytes = last_checkpoint.read().await.context(ReadObjectSnafu { - path: last_checkpoint.path(), - })?; + let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?; - let checkpoint_metadata = CheckpointMetadata::decode(&bytes)?; + logging::debug!( + "Load checkpoint in path: {}, metadata: {:?}", + last_checkpoint_path, + checkpoint_metadata + ); - logging::debug!( - "Load checkpoint in path: {}, metadata: {:?}", - last_checkpoint.path(), - checkpoint_metadata - ); - - self.load_checkpoint(checkpoint_metadata.version).await - } else { - Ok(None) - } + self.load_checkpoint(checkpoint_metadata.version).await
} } @@ -304,7 +292,7 @@ impl ManifestLogStorage for ManifestObjectStore { mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; - use object_store::{ObjectStore, ObjectStoreBuilder}; + use object_store::ObjectStore; use super::*; @@ -312,13 +300,9 @@ mod tests { async fn test_manifest_log_store() { common_telemetry::init_default_ut_logging(); let tmp_dir = create_temp_dir("test_manifest_log_store"); - let object_store = ObjectStore::new( - Fs::default() - .root(&tmp_dir.path().to_string_lossy()) - .build() - .unwrap(), - ) - .finish(); + let mut builder = Fs::default(); + builder.root(&tmp_dir.path().to_string_lossy()); + let object_store = ObjectStore::new(builder).unwrap().finish(); let log_store = ManifestObjectStore::new("/", object_store); diff --git a/src/storage/src/region/tests.rs b/src/storage/src/region/tests.rs index 7625d13881..6503165db8 100644 --- a/src/storage/src/region/tests.rs +++ b/src/storage/src/region/tests.rs @@ -31,7 +31,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef}; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::NoopLogStore; use object_store::services::Fs; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use store_api::manifest::MAX_VERSION; use store_api::storage::{ consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest, @@ -286,13 +286,9 @@ async fn test_recover_region_manifets() { let tmp_dir = create_temp_dir("test_recover_region_manifets"); let memtable_builder = Arc::new(DefaultMemtableBuilder::default()) as _; - let object_store = ObjectStore::new( - Fs::default() - .root(&tmp_dir.path().to_string_lossy()) - .build() - .unwrap(), - ) - .finish(); + let mut builder = Fs::default(); + builder.root(&tmp_dir.path().to_string_lossy()); + let object_store = ObjectStore::new(builder).unwrap().finish(); let manifest = RegionManifest::with_checkpointer("/manifest/", object_store.clone()); let region_meta = Arc::new(build_region_meta()); diff --git a/src/storage/src/sst.rs b/src/storage/src/sst.rs index f74c020f50..aaff02b564 100644 --- a/src/storage/src/sst.rs +++ b/src/storage/src/sst.rs @@ -508,8 +508,10 @@ impl AccessLayer for FsAccessLayer { /// Deletes a SST file with given file id. async fn delete_sst(&self, file_id: FileId) -> Result<()> { let path = self.sst_file_path(&file_id.as_parquet()); - let object = self.object_store.object(&path); - object.delete().await.context(DeleteSstSnafu) + self.object_store + .delete(&path) + .await + .context(DeleteSstSnafu) } } diff --git a/src/storage/src/sst/parquet.rs b/src/storage/src/sst/parquet.rs index 76701f85d0..c8bd6baa72 100644 --- a/src/storage/src/sst/parquet.rs +++ b/src/storage/src/sst/parquet.rs @@ -89,7 +89,6 @@ impl<'a> ParquetWriter<'a> { let projected_schema = self.source.projected_schema(); let store_schema = projected_schema.schema_to_read(); let schema = store_schema.arrow_schema().clone(); - let object = self.object_store.object(self.file_path); let writer_props = WriterProperties::builder() .set_compression(Compression::ZSTD) @@ -136,16 +135,16 @@ impl<'a> ParquetWriter<'a> { .ok() .flatten(); - object.write(buf).await.context(WriteObjectSnafu { - path: object.path(), - })?; - let file_size = object - .metadata() + // object_store.write will make sure all bytes are written or an error is raised. 
+ let buf_len = buf.len() as u64; + self.object_store + .write(self.file_path, buf) .await .context(WriteObjectSnafu { - path: object.path(), - })? - .content_length(); + path: self.file_path, + })?; + + let file_size = buf_len; Ok(Some(SstInfo { time_range, file_size, @@ -261,9 +260,9 @@ impl ParquetReader { pub async fn chunk_stream(&self) -> Result<ChunkStream> { let file_path = self.file_handle.file_path(); let operator = self.object_store.clone(); + let reader = operator - .object(&file_path) - .reader() + .reader(&file_path) .await .context(ReadObjectSnafu { path: &file_path })? .compat(); @@ -547,7 +546,6 @@ mod tests { use datatypes::types::{TimestampMillisecondType, TimestampType}; use datatypes::vectors::TimestampMillisecondVector; use object_store::services::Fs; - use object_store::ObjectStoreBuilder; use store_api::storage::OpType; use super::*; @@ -558,6 +556,12 @@ mod tests { use crate::schema::ProjectedSchema; use crate::sst::{FileId, FileMeta}; + fn create_object_store(root: &str) -> ObjectStore { + let mut builder = Fs::default(); + builder.root(root); + ObjectStore::new(builder).unwrap().finish() + } + #[tokio::test] async fn test_parquet_writer() { let schema = memtable_tests::schema_for_test(); @@ -587,8 +591,8 @@ let dir = create_temp_dir("write_parquet"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + + let object_store = create_object_store(path); let sst_file_name = "test-flush.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); @@ -599,14 +603,7 @@ .unwrap(); // verify parquet file - let reader = BufReader::new( - object_store - .object(sst_file_name) - .reader() - .await - .unwrap() - .compat(), - ); + let reader = BufReader::new(object_store.reader(sst_file_name).await.unwrap().compat()); let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); @@ -685,8 +682,7 @@ let dir = create_temp_dir("write_parquet"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + let object_store = create_object_store(path); let sst_file_handle = new_file_handle(FileId::random()); let sst_file_name = sst_file_handle.file_name(); let iter = memtable.iter(&IterContext::default()).unwrap(); @@ -709,13 +705,7 @@ time_range ); assert_ne!(file_size, 0); - let operator = ObjectStore::new( - Fs::default() - .root(dir.path().to_str().unwrap()) - .build() - .unwrap(), - ) - .finish(); + let operator = create_object_store(dir.path().to_str().unwrap()); let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap()); let reader = ParquetReader::new( @@ -783,8 +773,8 @@ let dir = create_temp_dir("write_parquet"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + + let object_store = create_object_store(path); let file_handle = new_file_handle(FileId::random()); let sst_file_name = file_handle.file_name(); let iter = memtable.iter(&IterContext::default()).unwrap(); @@ -807,13 +797,7 @@ time_range ); assert_ne!(file_size, 0); - let operator = ObjectStore::new( - Fs::default() - .root(dir.path().to_str().unwrap()) - .build() - .unwrap(), - ) - .finish(); + let operator =
create_object_store(dir.path().to_str().unwrap()); let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap()); let reader = ParquetReader::new( @@ -903,8 +887,7 @@ mod tests { let dir = create_temp_dir("read-parquet-by-range"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + let object_store = create_object_store(path); let sst_file_handle = new_file_handle(FileId::random()); let sst_file_name = sst_file_handle.file_name(); let iter = memtable.iter(&IterContext::default()).unwrap(); @@ -998,8 +981,9 @@ mod tests { let dir = create_temp_dir("read-parquet-by-range"); let path = dir.path().to_str().unwrap(); - let backend = Fs::default().root(path).build().unwrap(); - let object_store = ObjectStore::new(backend).finish(); + let mut builder = Fs::default(); + builder.root(path); + let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_file_name = "test-read.parquet"; let iter = memtable.iter(&IterContext::default()).unwrap(); let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone()); diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 8980aeb69f..c7038629e1 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::LogConfig; use object_store::services::Fs as Builder; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use crate::background::JobPoolImpl; use crate::compaction::noop::NoopCompactionScheduler; @@ -43,8 +43,10 @@ pub async fn new_store_config( let sst_dir = engine::region_sst_dir(parent_dir, region_name); let manifest_dir = engine::region_manifest_dir(parent_dir, region_name); - let accessor = Builder::default().root(store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = Builder::default(); + builder.root(store_dir); + + let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone())); let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store); let job_pool = Arc::new(JobPoolImpl {}); diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs index 561e186cd9..2789c4c45e 100644 --- a/src/table-procedure/src/test_util.rs +++ b/src/table-procedure/src/test_util.rs @@ -24,7 +24,7 @@ use log_store::NoopLogStore; use mito::config::EngineConfig; use mito::engine::MitoEngine; use object_store::services::Fs; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; @@ -40,8 +40,9 @@ impl TestEnv { pub fn new(prefix: &str) -> TestEnv { let dir = create_temp_dir(prefix); let store_dir = format!("{}/db", dir.path().to_string_lossy()); - let accessor = Fs::default().root(&store_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = Fs::default(); + builder.root(&store_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let storage_engine = EngineImpl::new( @@ -57,8 +58,9 @@ impl TestEnv { )); let 
procedure_dir = format!("{}/procedure", dir.path().to_string_lossy()); - let accessor = Fs::default().root(&procedure_dir).build().unwrap(); - let object_store = ObjectStore::new(accessor).finish(); + let mut builder = Fs::default(); + builder.root(&procedure_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store, diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 4b33b02b6e..bbb5b8b25a 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -34,7 +34,7 @@ use datatypes::schema::{ColumnSchema, RawSchema}; use frontend::instance::Instance as FeInstance; use object_store::services::{Oss, S3}; use object_store::test_util::TempFolder; -use object_store::{ObjectStore, ObjectStoreBuilder}; +use object_store::ObjectStore; use once_cell::sync::OnceCell; use rand::Rng; use servers::grpc::GrpcServer; @@ -104,18 +104,17 @@ fn get_test_store_config( cache_capacity: None, }; - let accessor = Oss::default() + let mut builder = Oss::default(); + builder .root(&oss_config.root) .endpoint(&oss_config.endpoint) .access_key_id(&oss_config.access_key_id) .access_key_secret(&oss_config.access_key_secret) - .bucket(&oss_config.bucket) - .build() - .unwrap(); + .bucket(&oss_config.bucket); let config = ObjectStoreConfig::Oss(oss_config); - let store = ObjectStore::new(accessor).finish(); + let store = ObjectStore::new(builder).unwrap().finish(); ( config, @@ -134,17 +133,16 @@ fn get_test_store_config( cache_capacity: None, }; - let accessor = S3::default() + let mut builder = S3::default(); + builder .root(&s3_config.root) .access_key_id(&s3_config.access_key_id) .secret_access_key(&s3_config.secret_access_key) - .bucket(&s3_config.bucket) - .build() - .unwrap(); + .bucket(&s3_config.bucket); let config = ObjectStoreConfig::S3(s3_config); - let store = ObjectStore::new(accessor).finish(); + let store = ObjectStore::new(builder).unwrap().finish(); (config, Some(TempDirGuard::S3(TempFolder::new(&store, "/")))) }
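Taken together, the test updates show the shape of the path-based API that replaces the old `store.object(path)` handles: reads, writes, stats, listings, and deletes all take the path directly, and listing yields `Entry` values instead of `Object`s. A small end-to-end sketch under the same assumptions (the `demo` function and its file name are illustrative, not part of this change):

use futures::TryStreamExt;
use object_store::{EntryMode, ObjectStore, Result};

async fn demo(store: &ObjectStore) -> Result<()> {
    // Write and read by path; no intermediate Object handle exists anymore.
    store.write("demo_file", "Hello, World!").await?;
    let bytes = store.read("demo_file").await?;
    assert_eq!(b"Hello, World!".to_vec(), bytes);

    // Metadata comes from `stat` instead of `Object::metadata`.
    let meta = store.stat("demo_file").await?;
    assert_eq!(EntryMode::FILE, meta.mode());

    // `list` returns a `Lister`, a stream of `Entry` results.
    let entries: Vec<_> = store.list("/").await?.try_collect().await?;
    assert!(entries.iter().any(|e| e.name() == "demo_file"));

    store.delete("demo_file").await
}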