fix: object store cache bug (#1482)

* feat: use streaming read instead of reading whole file

* feat: enable atomic writing for object store file caching

* fix: recover existing keys from local cache

* test: recovering keys from local file cache for LruCachePolicy

* Update src/datanode/src/instance.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: cr comments

* feat: md5 hash caching path

* fix: test

* fix: read cache

* Update src/object-store/src/cache_policy.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2023-05-04 18:25:40 +08:00
committed by GitHub
parent c471007edd
commit b1920c41a4
6 changed files with 136 additions and 63 deletions

1
Cargo.lock generated
View File

@@ -5378,6 +5378,7 @@ dependencies = [
"common-test-util",
"futures",
"lru 0.9.0",
"md5",
"metrics",
"opendal",
"pin-project",

View File

@@ -21,8 +21,6 @@ use snafu::Location;
use storage::error::Error as StorageError;
use table::error::Error as TableError;
use crate::datanode::ObjectStoreConfig;
/// Business error of datanode.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -210,9 +208,8 @@ pub enum Error {
#[snafu(display("Failed to storage engine, source: {}", source))]
OpenStorageEngine { source: StorageError },
#[snafu(display("Failed to init backend, config: {:#?}, source: {}", config, source))]
#[snafu(display("Failed to init backend, source: {}", source))]
InitBackend {
config: Box<ObjectStoreConfig>,
source: object_store::Error,
location: Location,
},

View File

@@ -379,15 +379,13 @@ pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Re
.access_key_secret(oss_config.access_key_secret.expose_secret());
let object_store = ObjectStore::new(builder)
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?
.context(error::InitBackendSnafu)?
.finish();
create_object_store_with_cache(object_store, store_config)
create_object_store_with_cache(object_store, store_config).await
}
fn create_object_store_with_cache(
async fn create_object_store_with_cache(
object_store: ObjectStore,
store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
@@ -410,14 +408,17 @@ fn create_object_store_with_cache(
};
if let Some(path) = cache_path {
let cache_store =
FsBuilder::default()
.root(path)
.build()
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize);
let atomic_temp_dir = format!("{path}/.tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let cache_store = FsBuilder::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
Ok(object_store.layer(cache_layer))
} else {
Ok(object_store)
@@ -452,12 +453,21 @@ pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Res
create_object_store_with_cache(
ObjectStore::new(builder)
.with_context(|_| error::InitBackendSnafu {
config: store_config.clone(),
})?
.context(error::InitBackendSnafu)?
.finish(),
store_config,
)
.await
}
/// Removes the temporary directory `dir` and everything under it, if it exists.
///
/// When `dir` does not exist this is a no-op. A failure to remove the directory
/// is returned with the `error::RemoveDirSnafu` context carrying `dir`.
fn clean_temp_dir(dir: &str) -> Result<()> {
    // Nothing to clean up when the directory has never been created.
    if !path::Path::new(dir).exists() {
        return Ok(());
    }

    info!("Begin to clean temp storage directory: {}", dir);
    fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?;
    info!("Cleaned temp storage directory: {}", dir);

    Ok(())
}
pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result<ObjectStore> {
@@ -471,24 +481,13 @@ pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Res
info!("The file storage directory is: {}", &data_dir);
let atomic_write_dir = format!("{data_dir}/.tmp/");
if path::Path::new(&atomic_write_dir).exists() {
info!(
"Begin to clean temp storage directory: {}",
&atomic_write_dir
);
fs::remove_dir_all(&atomic_write_dir).context(error::RemoveDirSnafu {
dir: &atomic_write_dir,
})?;
info!("Cleaned temp storage directory: {}", &atomic_write_dir);
}
clean_temp_dir(&atomic_write_dir)?;
let mut builder = FsBuilder::default();
builder.root(&data_dir).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)
.context(error::InitBackendSnafu {
config: store_config.clone(),
})?
.context(error::InitBackendSnafu)?
.finish();
Ok(object_store)

View File

@@ -9,6 +9,7 @@ lru = "0.9"
async-trait = "0.1"
bytes = "1.4"
futures = { version = "0.3" }
md5 = "0.7"
metrics = "0.20"
opendal = { version = "0.33", features = ["layers-tracing", "layers-metrics"] }
pin-project = "1.0"

View File

@@ -17,11 +17,10 @@ use std::ops::DerefMut;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use lru::LruCache;
use metrics::increment_counter;
use opendal::ops::{OpDelete, OpList, OpRead, OpScan, OpWrite};
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::oio::{Page, Read, Reader, Write};
use opendal::raw::{Accessor, Layer, LayeredAccessor, RpDelete, RpList, RpRead, RpScan, RpWrite};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
@@ -31,19 +30,41 @@ use crate::metrics::{
OBJECT_STORE_LRU_CACHE_MISS,
};
#[derive(Clone)]
pub struct LruCacheLayer<C> {
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}
impl<C: Accessor> LruCacheLayer<C> {
pub fn new(cache: Arc<C>, capacity: usize) -> Self {
Self {
impl<C: Accessor + Clone> LruCacheLayer<C> {
pub async fn new(cache: Arc<C>, capacity: usize) -> Result<Self> {
let layer = Self {
cache,
lru_cache: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(capacity).unwrap(),
))),
};
layer.recover_keys().await?;
Ok(layer)
}
/// Recover existing keys from `cache` to `lru_cache`.
///
/// Lists the `cache` accessor starting at `"/"` and pushes the path of every
/// returned entry into the in-memory LRU index, one page at a time. Errors from
/// listing or from fetching a page are returned to the caller.
async fn recover_keys(&self) -> Result<()> {
    let (_, mut pager) = self.cache.list("/", OpList::default()).await?;

    // The index lock is taken once and held until every page has been consumed.
    let mut index = self.lru_cache.lock().await;
    loop {
        match pager.next().await? {
            Some(page) => {
                for entry in page {
                    // NOTE(review): the value returned by `push` is discarded, so an
                    // entry displaced from the index here would keep its file in
                    // `cache` — confirm whether that is intended.
                    index.push(entry.path().to_string(), ());
                }
            }
            None => break,
        }
    }

    Ok(())
}
/// Returns `true` when `key` is currently present in the in-memory LRU index.
pub async fn lru_contains_key(&self, key: &str) -> bool {
    let index = self.lru_cache.lock().await;
    index.contains(key)
}
}
@@ -68,7 +89,11 @@ pub struct LruCacheAccessor<I, C> {
impl<I, C> LruCacheAccessor<I, C> {
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!("{}.cache-{}", path, args.range().to_header())
format!(
"{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}
}
@@ -91,9 +116,10 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
let lru_cache = self.lru_cache.clone();
let lru_cache = &self.lru_cache;
match self.cache.read(&cache_path, args.clone()).await {
// the args is already in the cache path, so we must create a new OpRead.
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, r)) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_HIT);
@@ -105,18 +131,16 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
Err(err) if err.kind() == ErrorKind::NotFound => {
increment_counter!(OBJECT_STORE_LRU_CACHE_MISS);
let (rp, mut reader) = self.inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let (_, mut reader) = self.inner.read(&path, args.clone()).await?;
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;
// TODO(hl): We can use [Writer::append](https://docs.rs/opendal/0.30.4/opendal/struct.Writer.html#method.append)
// here to avoid loading whole file into memory once all our backend supports `Writer`.
let mut buf = vec![0; size as usize];
reader.read(&mut buf).await?;
writer.write(Bytes::from(buf)).await?;
while let Some(bytes) = reader.next().await {
writer.write(bytes?).await?;
}
writer.close().await?;
match self.cache.read(&cache_path, args.clone()).await {
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, reader)) => {
let r = {
// push new cache file name to lru
@@ -144,15 +168,15 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path = path.to_string();
let lru_cache = self.lru_cache.clone();
let cache_path = md5::compute(path);
let lru_cache = &self.lru_cache;
let cache_files: Vec<String> = {
let mut guard = lru_cache.lock().await;
let lru = guard.deref_mut();
let cache_files = lru
.iter()
.filter(|(k, _v)| k.starts_with(format!("{path}.cache-").as_str()))
.filter(|(k, _v)| k.starts_with(format!("{:x}.cache-", cache_path).as_str()))
.map(|(k, _v)| k.clone())
.collect::<Vec<_>>();
for k in &cache_files {
@@ -163,7 +187,7 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
for file in cache_files {
let _ = self.cache.delete(&file, OpDelete::new()).await;
}
return self.inner.delete(&path, args).await;
self.inner.delete(path, args).await
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {

View File

@@ -22,6 +22,7 @@ use object_store::cache_policy::LruCacheLayer;
use object_store::services::{Fs, S3};
use object_store::test_util::TempFolder;
use object_store::{util, ObjectStore, ObjectStoreBuilder};
use opendal::raw::Accessor;
use opendal::services::Oss;
use opendal::{EntryMode, Operator, OperatorBuilder};
@@ -157,6 +158,15 @@ async fn test_oss_backend() -> Result<()> {
Ok(())
}
/// Asserts that every name in `file_names` is present in the LRU index of
/// `cache_layer`.
///
/// Names are checked in order; the first missing one fails the assertion.
async fn assert_lru_cache<C>(cache_layer: &LruCacheLayer<C>, file_names: &[&str])
where
    C: Accessor + Clone,
{
    for &file_name in file_names {
        assert!(cache_layer.lru_contains_key(file_name).await);
    }
}
async fn assert_cache_files(
store: &Operator,
file_names: &[&str],
@@ -188,7 +198,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
// create file storage
let root_dir = create_temp_dir("test_fs_backend");
let root_dir = create_temp_dir("test_object_store_cache_policy");
let store = OperatorBuilder::new(
Fs::default()
.root(&root_dir.path().to_string_lossy())
@@ -199,7 +209,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
.finish();
// create file cache layer
let cache_dir = create_temp_dir("test_fs_cache");
let cache_dir = create_temp_dir("test_object_store_cache_policy_cache");
let mut builder = Fs::default();
builder
.root(&cache_dir.path().to_string_lossy())
@@ -208,7 +218,10 @@ async fn test_object_store_cache_policy() -> Result<()> {
let cache_store = OperatorBuilder::new(cache_accessor.clone()).finish();
// create operator for cache dir to verify cache file
let store = store.layer(LruCacheLayer::new(Arc::new(cache_accessor), 3));
let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor.clone()), 3)
.await
.unwrap();
let store = store.layer(cache_layer.clone());
// create several object handler.
// write data into object;
@@ -221,27 +234,41 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.range_read(p1, 0..).await?;
store.read(p1).await?;
store.range_read(p2, 0..).await?;
store.range_read(p2, 7..).await?;
store.read(p2).await?;
assert_cache_files(
&cache_store,
&[
"test_file1.cache-bytes=0-",
"test_file2.cache-bytes=7-",
"test_file2.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
],
&["Hello, object1!", "object2!", "Hello, object2!"],
)
.await?;
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
],
)
.await;
store.delete(p2).await.unwrap();
assert_cache_files(
&cache_store,
&["test_file1.cache-bytes=0-"],
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
&["Hello, object1!"],
)
.await?;
assert_lru_cache(
&cache_layer,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
)
.await;
let p3 = "test_file3";
store.write(p3, "Hello, object3!").await.unwrap();
@@ -252,13 +279,22 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert_cache_files(
&cache_store,
&[
"test_file1.cache-bytes=0-",
"test_file3.cache-bytes=0-",
"test_file3.cache-bytes=0-4",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["Hello, object1!", "Hello, object3!", "Hello"],
)
.await?;
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
.await;
let handle = metric::try_handle().unwrap();
let metric_text = handle.render();
@@ -266,5 +302,20 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert!(metric_text.contains("object_store_lru_cache_hit"));
assert!(metric_text.contains("object_store_lru_cache_miss"));
drop(cache_layer);
let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor), 3)
.await
.unwrap();
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
.await;
Ok(())
}