feat: enable caching when using object store (#928)

* feat: enable caching when using object store

* feat: support file cache for object store

* feat: maintaining the cached files with lru

* fix: improve the code

* empty commit

* improve the code
This commit is contained in:
elijah
2023-02-07 15:46:37 +08:00
committed by GitHub
parent 2f2609d8c6
commit 926022e14c
8 changed files with 300 additions and 4 deletions

11
Cargo.lock generated
View File

@@ -3686,6 +3686,15 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "lru"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17"
dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "lz4"
version = "1.24.0"
@@ -4404,8 +4413,10 @@ name = "object-store"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"common-telemetry",
"futures",
"lru 0.9.0",
"opendal",
"tempdir",
"tokio",

View File

@@ -25,6 +25,8 @@ use crate::error::Result;
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
@@ -48,6 +50,8 @@ pub struct S3Config {
pub secret_access_key: String,
pub endpoint: Option<String>,
pub region: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Default, Deserialize)]
@@ -58,6 +62,8 @@ pub struct OssConfig {
pub access_key_id: String,
pub access_key_secret: String,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
}
impl Default for ObjectStoreConfig {

View File

@@ -19,6 +19,7 @@ use std::{fs, path};
use backon::ExponentialBackoff;
use catalog::remote::MetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::readable_size::ReadableSize;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
@@ -28,7 +29,8 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOpts;
use mito::config::EngineConfig as TableEngineConfig;
use mito::engine::MitoEngine;
use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::cache_policy::LruCachePolicy;
use object_store::layers::{CacheLayer, LoggingLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::fs::Builder as FsBuilder;
use object_store::services::oss::Builder as OSSBuilder;
use object_store::services::s3::Builder as S3Builder;
@@ -42,7 +44,9 @@ use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use table::Table;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use crate::datanode::{
DatanodeOptions, ObjectStoreConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE,
};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
@@ -240,7 +244,44 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
config: store_config.clone(),
})?;
Ok(ObjectStore::new(accessor))
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
}
fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache_path.as_ref();
let capacity = s3_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache_path.as_ref();
let capacity = oss_config
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
_ => (None, ReadableSize(0)),
};
if let Some(path) = cache_path {
let cache_store =
ObjectStore::new(FsBuilder::default().root(path).build().with_context(|_| {
error::InitBackendSnafu {
config: store_config.clone(),
}
})?);
let policy = LruCachePolicy::new(cache_capacity.0 as usize);
let cache_layer = CacheLayer::new(cache_store).with_policy(policy);
Ok(object_store.layer(cache_layer))
} else {
Ok(object_store)
}
}
pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
@@ -273,7 +314,7 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
config: store_config.clone(),
})?;
Ok(ObjectStore::new(accessor))
create_object_store_with_cache(ObjectStore::new(accessor), store_config)
}
pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {

View File

@@ -5,6 +5,8 @@ edition.workspace = true
license.workspace = true
[dependencies]
lru = "0.9"
async-trait = "0.1"
futures = { version = "0.3" }
opendal = { version = "0.25.1", features = [
"layers-tracing",

View File

@@ -0,0 +1,123 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::sync::Arc;
use async_trait::async_trait;
use futures::future::BoxFuture;
use lru::LruCache;
use opendal::layers::CachePolicy;
use opendal::raw::output::Reader;
use opendal::raw::{Accessor, RpDelete, RpRead};
use opendal::{ErrorKind, OpDelete, OpRead, OpWrite, Result};
use tokio::sync::Mutex;
#[derive(Debug)]
pub struct LruCachePolicy {
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}
impl LruCachePolicy {
pub fn new(capacity: usize) -> Self {
Self {
lru_cache: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(capacity).unwrap(),
))),
}
}
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!("{}.cache-{}", path, args.range().to_header())
}
}
#[async_trait]
impl CachePolicy for LruCachePolicy {
fn on_read(
&self,
inner: Arc<dyn Accessor>,
cache: Arc<dyn Accessor>,
path: &str,
args: OpRead,
) -> BoxFuture<'static, Result<(RpRead, Reader)>> {
let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
let lru_cache = self.lru_cache.clone();
Box::pin(async move {
match cache.read(&cache_path, OpRead::default()).await {
Ok(v) => {
// update lru when cache hit
let mut lru_cache = lru_cache.lock().await;
lru_cache.get_or_insert(cache_path.clone(), || ());
Ok(v)
}
Err(err) if err.kind() == ErrorKind::ObjectNotFound => {
let (rp, reader) = inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let _ = cache
.write(&cache_path, OpWrite::new(size), Box::new(reader))
.await?;
match cache.read(&cache_path, OpRead::default()).await {
Ok(v) => {
let r = {
// push new cache file name to lru
let mut lru_cache = lru_cache.lock().await;
lru_cache.push(cache_path.clone(), ())
};
// delete the evicted cache file
if let Some((k, _v)) = r {
let _ = cache.delete(&k, OpDelete::new()).await;
}
Ok(v)
}
Err(_) => return inner.read(&path, args).await,
}
}
Err(_) => return inner.read(&path, args).await,
}
})
}
fn on_delete(
&self,
inner: Arc<dyn Accessor>,
cache: Arc<dyn Accessor>,
path: &str,
args: OpDelete,
) -> BoxFuture<'static, Result<RpDelete>> {
let path = path.to_string();
let lru_cache = self.lru_cache.clone();
Box::pin(async move {
let cache_files: Vec<String> = {
let mut guard = lru_cache.lock().await;
let lru = guard.deref_mut();
let cache_files = lru
.iter()
.filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str()))
.map(|(k, _v)| k.clone())
.collect::<Vec<_>>();
for k in &cache_files {
lru.pop(k);
}
cache_files
};
for file in cache_files {
let _ = cache.delete(&file, OpDelete::new()).await;
}
return inner.delete(&path, args).await;
})
}
}

