chore: Bump OpenDAL to v0.27 (#1057)

* Bump OpenDAL to v0.27

Signed-off-by: Xuanwo <github@xuanwo.io>

* Make cargo check happy

Signed-off-by: Xuanwo <github@xuanwo.io>

* Address comments

Signed-off-by: Xuanwo <github@xuanwo.io>

* Address comments

Signed-off-by: Xuanwo <github@xuanwo.io>

* Format toml

Signed-off-by: Xuanwo <github@xuanwo.io>

* Make taplo happy

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
Author: Xuanwo
Date: 2023-02-23 11:20:45 +08:00 (committed by GitHub)
Parent: f42acc90c2
Commit: 98ef74bff4
28 changed files with 373 additions and 211 deletions

Cargo.lock (generated, 143 changed lines)

@@ -653,6 +653,18 @@ dependencies = [
"tokio",
]
[[package]]
name = "backon"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34fac4d7cdaefa2deded0eda2d5d59dbfd43370ff3f856209e72340ae84c294"
dependencies = [
"futures",
"pin-project",
"rand 0.8.5",
"tokio",
]
[[package]]
name = "backtrace"
version = "0.3.67"
@@ -729,25 +741,6 @@ dependencies = [
"serde",
]
[[package]]
name = "bincode"
version = "2.0.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bb50c5a2ef4b9b1e7ae73e3a73b52ea24b20312d629f9c4df28260b7ad2c3c4"
dependencies = [
"bincode_derive",
"serde",
]
[[package]]
name = "bincode_derive"
version = "2.0.0-rc.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a45a23389446d2dd25dc8e73a7a3b3c43522b630cac068927f0649d43d719d2"
dependencies = [
"virtue",
]
[[package]]
name = "bindgen"
version = "0.59.2"
@@ -2172,7 +2165,7 @@ dependencies = [
"axum",
"axum-macros",
"axum-test-helper",
"backon",
"backon 0.2.0",
"catalog",
"client",
"common-base",
@@ -2248,6 +2241,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1a467a65c5e759bce6e65eaf91cc29f466cdc57cb65777bd646872a8a1fd4de"
dependencies = [
"const-oid",
"pem-rfc7468",
"zeroize",
]
[[package]]
@@ -2336,6 +2331,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [
"block-buffer",
"const-oid",
"crypto-common",
"subtle",
]
@@ -3464,6 +3460,9 @@ name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
dependencies = [
"spin",
]
[[package]]
name = "lazycell"
@@ -3662,7 +3661,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f270b952b07995fe874b10a5ed7dd28c80aa2130e37a7de7ed667d034e0a521"
dependencies = [
"bincode 1.3.3",
"bincode",
"cactus",
"cfgrammar",
"filetime",
@@ -4321,6 +4320,23 @@ dependencies = [
"serde",
]
[[package]]
name = "num-bigint-dig"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2399c9463abc5f909349d8aa9ba080e0b88b3ce2885389b60b993f39b1a56905"
dependencies = [
"byteorder",
"lazy_static",
"libm",
"num-integer",
"num-iter",
"num-traits",
"rand 0.8.5",
"smallvec",
"zeroize",
]
[[package]]
name = "num-complex"
version = "0.4.3"
@@ -4480,16 +4496,15 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "opendal"
version = "0.25.1"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73829d3a057542556dc2c2d2b70700a44dda913cdb5483094c20ef9673ca283c"
checksum = "ef6f7b936f2f8483e19643357cb50d9ec9a49c506971ef69ca676913cf5afd91"
dependencies = [
"anyhow",
"async-compat",
"async-trait",
"backon",
"backon 0.4.0",
"base64 0.21.0",
"bincode 2.0.0-rc.2",
"bytes",
"flagset",
"futures",
@@ -4795,6 +4810,15 @@ dependencies = [
"base64 0.13.1",
]
[[package]]
name = "pem-rfc7468"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d159833a9105500e0398934e205e0773f0b27529557134ecfc51c27646adac"
dependencies = [
"base64ct",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
@@ -4996,6 +5020,28 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs1"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eff33bdbdfc54cc98a2eca766ebdec3e1b8fb7387523d5c9c9a2891da856f719"
dependencies = [
"der",
"pkcs8",
"spki",
"zeroize",
]
[[package]]
name = "pkcs8"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9eca2c590a5f85da82668fa685c09ce2888b9430e83299debf1f34b65fd4a4ba"
dependencies = [
"der",
"spki",
]
[[package]]
name = "pkg-config"
version = "0.3.26"
@@ -5763,12 +5809,12 @@ dependencies = [
[[package]]
name = "reqsign"
version = "0.8.1"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f446438814fde3785305a59a85a6d1b361ce2c9d29e58dd87c9103a242c40b6"
checksum = "ef4d5fefeaaa1e64f4aabb79da4ea68bf6d0e7935ad927728280d2a8e95735fc"
dependencies = [
"anyhow",
"backon",
"backon 0.4.0",
"base64 0.21.0",
"bytes",
"dirs",
@@ -5781,6 +5827,8 @@ dependencies = [
"once_cell",
"percent-encoding",
"quick-xml",
"rand 0.8.5",
"rsa",
"rust-ini",
"serde",
"serde_json",
@@ -5905,6 +5953,27 @@ dependencies = [
"serde",
]
[[package]]
name = "rsa"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b3896c9b7790b70a9aa314a30e4ae114200992a19c96cbe0ca6070edd32ab8"
dependencies = [
"byteorder",
"digest",
"num-bigint-dig",
"num-integer",
"num-iter",
"num-traits",
"pkcs1",
"pkcs8",
"rand_core 0.6.4",
"sha2",
"signature",
"subtle",
"zeroize",
]
[[package]]
name = "rust-ini"
version = "0.18.0"
@@ -6076,7 +6145,7 @@ name = "rustpython-compiler-core"
version = "0.1.2"
source = "git+https://github.com/discord9/RustPython?rev=2e126345#2e12634569d01674724490193eb9638f056e51ca"
dependencies = [
"bincode 1.3.3",
"bincode",
"bitflags",
"bstr",
"itertools",
@@ -6826,6 +6895,10 @@ name = "signature"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fe458c98333f9c8152221191a77e2a44e8325d0193484af2e9421a53019e57d"
dependencies = [
"digest",
"rand_core 0.6.4",
]
[[package]]
name = "simba"
@@ -8368,12 +8441,6 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "virtue"
version = "0.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b60dcd6a64dd45abf9bd426970c9843726da7fc08f44cd6fcebf68c21220a63"
[[package]]
name = "vob"
version = "3.0.2"
@@ -8756,6 +8823,12 @@ dependencies = [
"lzma-sys",
]
[[package]]
name = "zeroize"
version = "1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"
[[package]]
name = "zstd"
version = "0.12.2+zstd.1.5.2"


@@ -399,7 +399,7 @@ mod tests {
use log_store::NoopLogStore;
use mito::config::EngineConfig;
use mito::engine::MitoEngine;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -486,7 +486,7 @@ mod tests {
.root(&store_dir)
.build()
.unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
let noop_compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let table_engine = Arc::new(MitoEngine::new(
EngineConfig::default(),
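
The change repeated throughout this commit is the new construction flow: in OpenDAL 0.27, ObjectStore::new (an alias of Operator::new) returns a builder, and finish() must be called to obtain the usable operator; calling build() on a service builder also now requires the ObjectStoreBuilder trait in scope. A minimal sketch of the pattern (the root path is illustrative):

    use object_store::services::Fs;
    use object_store::{ObjectStore, ObjectStoreBuilder};

    fn new_fs_store(root: &str) -> object_store::Result<ObjectStore> {
        // services::Fs replaces the old services::fs::Builder path.
        let accessor = Fs::default().root(root).build()?;
        // new() yields a builder; finish() produces the operator.
        Ok(ObjectStore::new(accessor).finish())
    }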


@@ -411,7 +411,8 @@ impl ProcedureManager for LocalManager {
/// Create a new [ProcedureMeta] for test purpose.
#[cfg(test)]
mod test_util {
use object_store::services::fs::Builder;
use object_store::services::Fs as Builder;
use object_store::ObjectStoreBuilder;
use tempdir::TempDir;
use super::*;
@@ -423,7 +424,7 @@ mod test_util {
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
ObjectStore::new(accessor)
ObjectStore::new(accessor).finish()
}
}


@@ -246,7 +246,8 @@ impl ParsedKey {
#[cfg(test)]
mod tests {
use async_trait::async_trait;
use object_store::services::fs::Builder;
use object_store::services::Fs as Builder;
use object_store::ObjectStoreBuilder;
use tempdir::TempDir;
use super::*;
@@ -255,7 +256,7 @@ mod tests {
fn procedure_store_for_test(dir: &TempDir) -> ProcedureStore {
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
ProcedureStore::from(object_store)
}


@@ -20,9 +20,7 @@ use futures::{Stream, TryStreamExt};
use object_store::{ObjectMode, ObjectStore};
use snafu::ResultExt;
use crate::error::{
DeleteStateSnafu, Error, ListStateSnafu, PutStateSnafu, ReadStateSnafu, Result,
};
use crate::error::{DeleteStateSnafu, Error, PutStateSnafu, Result};
/// Key value from state store.
type KeyValue = (String, Vec<u8>);
@@ -72,22 +70,23 @@ impl StateStore for ObjectStateStore {
async fn walk_top_down(&self, path: &str) -> Result<KeyValueStream> {
let path_string = path.to_string();
let op = self.store.batch();
// Note that there is no guarantee about the order between files and dirs
// at the same level.
// See https://docs.rs/opendal/0.25.2/opendal/raw/struct.TopDownWalker.html#note
let stream = op
.walk_top_down(path)
.context(ListStateSnafu { path })?
.map_err(move |e| Error::ListState {
let lister = self
.store
.object(path)
.scan()
.await
.map_err(|e| Error::ListState {
path: path_string.clone(),
source: e,
})
})?;
let stream = lister
.try_filter_map(|entry| async move {
let key = entry.path();
let key_value = match entry.mode().await.context(ReadStateSnafu { key })? {
let key_value = match entry.mode().await? {
ObjectMode::FILE => {
let value = entry.read().await.context(ReadStateSnafu { key })?;
let value = entry.read().await?;
Some((key.to_string(), value))
}
@@ -95,6 +94,10 @@ impl StateStore for ObjectStateStore {
};
Ok(key_value)
})
.map_err(move |e| Error::ListState {
path: path_string.clone(),
source: e,
});
Ok(Box::pin(stream))
@@ -112,7 +115,8 @@ impl StateStore for ObjectStateStore {
#[cfg(test)]
mod tests {
use object_store::services::fs::Builder;
use object_store::services::Fs as Builder;
use object_store::ObjectStoreBuilder;
use tempdir::TempDir;
use super::*;
@@ -122,7 +126,7 @@ mod tests {
let dir = TempDir::new("state_store").unwrap();
let store_dir = dir.path().to_str().unwrap();
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
let state_store = ObjectStateStore::new(object_store);
let data: Vec<_> = state_store
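
For reference, a condensed sketch of the listing flow adopted above: the batch-API walk_top_down is gone in 0.27, replaced by scan() on an object handle, which yields a stream of entries whose mode is resolved asynchronously, with no ordering guarantee between files and directories at the same level. The function name and path below are illustrative:

    use futures::TryStreamExt;
    use object_store::{ObjectMode, ObjectStore};

    async fn read_all_files(
        store: &ObjectStore,
        path: &str,
    ) -> object_store::Result<Vec<(String, Vec<u8>)>> {
        // scan() walks the tree under `path` recursively.
        let mut lister = store.object(path).scan().await?;
        let mut out = Vec::new();
        while let Some(entry) = lister.try_next().await? {
            if entry.mode().await? == ObjectMode::FILE {
                out.push((entry.path().to_string(), entry.read().await?));
            }
        }
        Ok(out)
    }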


@@ -16,7 +16,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::{fs, path};
use backon::ExponentialBackoff;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
@@ -29,12 +28,10 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::cache_policy::LruCachePolicy;
use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::fs::Builder as FsBuilder;
use object_store::services::oss::Builder as OSSBuilder;
use object_store::services::s3::Builder as S3Builder;
use object_store::{util, ObjectStore};
use object_store::cache_policy::LruCacheLayer;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder};
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use snafu::prelude::*;
@@ -227,7 +224,7 @@ pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result
object_store.map(|object_store| {
object_store
.layer(RetryLayer::new(ExponentialBackoff::default().with_jitter()))
.layer(RetryLayer::new().with_jitter())
.layer(MetricsLayer)
.layer(LoggingLayer::default())
.layer(TracingLayer)
@@ -258,7 +255,7 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
config: store_config.clone(),
})?;
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config)
}
fn create_object_store_with_cache(
@@ -285,13 +282,13 @@ fn create_object_store_with_cache(
if let Some(path) = cache_path {
let cache_store =
ObjectStore::new(FsBuilder::default().root(path).build().with_context(|_| {
error::InitBackendSnafu {
FsBuilder::default()
.root(path)
.build()
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
}
})?);
let policy = LruCachePolicy::new(cache_capacity.0 as usize);
let cache_layer = CacheLayer::new(cache_store).with_policy(policy);
})?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize);
Ok(object_store.layer(cache_layer))
} else {
Ok(object_store)
@@ -328,7 +325,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
config: store_config.clone(),
})?;
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
create_object_store_with_cache(ObjectStore::new(accessor).finish(), store_config)
}
pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
@@ -351,7 +348,7 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res
config: store_config.clone(),
})?;
Ok(ObjectStore::new(accessor))
Ok(ObjectStore::new(accessor).finish())
}
/// Create metasrv client instance and spawn heartbeat loop.
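
Two layer-level shifts land in this file: RetryLayer now owns its backoff configuration (so the direct backon dependency disappears from this crate), and layers compose before finish(). A hedged sketch of the composed stack, assuming a filesystem backend and an illustrative root:

    use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
    use object_store::services::Fs;
    use object_store::{ObjectStore, ObjectStoreBuilder};

    fn layered_fs_store(root: &str) -> object_store::Result<ObjectStore> {
        let accessor = Fs::default().root(root).build()?;
        Ok(ObjectStore::new(accessor)
            // Retry with the default backoff; jitter is enabled on the layer itself.
            .layer(RetryLayer::new().with_jitter())
            .layer(MetricsLayer)
            .layer(LoggingLayer::default())
            .layer(TracingLayer)
            .finish())
    }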


@@ -147,8 +147,8 @@ mod tests {
use log_store::NoopLogStore;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use query::parser::{QueryLanguageParser, QueryStatement};
use query::QueryEngineFactory;
use session::context::QueryContext;
@@ -213,7 +213,7 @@ mod tests {
let dir = TempDir::new("setup_test_engine_and_table").unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = Builder::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
let sql = r#"insert into demo(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),


@@ -22,8 +22,8 @@ use datafusion::parquet::basic::{Compression, Encoding};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::RecordBatchStream;
use futures::TryStreamExt;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use snafu::ResultExt;
use table::engine::TableReference;
use table::requests::CopyTableRequest;
@@ -53,7 +53,7 @@ impl SqlHandler {
let stream = Box::pin(DfRecordBatchStreamAdapter::new(stream));
let accessor = Builder::default().build().unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
let mut parquet_writer = ParquetWriter::new(req.file_name, stream, object_store);
// TODO(jiachun):


@@ -21,8 +21,8 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use datatypes::vectors::VectorRef;
use log_store::NoopLogStore;
use object_store::services::fs::Builder;
use object_store::ObjectStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use storage::compaction::noop::NoopCompactionScheduler;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
@@ -99,7 +99,7 @@ pub async fn new_test_object_store(prefix: &str) -> (TempDir, ObjectStore) {
let dir = TempDir::new(prefix).unwrap();
let store_dir = dir.path().to_string_lossy();
let accessor = Builder::default().root(&store_dir).build().unwrap();
(dir, ObjectStore::new(accessor))
(dir, ObjectStore::new(accessor).finish())
}
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {


@@ -8,10 +8,7 @@ license.workspace = true
lru = "0.9"
async-trait = "0.1"
futures = { version = "0.3" }
opendal = { version = "0.25.1", features = [
"layers-tracing",
"layers-metrics",
] }
opendal = { version = "0.27", features = ["layers-tracing", "layers-metrics"] }
tokio.workspace = true
[dev-dependencies]


@@ -15,4 +15,5 @@
pub mod azblob;
pub mod fs;
pub mod memory;
pub mod oss;
pub mod s3;


@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::services::azblob::Builder;
pub use opendal::services::Azblob as Builder;


@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::services::fs::Builder;
pub use opendal::services::Fs as Builder;


@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::services::memory::Builder;
pub use opendal::services::Memory as Builder;


@@ -0,0 +1,15 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::services::Oss as Builder;


@@ -12,4 +12,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use opendal::services::s3::Builder;
pub use opendal::services::S3 as Builder;


@@ -12,112 +12,177 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::AsyncRead;
use lru::LruCache;
use opendal::layers::CachePolicy;
use opendal::raw::output::Reader;
use opendal::raw::{Accessor, RpDelete, RpRead};
use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result};
use opendal::ops::*;
use opendal::raw::*;
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
#[derive(Debug)]
pub struct LruCachePolicy {
pub struct LruCacheLayer<C> {
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}
impl LruCachePolicy {
pub fn new(capacity: usize) -> Self {
impl<C: Accessor> LruCacheLayer<C> {
pub fn new(cache: Arc<C>, capacity: usize) -> Self {
Self {
cache,
lru_cache: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(capacity).unwrap(),
))),
}
}
}
impl<I: Accessor, C: Accessor> Layer<I> for LruCacheLayer<C> {
type LayeredAccessor = LruCacheAccessor<I, C>;
fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccessor {
inner: Arc::new(inner),
cache: self.cache.clone(),
lru_cache: self.lru_cache.clone(),
}
}
}
#[derive(Debug)]
pub struct LruCacheAccessor<I, C> {
inner: Arc<I>,
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}
impl<I, C> LruCacheAccessor<I, C> {
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!("{}.cache-{}", path, args.range().to_header())
}
}
#[async_trait]
impl CachePolicy for LruCachePolicy {
fn on_read(
&self,
inner: Arc<dyn Accessor>,
cache: Arc<dyn Accessor>,
path: &str,
args: OpRead,
) -> BoxFuture<'static, Result<(RpRead, Reader)>> {
impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
type Reader = output::Reader;
type BlockingReader = I::BlockingReader;
type Pager = I::Pager;
type BlockingPager = I::BlockingPager;
fn inner(&self) -> &Self::Inner {
&self.inner
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
let lru_cache = self.lru_cache.clone();
Box::pin(async move {
match cache.read(&cache_path, OpRead::default()).await {
Ok(v) => {
// update lru when cache hit
let mut lru_cache = lru_cache.lock().await;
lru_cache.get_or_insert(cache_path.clone(), || ());
Ok(v)
}
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
let (rp, reader) = inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let _ = cache
.write(&cache_path, OpWrite::new(size), Box::new(reader))
.await?;
match cache.read(&cache_path, OpRead::default()).await {
Ok(v) => {
let r = {
// push new cache file name to lru
let mut lru_cache = lru_cache.lock().await;
lru_cache.push(cache_path.clone(), ())
};
// delete the evicted cache file
if let Some((k, _v)) = r {
let _ = cache.delete(&k, OpDelete::new()).await;
}
Ok(v)
}
Err(_) => inner.read(&path, args).await,
}
}
Err(_) => inner.read(&path, args).await,
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, r)) => {
// update lru when cache hit
let mut lru_cache = lru_cache.lock().await;
lru_cache.get_or_insert(cache_path.clone(), || ());
Ok(to_output_reader((rp, r)))
}
})
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
let (rp, reader) = self.inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let _ = self
.cache
.write(
&cache_path,
OpWrite::new(size),
Box::new(ReadWrapper(reader)),
)
.await?;
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, reader)) => {
let r = {
// push new cache file name to lru
let mut lru_cache = lru_cache.lock().await;
lru_cache.push(cache_path.clone(), ())
};
// delete the evicted cache file
if let Some((k, _v)) = r {
let _ = self.cache.delete(&k, OpDelete::new()).await;
}
return Ok(to_output_reader((rp, reader)));
}
Err(_) => return self.inner.read(&path, args).await.map(to_output_reader),
}
}
Err(_) => return self.inner.read(&path, args).await.map(to_output_reader),
}
}
fn on_delete(
&self,
inner: Arc<dyn Accessor>,
cache: Arc<dyn Accessor>,
path: &str,
args: OpDelete,
) -> BoxFuture<'static, Result<RpDelete>> {
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
self.inner.blocking_read(path, args)
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path = path.to_string();
let lru_cache = self.lru_cache.clone();
Box::pin(async move {
let cache_files: Vec<String> = {
let mut guard = lru_cache.lock().await;
let lru = guard.deref_mut();
let cache_files = lru
.iter()
.filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str()))
.map(|(k, _v)| k.clone())
.collect::<Vec<_>>();
for k in &cache_files {
lru.pop(k);
}
cache_files
};
for file in cache_files {
let _ = cache.delete(&file, OpDelete::new()).await;
let cache_files: Vec<String> = {
let mut guard = lru_cache.lock().await;
let lru = guard.deref_mut();
let cache_files = lru
.iter()
.filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str()))
.map(|(k, _v)| k.clone())
.collect::<Vec<_>>();
for k in &cache_files {
lru.pop(k);
}
inner.delete(&path, args).await
})
cache_files
};
for file in cache_files {
let _ = self.cache.delete(&file, OpDelete::new()).await;
}
return self.inner.delete(&path, args).await;
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner.list(path, args).await
}
async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
self.inner.scan(path, args).await
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}
fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
self.inner.blocking_scan(path, args)
}
}
/// TODO: Workaround for output::Read not implementing input::Read.
///
/// Should be removed after opendal fixes it.
struct ReadWrapper<R>(R);
impl<R: output::Read> AsyncRead for ReadWrapper<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.0.poll_read(cx, buf)
}
}
#[inline]
fn to_output_reader<R: output::Read + 'static>(input: (RpRead, R)) -> (RpRead, output::Reader) {
(input.0, Box::new(input.1))
}


@@ -13,8 +13,8 @@
// limitations under the License.
pub use opendal::{
layers, services, Error, ErrorKind, Layer, Object, ObjectLister, ObjectMetadata, ObjectMode,
Operator as ObjectStore, Result,
layers, services, Builder as ObjectStoreBuilder, Error, ErrorKind, Object, ObjectLister,
ObjectMetadata, ObjectMode, Operator as ObjectStore, Result,
};
pub mod backend;
pub mod cache_policy;


@@ -13,15 +13,15 @@
// limitations under the License.
use std::env;
use std::sync::Arc;
use anyhow::Result;
use common_telemetry::logging;
use object_store::backend::{fs, s3};
use object_store::cache_policy::LruCachePolicy;
use object_store::cache_policy::LruCacheLayer;
use object_store::test_util::TempFolder;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
use opendal::layers::CacheLayer;
use opendal::services::oss;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore, ObjectStoreBuilder};
use opendal::services::Oss;
use opendal::Operator;
use tempdir::TempDir;
@@ -100,7 +100,8 @@ async fn test_fs_backend() -> Result<()> {
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy())
.build()?,
);
)
.finish();
test_object_crud(&store).await?;
test_object_list(&store).await?;
@@ -124,7 +125,7 @@ async fn test_s3_backend() -> Result<()> {
.bucket(&bucket)
.build()?;
let store = ObjectStore::new(accessor);
let store = ObjectStore::new(accessor).finish();
let mut guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
@@ -145,14 +146,14 @@ async fn test_oss_backend() -> Result<()> {
let root = uuid::Uuid::new_v4().to_string();
let accessor = oss::Builder::default()
let accessor = Oss::default()
.root(&root)
.access_key_id(&env::var("GT_OSS_ACCESS_KEY_ID")?)
.access_key_secret(&env::var("GT_OSS_ACCESS_KEY")?)
.bucket(&bucket)
.build()?;
let store = ObjectStore::new(accessor);
let store = ObjectStore::new(accessor).finish();
let mut guard = TempFolder::new(&store, "/");
test_object_crud(&store).await?;
@@ -204,16 +205,15 @@ async fn test_object_store_cache_policy() -> Result<()> {
// create file cache layer
let cache_dir = TempDir::new("test_fs_cache")?;
let cache_op = ObjectStore::new(
fs::Builder::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy())
.build()?,
);
let cache_acc = fs::Builder::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy())
.build()?;
let cache_store = ObjectStore::new(cache_acc.clone()).finish();
// create operator for cache dir to verify cache file
let cache_store = ObjectStore::from(cache_op.inner());
let policy = LruCachePolicy::new(3);
let store = store.layer(CacheLayer::new(cache_op).with_policy(policy));
let store = store
.layer(LruCacheLayer::new(Arc::new(cache_acc), 3))
.finish();
// create several object handler.
let o1 = store.object("test_file1");


@@ -92,7 +92,7 @@ mod tests {
TimestampMillisecondVector, TimestampMillisecondVectorBuilder, UInt64VectorBuilder,
};
use object_store::backend::fs::Builder;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use store_api::storage::{ChunkReader, OpType, SequenceNumber};
use tempdir::TempDir;
@@ -273,7 +273,7 @@ mod tests {
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let object_store = ObjectStore::new(backend).finish();
let seq = AtomicU64::new(0);
let schema = schema_for_test();
@@ -350,7 +350,7 @@ mod tests {
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let object_store = ObjectStore::new(backend).finish();
let schema = schema_for_test();
let seq = AtomicU64::new(0);


@@ -380,6 +380,7 @@ mod tests {
use datatypes::type_id::LogicalTypeId;
use log_store::test_util::log_store_util;
use object_store::backend::fs::Builder;
use object_store::ObjectStoreBuilder;
use store_api::storage::Region;
use tempdir::TempDir;
@@ -395,7 +396,7 @@ mod tests {
let store_dir = dir.path().to_string_lossy();
let accessor = Builder::default().root(&store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
let config = EngineConfig::default();


@@ -107,7 +107,7 @@ pub mod noop {
#[cfg(test)]
mod tests {
use object_store::backend::fs::Builder;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use store_api::storage::OpType;
use tempdir::TempDir;
@@ -172,7 +172,8 @@ mod tests {
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
);
)
.finish();
let sst_file_name = "test-file-purge-handler.parquet";
let noop_file_purger = Arc::new(LocalScheduler::new(
@@ -209,7 +210,8 @@ mod tests {
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
);
)
.finish();
let sst_file_name = "test-file-purger.parquet";
let scheduler = Arc::new(LocalScheduler::new(
SchedulerConfig::default(),


@@ -23,7 +23,7 @@ mod tests {
use std::sync::Arc;
use object_store::backend::fs;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use store_api::manifest::action::ProtocolAction;
use store_api::manifest::{Manifest, MetaActionIterator, MAX_VERSION};
use tempdir::TempDir;
@@ -41,7 +41,8 @@ mod tests {
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
);
)
.finish();
let manifest = RegionManifest::new("/manifest/", object_store);


@@ -278,7 +278,7 @@ impl ManifestLogStorage for ManifestObjectStore {
#[cfg(test)]
mod tests {
use object_store::backend::fs;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use tempdir::TempDir;
use super::*;
@@ -292,7 +292,8 @@ mod tests {
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
);
)
.finish();
let log_store = ManifestObjectStore::new("/", object_store);


@@ -30,7 +30,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, VectorRef};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::NoopLogStore;
use object_store::backend::fs;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use store_api::storage::{
consts, Chunk, ChunkReader, RegionMeta, ScanRequest, SequenceNumber, Snapshot, WriteRequest,
};
@@ -286,7 +286,8 @@ async fn test_recover_region_manifets() {
.root(&tmp_dir.path().to_string_lossy())
.build()
.unwrap(),
);
)
.finish();
let manifest = RegionManifest::new("/manifest/", object_store.clone());
let region_meta = Arc::new(build_region_meta());


@@ -524,6 +524,7 @@ mod tests {
use datatypes::types::{TimestampMillisecondType, TimestampType};
use datatypes::vectors::TimestampMillisecondVector;
use object_store::backend::fs::Builder;
use object_store::ObjectStoreBuilder;
use store_api::storage::OpType;
use tempdir::TempDir;
@@ -563,7 +564,7 @@ mod tests {
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-flush.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
@@ -661,7 +662,7 @@ mod tests {
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read-large.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
@@ -683,7 +684,8 @@ mod tests {
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
);
)
.finish();
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
@@ -733,7 +735,7 @@ mod tests {
let dir = TempDir::new("write_parquet").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());
@@ -755,7 +757,8 @@ mod tests {
.root(dir.path().to_str().unwrap())
.build()
.unwrap(),
);
)
.finish();
let projected_schema = Arc::new(ProjectedSchema::new(schema, Some(vec![1])).unwrap());
let reader = ParquetReader::new(
@@ -845,7 +848,7 @@ mod tests {
let dir = TempDir::new("read-parquet-by-range").unwrap();
let path = dir.path().to_str().unwrap();
let backend = Builder::default().root(path).build().unwrap();
let object_store = ObjectStore::new(backend);
let object_store = ObjectStore::new(backend).finish();
let sst_file_name = "test-read.parquet";
let iter = memtable.iter(&IterContext::default()).unwrap();
let writer = ParquetWriter::new(sst_file_name, Source::Iter(iter), object_store.clone());


@@ -16,8 +16,8 @@ use std::sync::Arc;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use object_store::backend::fs::Builder;
use object_store::ObjectStore;
use object_store::services::Fs as Builder;
use object_store::{ObjectStore, ObjectStoreBuilder};
use crate::background::JobPoolImpl;
use crate::compaction::noop::NoopCompactionScheduler;
@@ -44,7 +44,7 @@ pub async fn new_store_config(
let manifest_dir = engine::region_manifest_dir(parent_dir, region_name);
let accessor = Builder::default().root(store_dir).build().unwrap();
let object_store = ObjectStore::new(accessor);
let object_store = ObjectStore::new(accessor).finish();
let sst_layer = Arc::new(FsAccessLayer::new(&sst_dir, object_store.clone()));
let manifest = RegionManifest::new(&manifest_dir, object_store);
let job_pool = Arc::new(JobPoolImpl {});


@@ -31,10 +31,9 @@ use datanode::sql::SqlHandler;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use frontend::instance::Instance as FeInstance;
use object_store::backend::s3;
use object_store::services::oss;
use object_store::backend::{oss, s3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use object_store::{ObjectStore, ObjectStoreBuilder};
use once_cell::sync::OnceCell;
use rand::Rng;
use servers::grpc::GrpcServer;
@@ -116,7 +115,7 @@ fn get_test_store_config(
let config = ObjectStoreConfig::Oss(oss_config);
let store = ObjectStore::new(accessor);
let store = ObjectStore::new(accessor).finish();
(
config,
@@ -145,7 +144,7 @@ fn get_test_store_config(
let config = ObjectStoreConfig::S3(s3_config);
let store = ObjectStore::new(accessor);
let store = ObjectStore::new(accessor).finish();
(config, Some(TempDirGuard::S3(TempFolder::new(&store, "/"))))
}