feat: upgrade opendal (#1245)

* chore: upgrade opendal

* chore: finish upgrading opendal

* fix: clippy complaints

* fix some tests

* fix: all unit tests

* chore: rebase develop

* fix: sqlness tests

* optimize imports

* chore: rebase develop

* doc: add todo
Author: Lei, HUANG
Date: 2023-03-28 09:47:33 +08:00
Committed by: GitHub
Parent: e63b28bff1
Commit: 5edd2a3dbe
31 changed files with 390 additions and 431 deletions
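
The jump from opendal 0.27 to 0.30 drives two mechanical changes that repeat through the diff below: service builders are now configured through `&mut self` setters and handed straight to the operator constructor, and the per-path `Object` handle is replaced by path-based `read`/`write`/`stat`/`list`/`delete` methods on the operator itself. A minimal before/after sketch of the pattern (the `demo` helper and `store_dir` path are illustrative):

    use object_store::services::Fs;
    use object_store::ObjectStore;

    async fn demo(store_dir: &str) -> object_store::Result<()> {
        // opendal 0.27 (old style):
        //   let accessor = Fs::default().root(store_dir).build()?;
        //   let store = ObjectStore::new(accessor).finish();
        //   let object = store.object("hello.txt");
        //   object.write("Hello, World!").await?;
        //   let bytes = object.read().await?;

        // opendal 0.30 (new style): configure the builder in place, build the
        // operator from it, and do all I/O through path-based methods.
        let mut builder = Fs::default();
        builder.root(store_dir);
        let store = ObjectStore::new(builder)?.finish();

        store.write("hello.txt", "Hello, World!").await?;
        let bytes = store.read("hello.txt").await?;
        let meta = store.stat("hello.txt").await?;
        assert_eq!(meta.content_length(), bytes.len() as u64);
        store.delete("hello.txt").await?;
        Ok(())
    }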

Cargo.lock (generated)

@@ -1608,6 +1608,7 @@ dependencies = [
name = "common-procedure"
version = "0.1.1"
dependencies = [
"async-stream",
"async-trait",
"backon 0.4.0",
"common-error",
@@ -4581,11 +4582,13 @@ version = "0.1.1"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"common-telemetry",
"common-test-util",
"futures",
"lru 0.9.0",
"opendal",
"pin-project",
"tokio",
"uuid",
]
@@ -4624,9 +4627,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.27.2"
version = "0.30.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef6f7b936f2f8483e19643357cb50d9ec9a49c506971ef69ca676913cf5afd91"
checksum = "bc08c1c75e26f33f13f22c46b0c222132df5e12519fd67046ecf061b58e8c26f"
dependencies = [
"anyhow",
"async-compat",


@@ -400,7 +400,7 @@ mod tests {
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -482,11 +482,9 @@ mod tests {
pub async fn prepare_table_engine() -> (TempDir, TableEngineRef) {
let dir = create_temp_dir("system-table-test");
let store_dir = dir.path().to_string_lossy();
let accessor = object_store::services::Fs::default()
.root(&store_dir)
.build()
.unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = object_store::services::Fs::default();
builder.root(&store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),


@@ -13,7 +13,7 @@
// limitations under the License.
use futures::{future, TryStreamExt};
use object_store::{Object, ObjectStore};
use object_store::{Entry, ObjectStore};
use regex::Regex;
use snafu::ResultExt;
@@ -46,13 +46,12 @@ impl Lister {
}
}
pub async fn list(&self) -> Result<Vec<Object>> {
pub async fn list(&self) -> Result<Vec<Entry>> {
match &self.source {
Source::Dir => {
let streamer = self
.object_store
.object(&self.path)
.list()
.list(&self.path)
.await
.context(error::ListObjectsSnafu { path: &self.path })?;
@@ -70,11 +69,14 @@ impl Lister {
.context(error::ListObjectsSnafu { path: &self.path })
}
Source::Filename(filename) => {
let obj = self
.object_store
.object(&format!("{}{}", self.path, filename));
Ok(vec![obj])
// make sure this file exists
let file_full_path = format!("{}{}", self.path, filename);
let _ = self.object_store.stat(&file_full_path).await.context(
error::ListObjectsSnafu {
path: &file_full_path,
},
)?;
Ok(vec![Entry::new(&file_full_path)])
}
}
}


@@ -13,16 +13,16 @@
// limitations under the License.
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{self, Result};
use crate::error::{BuildBackendSnafu, Result};
pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
let accessor = Fs::default()
.root(root)
.build()
.context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
let mut builder = Fs::default();
builder.root(root);
let object_store = ObjectStore::new(builder)
.context(BuildBackendSnafu)?
.finish();
Ok(object_store)
}


@@ -15,7 +15,7 @@
use std::collections::HashMap;
use object_store::services::S3;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::error::{self, Result};
@@ -73,7 +73,7 @@ pub fn build_s3_backend(
}
}
let accessor = builder.build().context(error::BuildBackendSnafu)?;
Ok(ObjectStore::new(accessor).finish())
Ok(ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
.finish())
}


@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
async-trait.workspace = true
async-stream.workspace = true
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }
common-telemetry = { path = "../telemetry" }


@@ -423,7 +423,6 @@ impl ProcedureManager for LocalManager {
mod test_util {
use common_test_util::temp_dir::TempDir;
use object_store::services::Fs as Builder;
use object_store::ObjectStoreBuilder;
use super::*;
@@ -433,8 +432,9 @@ mod test_util {
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
ObjectStore::new(accessor).finish()
let mut builder = Builder::default();
builder.root(store_dir);
ObjectStore::new(builder).unwrap().finish()
}
}


@@ -473,8 +473,7 @@ mod tests {
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = format!("{procedure_id}/");
let object = object_store.object(&dir);
let lister = object.list().await.unwrap();
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())
.try_collect()


@@ -248,15 +248,15 @@ mod tests {
use async_trait::async_trait;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use object_store::services::Fs as Builder;
use object_store::ObjectStoreBuilder;
use super::*;
use crate::{Context, LockKey, Procedure, Status};
fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = Builder::default();
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
ProcedureStore::from(object_store)
}


@@ -15,12 +15,13 @@
use std::pin::Pin;
use std::sync::Arc;
use async_stream::try_stream;
use async_trait::async_trait;
use futures::{Stream, TryStreamExt};
use object_store::{ObjectMode, ObjectStore};
use futures::{Stream, StreamExt};
use object_store::{EntryMode, Metakey, ObjectStore};
use snafu::ResultExt;
use crate::error::{DeleteStateSnafu, Error, PutStateSnafu, Result};
use crate::error::{DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, Result};
/// Key value from state store.
type KeyValue = (String, Vec<u8>);
@@ -64,49 +65,49 @@ impl ObjectStateStore {
#[async_trait]
impl StateStore for ObjectStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> Result<()> {
let object = self.store.object(key);
object.write(value).await.context(PutStateSnafu { key })
self.store
.write(key, value)
.await
.context(PutStateSnafu { key })
}
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let path_string = path.to_string();
let lister = self
.store
.object(path)
.scan()
.await
.map_err(|e| Error::ListState {
path: path_string.clone(),
source: e,
})?;
let mut lister = self.store.scan(path).await.map_err(|e| Error::ListState {
path: path_string.clone(),
source: e,
})?;
let stream = lister
.try_filter_map(|entry| async move {
let store = self.store.clone();
let stream = try_stream!({
while let Some(res) = lister.next().await {
let entry = res.context(ListStateSnafu { path: &path_string })?;
let key = entry.path();
let key_value = match entry.mode().await? {
ObjectMode::FILE => {
let value = entry.read().await?;
Some((key.to_string(), value))
}
ObjectMode::DIR | ObjectMode::Unknown => None,
};
Ok(key_value)
})
.map_err(move |e| Error::ListState {
path: path_string.clone(),
source: e,
});
let metadata = store
.metadata(&entry, Metakey::Mode)
.await
.context(ListStateSnafu { path: key })?;
if let EntryMode::FILE = metadata.mode() {
let value = store
.read(key)
.await
.context(ListStateSnafu { path: key })?;
yield (key.to_string(), value);
}
}
});
Ok(Box::pin(stream))
}
async fn delete(&self, keys: &[String]) -> Result<()> {
for key in keys {
let object = self.store.object(key);
object.delete().await.context(DeleteStateSnafu { key })?;
self.store
.delete(key)
.await
.context(DeleteStateSnafu { key })?;
}
Ok(())
@@ -116,8 +117,8 @@ impl StateStore for ObjectStateStore {
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures_util::TryStreamExt;
use object_store::services::Fs as Builder;
use object_store::ObjectStoreBuilder;
use super::*;
@@ -125,8 +126,10 @@ mod tests {
async fn test_object_state_store() {
let dir = create_temp_dir("state_store");
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = Builder::default();
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let state_store = ObjectStateStore::new(object_store);
let data: Vec<_> = state_store


@@ -330,18 +330,20 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
);
let mut builder = OSSBuilder::default();
let builder = builder
builder
.root(&root)
.bucket(&oss_config.bucket)
.endpoint(&oss_config.endpoint)
.access_key_id(&oss_config.access_key_id)
.access_key_secret(&oss_config.access_key_secret);
let accessor = builder.build().with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?;
let object_store = ObjectStore::new(builder)
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?
.finish();
create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config)
create_object_store_with_cache(object_store, store_config)
}
fn create_object_store_with_cache(
@@ -394,24 +396,27 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
);
let mut builder = S3Builder::default();
let mut builder = builder
builder
.root(&root)
.bucket(&s3_config.bucket)
.access_key_id(&s3_config.access_key_id)
.secret_access_key(&s3_config.secret_access_key);
if s3_config.endpoint.is_some() {
builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
builder.endpoint(s3_config.endpoint.as_ref().unwrap());
}
if s3_config.region.is_some() {
builder = builder.region(s3_config.region.as_ref().unwrap());
builder.region(s3_config.region.as_ref().unwrap());
}
let accessor = builder.build().with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?;
create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config)
create_object_store_with_cache(
ObjectStore::new(builder)
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?
.finish(),
store_config,
)
}
pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
@@ -426,15 +431,14 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res
let atomic_write_dir = format!("{data_dir}/.tmp/");
let accessor = FsBuilder::default()
.root(&data_dir)
.atomic_write_dir(&atomic_write_dir)
.build()
let mut builder = FsBuilder::default();
builder.root(&data_dir).atomic_write_dir(&atomic_write_dir);
Ok(ObjectStore::new(builder)
.context(error::InitBackendSnafu {
config: store_config.clone(),
})?;
Ok(ObjectStore::new(accessor).finish())
})?
.finish())
}
/// Create metasrv client instance and spawn heartbeat loop.


@@ -48,7 +48,6 @@ impl SqlHandler {
build_backend(&req.location, req.connection).context(error::BuildBackendSnafu)?;
let (dir, filename) = find_dir_and_filename(&path);
let regex = req
.pattern
.as_ref()
@@ -62,16 +61,18 @@ impl SqlHandler {
Source::Dir
};
let lister = Lister::new(object_store, source, dir, regex);
let lister = Lister::new(object_store.clone(), source, dir, regex);
let objects = lister.list().await.context(error::ListObjectsSnafu)?;
let entries = lister.list().await.context(error::ListObjectsSnafu)?;
let mut buf: Vec<RecordBatch> = Vec::new();
for obj in objects.iter() {
let reader = obj.reader().await.context(error::ReadObjectSnafu {
path: &obj.path().to_string(),
})?;
for entry in entries.iter() {
let path = entry.path();
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let buf_reader = BufReader::new(reader.compat());


@@ -136,10 +136,10 @@ impl ParquetWriter {
// "file_name_1_1000000" (row num: 1 ~ 1000000),
// "file_name_1000001_xxx" (row num: 1000001 ~ xxx)
let file_name = format!("{}_{}_{}", self.file_name, start_row_num, total_rows);
let object = self.object_store.object(&file_name);
object.write(buf).await.context(error::WriteObjectSnafu {
path: object.path(),
})?;
self.object_store
.write(&file_name, buf)
.await
.context(error::WriteObjectSnafu { path: file_name })?;
if end_loop {
return Ok(total_rows);


@@ -23,7 +23,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRe
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector, VectorRef};
use log_store::NoopLogStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -98,8 +98,9 @@ pub fn build_test_table_info() -> TableInfo {
pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
let dir = create_temp_dir(prefix);
let store_dir = dir.path().to_string_lossy();
let accessor = Builder::default().root(&store_dir).build().unwrap();
(dir, ObjectStore::new(accessor).finish())
let mut builder = Builder::default();
builder.root(&store_dir);
(dir, ObjectStore::new(builder).unwrap().finish())
}
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {


@@ -7,8 +7,10 @@ license.workspace = true
[dependencies]
lru = "0.9"
async-trait = "0.1"
bytes = "1.4"
futures = { version = "0.3" }
opendal = { version = "0.27", features = ["layers-tracing", "layers-metrics"] }
opendal = { version = "0.30", features = ["layers-tracing", "layers-metrics"] }
pin-project = "1.0"
tokio.workspace = true
[dev-dependencies]


@@ -12,18 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;
use futures::AsyncRead;
use bytes::Bytes;
use lru::LruCache;
use opendal::ops::*;
use opendal::raw::*;
use opendal::ops::{OpDelete, OpList, OpRead, OpScan, OpWrite};
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Accessor, Layer, LayeredAccessor, RpDelete, RpList, RpRead, RpScan, RpWrite};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
@@ -68,11 +66,15 @@ impl<I, C> LruCacheAccessor<I, C> {
}
}
use opendal::raw::oio::ReadExt;
#[async_trait]
impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
type Reader = output::Reader;
type Reader = Box<dyn Read>;
type BlockingReader = I::BlockingReader;
type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter;
type Pager = I::Pager;
type BlockingPager = I::BlockingPager;
@@ -92,17 +94,18 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
lru_cache.get_or_insert(cache_path.clone(), || ());
Ok(to_output_reader((rp, r)))
}
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
let (rp, reader) = self.inner.read(&path, args.clone()).await?;
Err(err) if err.kind() == ErrorKind::NotFound => {
let (rp, mut reader) = self.inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let _ = self
.cache
.write(
&cache_path,
OpWrite::new(size),
Box::new(ReadWrapper(reader)),
)
.await?;
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;
// TODO(hl): We can use [Writer::append](https://docs.rs/opendal/0.30.4/opendal/struct.Writer.html#method.append)
// here to avoid loading whole file into memory once all our backend supports `Writer`.
let mut buf = vec![0; size as usize];
reader.read(&mut buf).await?;
writer.write(Bytes::from(buf)).await?;
writer.close().await?;
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, reader)) => {
let r = {
@@ -123,8 +126,8 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner.write(path, args).await
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
@@ -158,6 +161,14 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
self.inner.scan(path, args).await
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.inner.blocking_write(path, args)
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}
@@ -167,22 +178,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}
}
/// TODO: Workaround for output::Read doesn't implement input::Read
///
/// Should be remove after opendal fixed it.
struct ReadWrapper<R>(R);
impl<R: output::Read> AsyncRead for ReadWrapper<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.0.poll_read(cx, buf)
}
}
#[inline]
fn to_output_reader<R: output::Read + 'static>(input: (RpRead, R)) -> (RpRead, output::Reader) {
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Box::new(input.1))
}
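
The `TODO(hl)` in the read path above refers to the fact that the cache fill buffers the whole object in memory before writing it into the cache backend. Stripped of the raw `Accessor`/`oio` plumbing, the workaround boils down to the sketch below (operator-level API, illustrative names; the real layer works on reader/writer halves inside `LruCacheAccessor`):

    use object_store::ObjectStore;

    // Copy one object from the inner store into the cache store by buffering it
    // fully in memory; a streaming `Writer::append` could replace the buffer once
    // every backend in use supports the Writer API.
    async fn fill_cache(
        inner: &ObjectStore,
        cache: &ObjectStore,
        path: &str,
        cache_path: &str,
    ) -> object_store::Result<()> {
        let bytes = inner.read(path).await?;   // whole object buffered
        cache.write(cache_path, bytes).await?; // single write into the cache
        Ok(())
    }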


@@ -12,10 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::raw::oio::Pager;
pub use opendal::{
layers, services, Builder as ObjectStoreBuilder, Error, ErrorKind, Object, ObjectLister,
ObjectMetadata, ObjectMode, Operator as ObjectStore, Result,
layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey,
Operator as ObjectStore, Result,
};
pub mod cache_policy;
pub mod test_util;
pub mod util;
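
With `Object` and `ObjectLister` no longer exported, directory walks combine the re-exported `Entry`, `EntryMode`, and `Metakey` types with operator methods, as the procedure state store change above does. A small usage sketch (the `read_dir_files` helper is illustrative):

    use futures::TryStreamExt;
    use object_store::{EntryMode, Metakey, ObjectStore};

    // List a directory and read back the contents of every regular file in it.
    async fn read_dir_files(store: &ObjectStore, dir: &str) -> object_store::Result<Vec<Vec<u8>>> {
        let entries = store.list(dir).await?.try_collect::<Vec<_>>().await?;
        let mut contents = Vec::new();
        for entry in entries {
            // Request only the entry mode to keep the metadata lookup cheap.
            let meta = store.metadata(&entry, Metakey::Mode).await?;
            if let EntryMode::FILE = meta.mode() {
                contents.push(store.read(entry.path()).await?);
            }
        }
        Ok(contents)
    }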


@@ -29,7 +29,6 @@ impl TempFolder {
}
pub async fn remove_all(&mut self) -> Result<()> {
let batch = self.store.batch();
batch.remove_all(&self.path).await
self.store.remove_all(&self.path).await
}
}


@@ -13,10 +13,9 @@
// limitations under the License.
use futures::TryStreamExt;
use opendal::{Entry, Lister};
use crate::{Object, ObjectLister};
pub async fn collect(stream: ObjectLister) -> Result<Vec<Object>, opendal::Error> {
pub async fn collect(stream: Lister) -> Result<Vec<Entry>, opendal::Error> {
stream.try_collect::<Vec<_>>().await
}


@@ -21,73 +21,65 @@ use common_test_util::temp_dir::create_temp_dir;
use object_store::cache_policy::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore, ObjectStoreBuilder};
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use opendal::services::Oss;
use opendal::Operator;
use opendal::{EntryMode, Operator, OperatorBuilder};
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
let object = store.object("test_file");
// Write data info object;
assert!(object.write("Hello, World!").await.is_ok());
let file_name = "test_file";
store.write(file_name, "Hello, World!").await?;
// Read data from object;
let bs = object.read().await?;
let bs = store.read(file_name).await?;
assert_eq!("Hello, World!", String::from_utf8(bs)?);
// Read range from object;
let bs = object.range_read(1..=11).await?;
let bs = store.range_read(file_name, 1..=11).await?;
assert_eq!("ello, World", String::from_utf8(bs)?);
// Get object's Metadata
let meta = object.metadata().await?;
assert_eq!("test_file", object.path());
assert_eq!(ObjectMode::FILE, meta.mode());
let meta = store.stat(file_name).await?;
assert_eq!(EntryMode::FILE, meta.mode());
assert_eq!(13, meta.content_length());
// Delete object.
assert!(object.delete().await.is_ok());
assert!(object.read().await.is_err());
store.delete(file_name).await.unwrap();
store.read(file_name).await.unwrap_err();
Ok(())
}
async fn test_object_list(store: &ObjectStore) -> Result<()> {
// Create some object handlers.
let o1 = store.object("test_file1");
let o2 = store.object("test_file2");
let o3 = store.object("test_file3");
// Write something
assert!(o1.write("Hello, object1!").await.is_ok());
assert!(o2.write("Hello, object2!").await.is_ok());
assert!(o3.write("Hello, object3!").await.is_ok());
let p1 = "test_file1";
let p2 = "test_file2";
let p3 = "test_file3";
store.write(p1, "Hello, object1!").await?;
store.write(p2, "Hello, object2!").await?;
store.write(p3, "Hello, object3!").await?;
// List objects
let o: Object = store.object("/");
let obs: ObjectLister = o.list().await?;
let objects = util::collect(obs).await?;
assert_eq!(3, objects.len());
let lister = store.list("/").await?;
let entries = util::collect(lister).await?;
assert_eq!(3, entries.len());
// Delete o1, o3
assert!(o1.delete().await.is_ok());
assert!(o3.delete().await.is_ok());
// List obejcts again
let objects = util::collect(o.list().await?).await?;
assert_eq!(1, objects.len());
store.delete(p1).await?;
store.delete(p3).await?;
// List objects again
// Only o2 exists
let o2 = &objects[0].clone();
let bs = o2.read().await?;
assert_eq!("Hello, object2!", String::from_utf8(bs)?);
// Delete o2
assert!(o2.delete().await.is_ok());
let entries = util::collect(store.list("/").await?).await?;
assert_eq!(1, entries.len());
assert_eq!(p2, entries.get(0).unwrap().path());
let objects = util::collect(o.list().await?).await?;
assert!(objects.is_empty());
let content = store.read(p2).await?;
assert_eq!("Hello, object2!", String::from_utf8(content)?);
store.delete(p2).await?;
let entries = util::collect(store.list("/").await?).await?;
assert!(entries.is_empty());
Ok(())
}
@@ -95,13 +87,12 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
async fn test_fs_backend() -> Result<()> {
let data_dir = create_temp_dir("test_fs_backend");
let tmp_dir = create_temp_dir("test_fs_backend");
let store = ObjectStore::new(
Fs::default()
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy())
.build()?,
)
.finish();
let mut builder = Fs::default();
builder
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy());
let store = ObjectStore::new(builder).unwrap().finish();
test_object_crud(&store).await?;
test_object_list(&store).await?;
@@ -118,14 +109,14 @@ async fn test_s3_backend() -> Result<()> {
let root = uuid::Uuid::new_v4().to_string();
let accessor = S3::default()
let mut builder = S3::default();
builder
.root(&root)
.access_key_id(&env::var("GT_S3_ACCESS_KEY_ID")?)
.secret_access_key(&env::var("GT_S3_ACCESS_KEY")?)
.bucket(&bucket)
.build()?;
.bucket(&bucket);
let store = ObjectStore::new(accessor).finish();
let store = ObjectStore::new(builder).unwrap().finish();
let mut guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
@@ -146,14 +137,14 @@ async fn test_oss_backend() -> Result<()> {
let root = uuid::Uuid::new_v4().to_string();
let accessor = Oss::default()
let mut builder = Oss::default();
builder
.root(&root)
.access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?)
.access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?)
.bucket(&bucket)
.build()?;
.bucket(&bucket);
let store = ObjectStore::new(accessor).finish();
let store = ObjectStore::new(builder).unwrap().finish();
let mut guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
@@ -170,8 +161,7 @@ async fn assert_cache_files(
file_names: &[&str],
file_contents: &[&str],
) -> Result<()> {
let o = store.object("/");
let obs = o.list().await?;
let obs = store.list("/").await?;
let objects = util::collect(obs).await?;
// compare the cache file with the expected cache file; ignore orders
@@ -180,7 +170,7 @@ async fn assert_cache_files(
assert!(position.is_some(), "file not found: {}", o.name());
let position = position.unwrap();
let bs = o.read().await?;
let bs = store.read(o.path()).await.unwrap();
assert_eq!(
file_contents[position],
String::from_utf8(bs.clone())?,
@@ -194,40 +184,42 @@ async fn assert_cache_files(
#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
common_telemetry::init_default_ut_logging();
// create file storage
let root_dir = create_temp_dir("test_fs_backend");
let store = ObjectStore::new(
let store = OperatorBuilder::new(
Fs::default()
.root(&root_dir.path().to_string_lossy())
.atomic_write_dir(&root_dir.path().to_string_lossy())
.build()?,
);
.build()
.unwrap(),
)
.finish();
// create file cache layer
let cache_dir = create_temp_dir("test_fs_cache");
let cache_acc = Fs::default()
let mut builder = Fs::default();
builder
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy())
.build()?;
let cache_store = ObjectStore::new(cache_acc.clone()).finish();
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let cache_accessor = Arc::new(builder.build().unwrap());
let cache_store = OperatorBuilder::new(cache_accessor.clone()).finish();
// create operator for cache dir to verify cache file
let store = store
.layer(LruCacheLayer::new(Arc::new(cache_acc), 3))
.finish();
let store = store.layer(LruCacheLayer::new(Arc::new(cache_accessor), 3));
// create several object handler.
let o1 = store.object("test_file1");
let o2 = store.object("test_file2");
// write data into object;
assert!(o1.write("Hello, object1!").await.is_ok());
assert!(o2.write("Hello, object2!").await.is_ok());
let p1 = "test_file1";
let p2 = "test_file2";
store.write(p1, "Hello, object1!").await.unwrap();
store.write(p2, "Hello, object2!").await.unwrap();
// crate cache by read object
o1.range_read(7..).await?;
o1.read().await?;
o2.range_read(7..).await?;
o2.read().await?;
// create cache by read object
store.range_read(p1, 0..).await?;
store.read(p1).await?;
store.range_read(p2, 0..).await?;
store.read(p2).await?;
assert_cache_files(
&cache_store,
@@ -240,7 +232,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
)
.await?;
assert!(o2.delete().await.is_ok());
store.delete(p2).await.unwrap();
assert_cache_files(
&cache_store,
@@ -249,11 +241,11 @@ async fn test_object_store_cache_policy() -> Result<()> {
)
.await?;
let o3 = store.object("test_file3");
assert!(o3.write("Hello, object3!").await.is_ok());
let p3 = "test_file3";
store.write(p3, "Hello, object3!").await.unwrap();
o3.read().await?;
o3.range_read(0..5).await?;
store.read(p3).await.unwrap();
store.range_read(p3, 0..5).await.unwrap();
assert_cache_files(
&cache_store,


@@ -93,7 +93,7 @@ mod tests {
TimestampMillisecondVector, TimestampMillisecondVectorBuilder, UInt64VectorBuilder,
};
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use store_api::storage::{ChunkReader, OpType, SequenceNumber};
use super::*;
@@ -277,8 +277,10 @@ mod tests {
async fn test_sst_reader() {
let dir = create_temp_dir("write_parquet");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let mut builder = Fs::default();
builder.root(path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let seq = AtomicU64::new(0);
let schema = schema_for_test();
@@ -354,8 +356,9 @@ mod tests {
async fn test_sst_split() {
let dir = create_temp_dir("write_parquet");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let mut builder = Fs::default();
builder.root(path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let schema = schema_for_test();
let seq = AtomicU64::new(0);


@@ -381,7 +381,6 @@ mod tests {
use datatypes::type_id::LogicalTypeId;
use log_store::test_util::log_store_util;
use object_store::services::Fs;
use object_store::ObjectStoreBuilder;
use store_api::storage::Region;
use super::*;
@@ -396,8 +395,9 @@ mod tests {
let dir = create_temp_dir("test_create_new_region");
let store_dir = dir.path().to_string_lossy();
let accessor = Fs::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = Fs::default();
builder.root(&store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let config = EngineConfig::default();


@@ -107,7 +107,7 @@ pub mod noop {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use store_api::storage::OpType;
use super::*;
@@ -168,13 +168,9 @@ mod tests {
#[tokio::test]
async fn test_file_purger_handler() {
let dir = create_temp_dir("file-purge");
let object_store = ObjectStore::new(
Fs::default()
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
)
.finish();
let mut builder = Fs::default();
builder.root(dir.path().to_str().unwrap());
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_id = FileId::random();
@@ -198,22 +194,20 @@ mod tests {
.unwrap();
notify.notified().await;
let object = object_store.object(&format!("{}/{}", path, sst_file_id.as_parquet()));
assert!(!object.is_exist().await.unwrap());
let exists = object_store
.is_exist(&format!("{}/{}", path, sst_file_id.as_parquet()))
.await
.unwrap();
assert!(!exists);
}
#[tokio::test]
async fn test_file_purge_loop() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("file-purge");
let object_store = ObjectStore::new(
Fs::default()
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
)
.finish();
let mut builder = Fs::default();
builder.root(dir.path().to_str().unwrap());
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_id = FileId::random();
let scheduler = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),
@@ -228,9 +222,9 @@ mod tests {
drop(handle);
}
scheduler.stop(true).await.unwrap();
assert!(!object_store
.object(&format!("{}/{}", path, sst_file_id.as_parquet()))
.is_exist()
.is_exist(&format!("{}/{}", path, sst_file_id.as_parquet()))
.await
.unwrap());
}


@@ -167,7 +167,7 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};
@@ -180,13 +180,9 @@ mod tests {
async fn test_region_manifest() {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_region_manifest");
let object_store = ObjectStore::new(
Fs::default()
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
)
.finish();
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store);
@@ -294,13 +290,9 @@ mod tests {
async fn test_region_manifest_checkpoint() {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_region_manifest_checkpoint");
let object_store = ObjectStore::new(
Fs::default()
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
)
.finish();
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store);


@@ -19,7 +19,7 @@ use async_trait::async_trait;
use common_telemetry::logging;
use futures::TryStreamExt;
use lazy_static::lazy_static;
use object_store::{util, Object, ObjectStore};
use object_store::{util, Entry, ErrorKind, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
@@ -63,7 +63,8 @@ pub fn is_delta_file(file_name: &str) -> bool {
}
pub struct ObjectStoreLogIterator {
iter: Box<dyn Iterator<Item = (ManifestVersion, Object)> + Send + Sync>,
object_store: ObjectStore,
iter: Box<dyn Iterator<Item = (ManifestVersion, Entry)> + Send + Sync>,
}
#[async_trait]
@@ -72,11 +73,12 @@ impl LogIterator for ObjectStoreLogIterator {
async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
match self.iter.next() {
Some((v, object)) => {
let bytes = object.read().await.context(ReadObjectSnafu {
path: object.path(),
})?;
Some((v, entry)) => {
let bytes = self
.object_store
.read(entry.path())
.await
.context(ReadObjectSnafu { path: entry.path() })?;
Ok(Some((v, bytes)))
}
None => Ok(None),
@@ -150,23 +152,13 @@ impl ManifestLogStorage for ManifestObjectStore {
) -> Result<ObjectStoreLogIterator> {
ensure!(start <= end, InvalidScanIndexSnafu { start, end });
let dir = self.object_store.object(&self.path);
let dir_exists = dir
.is_exist()
.await
.context(ReadObjectSnafu { path: &self.path })?;
if !dir_exists {
return Ok(ObjectStoreLogIterator {
iter: Box::new(Vec::default().into_iter()),
});
}
let streamer = dir
.list()
let streamer = self
.object_store
.list(&self.path)
.await
.context(ListObjectsSnafu { path: &self.path })?;
let mut entries: Vec<(ManifestVersion, Object)> = streamer
let mut entries: Vec<(ManifestVersion, Entry)> = streamer
.try_filter_map(|e| async move {
let file_name = e.name();
if is_delta_file(file_name) {
@@ -187,40 +179,39 @@ impl ManifestLogStorage for ManifestObjectStore {
entries.sort_unstable_by(|(v1, _), (v2, _)| v1.cmp(v2));
Ok(ObjectStoreLogIterator {
object_store: self.object_store.clone(),
iter: Box::new(entries.into_iter()),
})
}
async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let object = self.object_store.object(&self.delta_file_path(version));
object.write(bytes).await.context(WriteObjectSnafu {
path: object.path(),
})?;
Ok(())
let path = self.delta_file_path(version);
self.object_store
.write(&path, bytes.to_vec())
.await
.context(WriteObjectSnafu { path })
}
async fn delete(&self, start: ManifestVersion, end: ManifestVersion) -> Result<()> {
//TODO(dennis): delete in batch or concurrently?
for v in start..end {
let object = self.object_store.object(&self.delta_file_path(v));
object.delete().await.context(DeleteObjectSnafu {
path: object.path(),
})?;
let path = self.delta_file_path(v);
self.object_store
.delete(&path)
.await
.context(DeleteObjectSnafu { path })?;
}
Ok(())
}
async fn save_checkpoint(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let object = self
.object_store
.object(&self.checkpoint_file_path(version));
object.write(bytes).await.context(WriteObjectSnafu {
path: object.path(),
})?;
let path = self.checkpoint_file_path(version);
self.object_store
.write(&path, bytes.to_vec())
.await
.context(WriteObjectSnafu { path })?;
let last_checkpoint = self.object_store.object(&self.last_checkpoint_path());
let last_checkpoint_path = self.last_checkpoint_path();
let checkpoint_metadata = CheckpointMetadata {
size: bytes.len(),
@@ -231,16 +222,17 @@ impl ManifestLogStorage for ManifestObjectStore {
logging::debug!(
"Save checkpoint in path: {}, metadata: {:?}",
last_checkpoint.path(),
last_checkpoint_path,
checkpoint_metadata
);
let bs = checkpoint_metadata.encode()?;
last_checkpoint
.write(bs.as_ref())
self.object_store
.write(&last_checkpoint_path, bs.as_ref().to_vec())
.await
.context(WriteObjectSnafu {
path: last_checkpoint.path(),
path: last_checkpoint_path,
})?;
Ok(())
@@ -250,53 +242,49 @@ impl ManifestLogStorage for ManifestObjectStore {
&self,
version: ManifestVersion,
) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let path = self.checkpoint_file_path(version);
let checkpoint = self
.object_store
.object(&self.checkpoint_file_path(version));
.read(&path)
.await
.context(ReadObjectSnafu { path })?;
Ok(Some((
version,
checkpoint.read().await.context(ReadObjectSnafu {
path: checkpoint.path(),
})?,
)))
Ok(Some((version, checkpoint)))
}
async fn delete_checkpoint(&self, version: ManifestVersion) -> Result<()> {
let checkpoint = self
.object_store
.object(&self.checkpoint_file_path(version));
checkpoint.delete().await.context(DeleteObjectSnafu {
path: checkpoint.path(),
})?;
let path = self.checkpoint_file_path(version);
self.object_store
.delete(&path)
.await
.context(DeleteObjectSnafu { path })?;
Ok(())
}
async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint = self.object_store.object(&self.last_checkpoint_path());
let last_checkpoint_path = self.last_checkpoint_path();
let checkpoint_exists = last_checkpoint.is_exist().await.context(ReadObjectSnafu {
path: last_checkpoint.path(),
})?;
let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
Ok(last_checkpoint_data) => last_checkpoint_data,
Err(e) if e.kind() == ErrorKind::NotFound => {
return Ok(None);
}
Err(e) => {
return Err(e).context(ReadObjectSnafu {
path: last_checkpoint_path,
});
}
};
if checkpoint_exists {
let bytes = last_checkpoint.read().await.context(ReadObjectSnafu {
path: last_checkpoint.path(),
})?;
let checkpoint_metadata = CheckpointMetadata::decode(&last_checkpoint_data)?;
let checkpoint_metadata = CheckpointMetadata::decode(&bytes)?;
logging::debug!(
"Load checkpoint in path: {}, metadata: {:?}",
last_checkpoint_path,
checkpoint_metadata
);
logging::debug!(
"Load checkpoint in path: {}, metadata: {:?}",
last_checkpoint.path(),
checkpoint_metadata
);
self.load_checkpoint(checkpoint_metadata.version).await
} else {
Ok(None)
}
self.load_checkpoint(checkpoint_metadata.version).await
}
}
@@ -304,7 +292,7 @@ impl ManifestLogStorage for ManifestObjectStore {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use super::*;
@@ -312,13 +300,9 @@ mod tests {
async fn test_manifest_log_store() {
common_telemetry::init_default_ut_logging();
let tmp_dir = create_temp_dir("test_manifest_log_store");
let object_store = ObjectStore::new(
Fs::default()
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
)
.finish();
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let log_store = ManifestObjectStore::new("/", object_store);


@@ -31,7 +31,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::NoopLogStore;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use store_api::manifest::MAX_VERSION;
use store_api::storage::{
consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
@@ -286,13 +286,9 @@ async fn test_recover_region_manifets() {
let tmp_dir = create_temp_dir("test_recover_region_manifets");
let memtable_builder = Arc::new(DefaultMemtableBuilder::default()) as _;
let object_store = ObjectStore::new(
Fs::default()
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
)
.finish();
let mut builder = Fs::default();
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest = RegionManifest::with_checkpointer("/manifest/", object_store.clone());
let region_meta = Arc::new(build_region_meta());


@@ -508,8 +508,10 @@ impl AccessLayer for FsAccessLayer {
/// Deletes a SST file with given file id.
async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let object = self.object_store.object(&path);
object.delete().await.context(DeleteSstSnafu)
self.object_store
.delete(&path)
.await
.context(DeleteSstSnafu)
}
}


@@ -89,7 +89,6 @@ impl<'a> ParquetWriter<'a> {
let projected_schema = self.source.projected_schema();
let store_schema = projected_schema.schema_to_read();
let schema = store_schema.arrow_schema().clone();
let object = self.object_store.object(self.file_path);
let writer_props = WriterProperties::builder()
.set_compression(Compression::ZSTD)
@@ -136,16 +135,16 @@ impl<'a> ParquetWriter<'a> {
.ok()
.flatten();
object.write(buf).await.context(WriteObjectSnafu {
path: object.path(),
})?;
let file_size = object
.metadata()
// object_store.write will make sure all bytes are written or an error is raised.
let buf_len = buf.len() as u64;
self.object_store
.write(self.file_path, buf)
.await
.context(WriteObjectSnafu {
path: object.path(),
})?
.content_length();
path: self.file_path,
})?;
let file_size = buf_len;
Ok(Some(SstInfo {
time_range,
file_size,
@@ -261,9 +260,9 @@ impl ParquetReader {
pub async fn chunk_stream(&self) -> Result<ChunkStream> {
let file_path = self.file_handle.file_path();
let operator = self.object_store.clone();
let reader = operator
.object(&file_path)
.reader()
.reader(&file_path)
.await
.context(ReadObjectSnafu { path: &file_path })?
.compat();
@@ -547,7 +546,6 @@ mod tests {
use datatypes::types::{TimestampMillisecondType, TimestampType};
use datatypes::vectors::TimestampMillisecondVector;
use object_store::services::Fs;
use object_store::ObjectStoreBuilder;
use store_api::storage::OpType;
use super::*;
@@ -558,6 +556,12 @@ mod tests {
use crate::schema::ProjectedSchema;
use crate::sst::{FileId, FileMeta};
fn create_object_store(root: &str) -> ObjectStore {
let mut builder = Fs::default();
builder.root(root);
ObjectStore::new(builder).unwrap().finish()
}
#[tokio::test]
async fn test_parquet_writer() {
let schema = memtable_tests::schema_for_test();
@@ -587,8 +591,8 @@ mod tests {
let dir = create_temp_dir("write_parquet");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let object_store = create_object_store(path);
let sst_file_name = "test-flush.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
@@ -599,14 +603,7 @@ mod tests {
.unwrap();
// verify parquet file
let reader = BufReader::new(
object_store
.object(sst_file_name)
.reader()
.await
.unwrap()
.compat(),
);
let reader = BufReader::new(object_store.reader(sst_file_name).await.unwrap().compat());
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
@@ -685,8 +682,7 @@ mod tests {
let dir = create_temp_dir("write_parquet");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let object_store = create_object_store(path);
let sst_file_handle = new_file_handle(FileId::random());
let sst_file_name = sst_file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
@@ -709,13 +705,7 @@ mod tests {
time_range
);
assert_ne!(file_size, 0);
let operator = ObjectStore::new(
Fs::default()
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
)
.finish();
let operator = create_object_store(dir.path().to_str().unwrap());
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
@@ -783,8 +773,8 @@ mod tests {
let dir = create_temp_dir("write_parquet");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let object_store = create_object_store(path);
let file_handle = new_file_handle(FileId::random());
let sst_file_name = file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
@@ -807,13 +797,7 @@ mod tests {
time_range
);
assert_ne!(file_size, 0);
let operator = ObjectStore::new(
Fs::default()
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
)
.finish();
let operator = create_object_store(dir.path().to_str().unwrap());
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
@@ -903,8 +887,7 @@ mod tests {
let dir = create_temp_dir("read-parquet-by-range");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let object_store = create_object_store(path);
let sst_file_handle = new_file_handle(FileId::random());
let sst_file_name = sst_file_handle.file_name();
let iter = memtable.iter(&IterContext::default()).unwrap();
@@ -998,8 +981,9 @@ mod tests {
let dir = create_temp_dir("read-parquet-by-range");
let path = dir.path().to_str().unwrap();
let backend = Fs::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend).finish();
let mut builder = Fs::default();
builder.root(path);
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_name = "test-read.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());


@@ -17,7 +17,7 @@ use std::sync::Arc;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use crate::background::JobPoolImpl;
use crate::compaction::noop::NoopCompactionScheduler;
@@ -43,8 +43,10 @@ pub async fn new_store_config(
let sst_dir = engine::region_sst_dir(parent_dir, region_name);
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = Builder::default();
builder.root(store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::with_checkpointer(&manifest_dir, object_store);
let job_pool = Arc::new(JobPoolImpl {});


@@ -24,7 +24,7 @@ use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::services::Fs;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -40,8 +40,9 @@ impl TestEnv {
pub fn new(prefix: &str) -> TestEnv {
let dir = create_temp_dir(prefix);
let store_dir = format!("{}/db", dir.path().to_string_lossy());
let accessor = Fs::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = Fs::default();
builder.root(&store_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let storage_engine = EngineImpl::new(
@@ -57,8 +58,9 @@ impl TestEnv {
));
let procedure_dir = format!("{}/procedure", dir.path().to_string_lossy());
let accessor = Fs::default().root(&procedure_dir).build().unwrap();
let object_store = ObjectStore::new(accessor).finish();
let mut builder = Fs::default();
builder.root(&procedure_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig {
object_store,


@@ -34,7 +34,7 @@ use datatypes::schema::{ColumnSchema, RawSchema};
use frontend::instance::Instance as FeInstance;
use object_store::services::{Oss, S3};
use object_store::test_util::TempFolder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use object_store::ObjectStore;
use once_cell::sync::OnceCell;
use rand::Rng;
use servers::grpc::GrpcServer;
@@ -104,18 +104,17 @@ fn get_test_store_config(
cache_capacity: None,
};
let accessor = Oss::default()
let mut builder = Oss::default();
builder
.root(&oss_config.root)
.endpoint(&oss_config.endpoint)
.access_key_id(&oss_config.access_key_id)
.access_key_secret(&oss_config.access_key_secret)
.bucket(&oss_config.bucket)
.build()
.unwrap();
.bucket(&oss_config.bucket);
let config = ObjectStoreConfig::Oss(oss_config);
let store = ObjectStore::new(accessor).finish();
let store = ObjectStore::new(builder).unwrap().finish();
(
config,
@@ -134,17 +133,16 @@ fn get_test_store_config(
cache_capacity: None,
};
let accessor = S3::default()
let mut builder = S3::default();
builder
.root(&s3_config.root)
.access_key_id(&s3_config.access_key_id)
.secret_access_key(&s3_config.secret_access_key)
.bucket(&s3_config.bucket)
.build()
.unwrap();
.bucket(&s3_config.bucket);
let config = ObjectStoreConfig::S3(s3_config);
let store = ObjectStore::new(accessor).finish();
let store = ObjectStore::new(builder).unwrap().finish();
(config, Some(TempDirGuard::S3(TempFolder::new(&store, "/"))))
}