View File

@@ -17,5 +17,6 @@ pub use opendal::{
Operator as ObjectStore, Result,
};
pub mod backend;
pub mod cache_policy;
pub mod test_util;
pub mod util;

View File

@@ -17,9 +17,12 @@ use std::env;
use anyhow::Result;
use common_telemetry::logging;
use object_store::backend::{fs, s3};
use object_store::cache_policy::LruCachePolicy;
use object_store::test_util::TempFolder;
use object_store::{util, Object, ObjectLister, ObjectMode, ObjectStore};
use opendal::layers::CacheLayer;
use opendal::services::oss;
use opendal::Operator;
use tempdir::TempDir;
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
@@ -160,3 +163,108 @@ async fn test_oss_backend() -> Result<()> {
Ok(())
}
async fn assert_cache_files(
store: &Operator,
file_names: &[&str],
file_contents: &[&str],
) -> Result<()> {
let o = store.object("/");
let obs = o.list().await?;
let objects = util::collect(obs).await?;
// compare the cache file with the expected cache file; ignore orders
for o in objects {
let position = file_names.iter().position(|&x| x == o.name());
assert!(position.is_some(), "file not found: {}", o.name());
let position = position.unwrap();
let bs = o.read().await?;
assert_eq!(
file_contents[position],
String::from_utf8(bs.clone())?,
"file content not match: {}",
o.name()
);
}
Ok(())
}
#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
// create file storage
let root_dir = TempDir::new("test_fs_backend")?;
let store = ObjectStore::new(
fs::Builder::default()
.root(&root_dir.path().to_string_lossy())
.atomic_write_dir(&root_dir.path().to_string_lossy())
.build()?,
);
// create file cache layer
let cache_dir = TempDir::new("test_fs_cache")?;
let cache_op = ObjectStore::new(
fs::Builder::default()
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy())
.build()?,
);
// create operator for cache dir to verify cache file
let cache_store = ObjectStore::from(cache_op.inner());
let policy = LruCachePolicy::new(3);
let store = store.layer(CacheLayer::new(cache_op).with_policy(policy));
// create several object handler.
let o1 = store.object("test_file1");
let o2 = store.object("test_file2");
// write data into object;
assert!(o1.write("Hello, object1!").await.is_ok());
assert!(o2.write("Hello, object2!").await.is_ok());
// crate cache by read object
o1.range_read(7..).await?;
o1.read().await?;
o2.range_read(7..).await?;
o2.read().await?;
assert_cache_files(
&cache_store,
&[
"test_file1.cache-bytes=0-",
"test_file2.cache-bytes=7-",
"test_file2.cache-bytes=0-",
],
&["Hello, object1!", "object2!", "Hello, object2!"],
)
.await?;
assert!(o2.delete().await.is_ok());
assert_cache_files(
&cache_store,
&["test_file1.cache-bytes=0-"],
&["Hello, object1!"],
)
.await?;
let o3 = store.object("test_file3");
assert!(o3.write("Hello, object3!").await.is_ok());
o3.read().await?;
o3.range_read(0..5).await?;
assert_cache_files(
&cache_store,
&[
"test_file1.cache-bytes=0-",
"test_file3.cache-bytes=0-",
"test_file3.cache-bytes=0-4",
],
&["Hello, object1!", "Hello, object3!", "Hello"],
)
.await?;
Ok(())
}

View File

@@ -102,6 +102,8 @@ fn get_test_store_config(
access_key_secret: env::var("GT_OSS_ACCESS_KEY").unwrap(),
bucket: env::var("GT_OSS_BUCKET").unwrap(),
endpoint: env::var("GT_OSS_ENDPOINT").unwrap(),
cache_path: None,
cache_capacity: None,
};
let accessor = oss::Builder::default()
@@ -130,6 +132,8 @@ fn get_test_store_config(
bucket: env::var("GT_S3_BUCKET").unwrap(),
endpoint: None,
region: None,
cache_path: None,
cache_capacity: None,
};
let accessor = s3::Builder::default()