feat: improve object storage cache (#2522)

* feat: refactor object storage cache with moka

* chore: minor fixes

* fix: concurrent issues and invalidate cache after write/delete

* chore: minor changes

* fix: cargo lock

* refactor: rename

* chore: change DEFAULT_OBJECT_STORE_CACHE_SIZE to 256Mib

* fix: typo

* chore: style

* fix: toml format

* chore: toml

* fix: toml format

* Update src/object-store/src/layers/lru_cache/read_cache.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: update Cargo.toml

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: update src/object-store/Cargo.toml

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: refactor and apply suggestions

* fix: typo

* feat: adds back allow list for caching

* chore: cr suggestion

Co-authored-by: Yingwen <realevenyag@gmail.com>

* chore: cr suggestion

Co-authored-by: Yingwen <realevenyag@gmail.com>

* refactor: wrap inner Accessor with Arc

* chore: remove run_pending_task in read and write path

* chore: the arc is unnecessary

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2023-10-08 11:27:49 +08:00
committed by GitHub
parent 657542c0b8
commit ff15bc41d6
15 changed files with 499 additions and 304 deletions

114
Cargo.lock generated
View File

@@ -579,26 +579,6 @@ dependencies = [
"zstd-safe 6.0.6",
]
[[package]]
name = "async-io"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af"
dependencies = [
"async-lock",
"autocfg",
"cfg-if 1.0.0",
"concurrent-queue",
"futures-lite",
"log",
"parking",
"polling",
"rustix 0.37.23",
"slab",
"socket2 0.4.9",
"waker-fn",
]
[[package]]
name = "async-lock"
version = "2.8.0"
@@ -3526,21 +3506,6 @@ version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-lite"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce"
dependencies = [
"fastrand 1.9.0",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.28"
@@ -5012,12 +4977,6 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4"
[[package]]
name = "linux-raw-sys"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "linux-raw-sys"
version = "0.4.5"
@@ -5122,15 +5081,6 @@ dependencies = [
"vob",
]
[[package]]
name = "lru"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71e7d46de488603ffdd5f30afbc64fbba2378214a2c3a2fb83abf3d33126df17"
dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "lru"
version = "0.10.1"
@@ -5598,12 +5548,12 @@ dependencies = [
[[package]]
name = "moka"
version = "0.11.3"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa6e72583bf6830c956235bff0d5afec8cf2952f579ebad18ae7821a917d950f"
checksum = "8dc65d4615c08c8a13d91fd404b5a2a4485ba35b4091e3315cf8798d280c2f29"
dependencies = [
"async-io",
"async-lock",
"async-trait",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
@@ -5612,7 +5562,6 @@ dependencies = [
"parking_lot 0.12.1",
"quanta 0.11.1",
"rustc_version",
"scheduled-thread-pool",
"skeptic",
"smallvec",
"tagptr",
@@ -5666,7 +5615,7 @@ dependencies = [
"futures-sink",
"futures-util",
"lazy_static",
"lru 0.10.1",
"lru",
"mio",
"mysql_common",
"once_cell",
@@ -6009,14 +5958,14 @@ dependencies = [
"anyhow",
"async-trait",
"bytes",
"common-runtime",
"common-telemetry",
"common-test-util",
"futures",
"lru 0.9.0",
"md5",
"metrics",
"moka",
"opendal",
"pin-project",
"tokio",
"uuid",
]
@@ -6397,12 +6346,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "parking"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14f2252c834a40ed9bb5422029649578e63aa341ac401f74e719dd1afda8394e"
[[package]]
name = "parking_lot"
version = "0.11.2"
@@ -6834,22 +6777,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "polling"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce"
dependencies = [
"autocfg",
"bitflags 1.3.2",
"cfg-if 1.0.0",
"concurrent-queue",
"libc",
"log",
"pin-project-lite",
"windows-sys 0.48.0",
]
[[package]]
name = "portable-atomic"
version = "0.3.20"
@@ -8058,20 +7985,6 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "rustix"
version = "0.37.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d69718bf81c6127a49dc64e44a742e8bb9213c0ff8869a22c308f84c1d4ab06"
dependencies = [
"bitflags 1.3.2",
"errno 0.3.3",
"io-lifetimes",
"libc",
"linux-raw-sys 0.3.8",
"windows-sys 0.48.0",
]
[[package]]
name = "rustix"
version = "0.38.10"
@@ -8577,15 +8490,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "schemars"
version = "0.8.13"
@@ -11196,12 +11100,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8e76fae08f03f96e166d2dfda232190638c10e0383841252416f9cfe2ae60e6"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "walkdir"
version = "2.3.3"

View File

@@ -82,7 +82,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
moka = { version = "0.11" }
moka = "0.12"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "43.0"

View File

@@ -47,6 +47,12 @@ type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"
# Cache configuration for object storage such as 'S3' etc.
# The local file cache directory
# cache_path = "/path/local_cache"
# The local file cache capacity in bytes.
# cache_capacity = "256Mib"
# Compaction options, see `standalone.example.toml`.
[storage.compaction]
max_inflight_tasks = 4

View File

@@ -115,6 +115,10 @@ data_home = "/tmp/greptimedb/"
type = "File"
# TTL for all tables. Disabled by default.
# global_ttl = "7d"
# Cache configuration for object storage such as 'S3' etc.
# cache_path = "/path/local_cache"
# The local file cache capacity in bytes.
# cache_capacity = "256Mib"
# Compaction options.
[storage.compaction]

View File

@@ -37,7 +37,7 @@ use storage::config::{
};
use storage::scheduler::SchedulerConfig;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256);
/// Default data home in file storage
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
@@ -90,6 +90,15 @@ impl Default for StorageConfig {
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
/// The local file cache directory
pub cache_path: Option<String>,
/// The cache capacity in bytes
pub cache_capacity: Option<ReadableSize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
@@ -101,8 +110,8 @@ pub struct S3Config {
pub secret_access_key: SecretString,
pub endpoint: Option<String>,
pub region: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -115,8 +124,8 @@ pub struct OssConfig {
#[serde(skip_serializing)]
pub access_key_secret: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -130,8 +139,8 @@ pub struct AzblobConfig {
pub account_key: SecretString,
pub endpoint: String,
pub sas_token: Option<String>,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -143,8 +152,8 @@ pub struct GcsConfig {
#[serde(skip_serializing)]
pub credential_path: SecretString,
pub endpoint: String,
pub cache_path: Option<String>,
pub cache_capacity: Option<ReadableSize>,
#[serde(flatten)]
pub cache: ObjectStorageCacheConfig,
}
impl Default for S3Config {
@@ -156,8 +165,7 @@ impl Default for S3Config {
secret_access_key: SecretString::from(String::default()),
endpoint: Option::default(),
region: Option::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
cache: ObjectStorageCacheConfig::default(),
}
}
}
@@ -170,8 +178,7 @@ impl Default for OssConfig {
access_key_id: SecretString::from(String::default()),
access_key_secret: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
cache: ObjectStorageCacheConfig::default(),
}
}
}
@@ -184,9 +191,8 @@ impl Default for AzblobConfig {
account_name: SecretString::from(String::default()),
account_key: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
sas_token: Option::default(),
cache: ObjectStorageCacheConfig::default(),
}
}
}
@@ -199,8 +205,7 @@ impl Default for GcsConfig {
scope: String::default(),
credential_path: SecretString::from(String::default()),
endpoint: String::default(),
cache_path: Option::default(),
cache_capacity: Option::default(),
cache: ObjectStorageCacheConfig::default(),
}
}
}

View File

@@ -19,7 +19,6 @@ use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
use catalog::memory::MemoryCatalogManager;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
@@ -62,8 +61,6 @@ use crate::region_server::RegionServer;
use crate::server::Services;
use crate::store;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
const OPEN_REGION_PARALLELISM: usize = 16;
/// Datanode service.

View File

@@ -76,29 +76,33 @@ async fn create_object_store_with_cache(
) -> Result<ObjectStore> {
let (cache_path, cache_capacity) = match store_config {
ObjectStoreConfig::S3(s3_config) => {
let path = s3_config.cache_path.as_ref();
let path = s3_config.cache.cache_path.as_ref();
let capacity = s3_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache_path.as_ref();
let path = oss_config.cache.cache_path.as_ref();
let capacity = oss_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Azblob(azblob_config) => {
let path = azblob_config.cache_path.as_ref();
let path = azblob_config.cache.cache_path.as_ref();
let capacity = azblob_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
}
ObjectStoreConfig::Gcs(gcs_config) => {
let path = gcs_config.cache_path.as_ref();
let path = gcs_config.cache.cache_path.as_ref();
let capacity = gcs_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(path, capacity)
@@ -119,6 +123,12 @@ async fn create_object_store_with_cache(
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.await
.context(error::InitBackendSnafu)?;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
path, cache_capacity
);
Ok(object_store.layer(cache_layer))
} else {
Ok(object_store)

View File

@@ -40,7 +40,7 @@ humantime-serde = { workspace = true }
lazy_static = "1.4"
memcomparable = "0.2"
metrics.workspace = true
moka.workspace = true
moka = { workspace = true, features = ["sync"] }
object-store = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste.workspace = true

View File

@@ -7,19 +7,20 @@ license.workspace = true
[dependencies]
async-trait = "0.1"
bytes = "1.4"
futures = { version = "0.3" }
lru = "0.9"
common-runtime.workspace = true
common-telemetry.workspace = true
futures.workspace = true
md5 = "0.7"
metrics.workspace = true
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.40", features = [
"layers-tracing",
"layers-metrics",
] }
pin-project = "1.0"
tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
anyhow = "1.0"
common-telemetry = { workspace = true }
common-test-util = { workspace = true }
tokio.workspace = true

View File

@@ -12,101 +12,70 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::sync::Arc;
use async_trait::async_trait;
use lru::LruCache;
use metrics::increment_counter;
use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt};
use opendal::raw::oio::Read;
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS,
};
use opendal::Result;
mod read_cache;
use common_telemetry::logging::info;
use read_cache::ReadCache;
/// An opendal layer with local LRU file cache support.
#[derive(Clone)]
pub struct LruCacheLayer<C> {
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
pub struct LruCacheLayer<C: Clone> {
// The read cache
read_cache: ReadCache<C>,
}
impl<C: Accessor + Clone> LruCacheLayer<C> {
pub async fn new(cache: Arc<C>, capacity: usize) -> Result<Self> {
let layer = Self {
cache,
lru_cache: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(capacity).unwrap(),
))),
};
layer.recover_keys().await?;
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
let (entries, bytes) = read_cache.recover_cache().await?;
Ok(layer)
info!(
"Recovered {} entries and total size {} in bytes for LruCacheLayer",
entries, bytes
);
Ok(Self { read_cache })
}
/// Recover existing keys from `cache` to `lru_cache`.
async fn recover_keys(&self) -> Result<()> {
let (_, mut pager) = self.cache.list("/", OpList::default()).await?;
let mut lru_cache = self.lru_cache.lock().await;
while let Some(entries) = pager.next().await? {
for entry in entries {
let _ = lru_cache.push(entry.path().to_string(), ());
}
}
Ok(())
/// Returns true when the local cache contains the specific file
pub async fn contains_file(&self, path: &str) -> bool {
self.read_cache.contains_file(path).await
}
pub async fn lru_contains_key(&self, key: &str) -> bool {
self.lru_cache.lock().await.contains(key)
/// Returns the read cache statistics info `(EntryCount, SizeInBytes)`.
pub async fn read_cache_stat(&self) -> (u64, u64) {
self.read_cache.stat().await
}
}
impl<I: Accessor, C: Accessor> Layer<I> for LruCacheLayer<C> {
impl<I: Accessor, C: Accessor + Clone> Layer<I> for LruCacheLayer<C> {
type LayeredAccessor = LruCacheAccessor<I, C>;
fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccessor {
inner,
cache: self.cache.clone(),
lru_cache: self.lru_cache.clone(),
read_cache: self.read_cache.clone(),
}
}
}
#[derive(Debug)]
pub struct LruCacheAccessor<I, C> {
pub struct LruCacheAccessor<I, C: Clone> {
inner: I,
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
}
/// Returns true when the path of the file can be cached.
fn can_cache(path: &str) -> bool {
// TODO(dennis): find a better way
!path.ends_with("_last_checkpoint")
}
impl<I, C> LruCacheAccessor<I, C> {
fn cache_path(&self, path: &str, args: &OpRead) -> String {
format!(
"{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}
read_cache: ReadCache<C>,
}
#[async_trait]
impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C> {
type Inner = I;
type Reader = Box<dyn Read>;
type BlockingReader = I::BlockingReader;
@@ -120,84 +89,27 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
if !can_cache(path) {
return self.inner.read(path, args).await.map(to_output_reader);
}
let path = path.to_string();
let cache_path = self.cache_path(&path, &args);
let lru_cache = &self.lru_cache;
// the args is already in the cache path, so we must create a new OpRead.
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, r)) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_HIT);
// update lru when cache hit
let mut lru_cache = lru_cache.lock().await;
let _ = lru_cache.get_or_insert(cache_path.clone(), || ());
Ok(to_output_reader((rp, r)))
}
Err(err) if err.kind() == ErrorKind::NotFound => {
increment_counter!(OBJECT_STORE_LRU_CACHE_MISS);
let (_, mut reader) = self.inner.read(&path, args.clone()).await?;
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;
while let Some(bytes) = reader.next().await {
writer.write(&bytes?).await?;
}
writer.close().await?;
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, reader)) => {
let r = {
// push new cache file name to lru
let mut lru_cache = lru_cache.lock().await;
lru_cache.push(cache_path.clone(), ())
};
// delete the evicted cache file
if let Some((k, _v)) = r {
let _ = self.cache.delete(&k, OpDelete::new()).await;
}
return Ok(to_output_reader((rp, reader)));
}
Err(_) => return self.inner.read(&path, args).await.map(to_output_reader),
}
}
Err(err) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND => format!("{}", err.kind()));
return self.inner.read(&path, args).await.map(to_output_reader);
}
}
self.read_cache.read(&self.inner, path, args).await
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
self.inner.write(path, args).await
let result = self.inner.write(path, args).await;
self.read_cache
.invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
.await;
result
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let cache_path = md5::compute(path);
let lru_cache = &self.lru_cache;
let result = self.inner.delete(path, args).await;
let cache_files: Vec<String> = {
let mut guard = lru_cache.lock().await;
let lru = guard.deref_mut();
let cache_files = lru
.iter()
.filter(|(k, _v)| k.starts_with(format!("{:x}.cache-", cache_path).as_str()))
.map(|(k, _v)| k.clone())
.collect::<Vec<_>>();
for k in &cache_files {
let _ = lru.pop(k);
}
cache_files
};
for file in cache_files {
let _ = self.cache.delete(&file, OpDelete::new()).await;
}
self.inner.delete(path, args).await
self.read_cache
.invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
.await;
result
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
@@ -205,35 +117,20 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
// TODO(dennis): support blocking read cache
self.inner.blocking_read(path, args)
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
self.inner.blocking_write(path, args)
let result = self.inner.blocking_write(path, args);
self.read_cache
.blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)));
result
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
self.inner.blocking_list(path, args)
}
}
#[inline]
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Box::new(input.1))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_can_cache() {
assert!(can_cache("test"));
assert!(can_cache("a/b/c.parquet"));
assert!(can_cache("1.json"));
assert!(can_cache("100.checkpoint"));
assert!(can_cache("test/last_checkpoint"));
assert!(!can_cache("test/__last_checkpoint"));
assert!(!can_cache("a/b/c/__last_checkpoint"));
}
}

View File

@@ -0,0 +1,290 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_telemetry::logging::debug;
use futures::FutureExt;
use metrics::{decrement_gauge, increment_counter, increment_gauge};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Page, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};
/// Cache value for read file
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
// Read success with size
Success(u32),
// File not found
NotFound,
}
impl ReadResult {
fn size_bytes(&self) -> u32 {
match self {
ReadResult::NotFound => 0,
ReadResult::Success(size) => *size,
}
}
}
/// Returns true when the path of the file can be cached.
fn can_cache(path: &str) -> bool {
// TODO(dennis): find a better way
!path.ends_with("_last_checkpoint")
}
/// Generate a unique cache key for the read path and range.
fn read_cache_key(path: &str, args: &OpRead) -> String {
format!(
"{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}
/// Local read cache for files in object storage
#[derive(Clone, Debug)]
pub(crate) struct ReadCache<C: Clone> {
/// Local file cache backend
file_cache: Arc<C>,
/// Local memory cache to track local cache files
mem_cache: Cache<String, ReadResult>,
}
impl<C: Accessor + Clone> ReadCache<C> {
/// Create a [`ReadCache`] with capacity in bytes.
pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
let file_cache_cloned = file_cache.clone();
let eviction_listener =
move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
// Delete the file from local file cache when it's purged from mem_cache.
decrement_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0);
let file_cache_cloned = file_cache_cloned.clone();
async move {
if let ReadResult::Success(size) = read_result {
decrement_gauge!(OBJECT_STORE_LRU_CACHE_BYTES, size as f64);
let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await;
debug!(
"Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
read_key, result, cause
);
}
}
.boxed()
};
Self {
file_cache,
mem_cache: Cache::builder()
.max_capacity(capacity as u64)
.weigher(|_key, value: &ReadResult| -> u32 {
// TODO(dennis): add key's length to weight?
value.size_bytes()
})
.async_eviction_listener(eviction_listener)
.support_invalidation_closures()
.build(),
}
}
/// Returns the cache's entry count and total approximate entry size in bytes.
pub(crate) async fn stat(&self) -> (u64, u64) {
self.mem_cache.run_pending_tasks().await;
(self.mem_cache.entry_count(), self.mem_cache.weighted_size())
}
/// Invalidate all cache items whose keys start with `prefix`.
pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) {
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
.ok();
}
/// Blocking version of `invalidate_entries_with_prefix`.
pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) {
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
.ok();
}
/// Recover existing cache items from `file_cache` to `mem_cache`.
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;
while let Some(entries) = pager.next().await? {
for entry in entries {
let read_key = entry.path();
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
// because it's a private field.
let size = {
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
stat.into_metadata().content_length()
};
increment_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0);
increment_gauge!(OBJECT_STORE_LRU_CACHE_BYTES, size as f64);
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;
}
}
Ok(self.stat().await)
}
/// Returns true when the read cache contains the specific file.
pub(crate) async fn contains_file(&self, path: &str) -> bool {
self.mem_cache.run_pending_tasks().await;
self.mem_cache.contains_key(path)
&& self.file_cache.stat(path, OpStat::default()).await.is_ok()
}
/// Read from a specific path using the OpRead operation.
/// It will attempt to retrieve the data from the local cache.
/// If the data is not found in the local cache,
/// it will fallback to retrieving it from remote object storage
/// and cache the result locally.
pub(crate) async fn read<I>(
&self,
inner: &I,
path: &str,
args: OpRead,
) -> Result<(RpRead, Box<dyn Read>)>
where
I: Accessor,
{
if !can_cache(path) {
return inner.read(path, args).await.map(to_output_reader);
}
let read_key = read_cache_key(path, &args);
let read_result = self
.mem_cache
.try_get_with(
read_key.clone(),
self.read_remote(inner, &read_key, path, args.clone()),
)
.await
.map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
match read_result {
ReadResult::Success(_) => {
// There is a concurrent issue here, the local cache may be purged
// while reading, we have to fallback to remote read
match self.file_cache.read(&read_key, OpRead::default()).await {
Ok(ret) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_HIT, "result" => "success");
Ok(to_output_reader(ret))
}
Err(_) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_MISS);
inner.read(path, args).await.map(to_output_reader)
}
}
}
ReadResult::NotFound => {
increment_counter!(OBJECT_STORE_LRU_CACHE_HIT, "result" => "not_found");
Err(OpendalError::new(
ErrorKind::NotFound,
&format!("File not found: {path}"),
))
}
}
}
/// Read the file from remote storage. If success, write the content into local cache.
async fn read_remote<I>(
&self,
inner: &I,
read_key: &str,
path: &str,
args: OpRead,
) -> Result<ReadResult>
where
I: Accessor,
{
increment_counter!(OBJECT_STORE_LRU_CACHE_MISS);
let inner_result = inner.read(path, args).await;
match inner_result {
Ok((rp, mut reader)) => {
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
while let Some(bytes) = reader.next().await {
writer.write(&bytes?).await?;
}
// Call `close` to ensure data is written.
writer.close().await?;
let read_bytes = rp.metadata().content_length() as u32;
increment_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0);
increment_gauge!(OBJECT_STORE_LRU_CACHE_BYTES, read_bytes as f64);
Ok(ReadResult::Success(read_bytes))
}
Err(e) if e.kind() == ErrorKind::NotFound => {
increment_counter!(OBJECT_STORE_READ_ERROR, "kind" => format!("{}", e.kind()));
increment_gauge!(OBJECT_STORE_LRU_CACHE_ENTRIES, 1.0);
Ok(ReadResult::NotFound)
}
Err(e) => {
increment_counter!(OBJECT_STORE_READ_ERROR, "kind" => format!("{}", e.kind()));
Err(e)
}
}
}
}
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Box::new(input.1))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_can_cache() {
assert!(can_cache("test"));
assert!(can_cache("a/b/c.parquet"));
assert!(can_cache("1.json"));
assert!(can_cache("100.checkpoint"));
assert!(can_cache("test/last_checkpoint"));
assert!(!can_cache("test/__last_checkpoint"));
assert!(!can_cache("a/b/c/__last_checkpoint"));
}
}

View File

@@ -14,7 +14,13 @@
//! object-store metrics
/// Cache hit counter, no matter what the cache result is.
pub const OBJECT_STORE_LRU_CACHE_HIT: &str = "object_store.lru_cache.hit";
/// Cache miss counter
pub const OBJECT_STORE_LRU_CACHE_MISS: &str = "object_store.lru_cache.miss";
pub const OBJECT_STORE_LRU_CACHE_ERROR: &str = "object_store.lru_cache.error";
pub const OBJECT_STORE_LRU_CACHE_ERROR_KIND: &str = "error";
/// Object store read error counter
pub const OBJECT_STORE_READ_ERROR: &str = "object_store.read.errors";
/// Cache entry number
pub const OBJECT_STORE_LRU_CACHE_ENTRIES: &str = "object_store.lru_cache.entries";
/// Cache size in bytes
pub const OBJECT_STORE_LRU_CACHE_BYTES: &str = "object_store.lru_cache.bytes";

View File

@@ -30,6 +30,8 @@ async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
// Write data info object;
let file_name = "test_file";
assert!(store.read(file_name).await.is_err());
store.write(file_name, "Hello, World!").await?;
// Read data from object;
@@ -80,6 +82,11 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
store.delete(p2).await?;
let entries = store.list("/").await?;
assert!(entries.is_empty());
assert!(store.read(p1).await.is_err());
assert!(store.read(p2).await.is_err());
assert!(store.read(p3).await.is_err());
Ok(())
}
@@ -210,12 +217,48 @@ async fn test_gcs_backend() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn test_file_backend_with_lru_cache() -> Result<()> {
logging::init_default_ut_logging();
let data_dir = create_temp_dir("test_file_backend_with_lru_cache");
let tmp_dir = create_temp_dir("test_file_backend_with_lru_cache");
let mut builder = Fs::default();
let _ = builder
.root(&data_dir.path().to_string_lossy())
.atomic_write_dir(&tmp_dir.path().to_string_lossy());
let store = ObjectStore::new(builder).unwrap().finish();
let cache_dir = create_temp_dir("test_file_backend_with_lru_cache");
let cache_layer = {
let mut builder = Fs::default();
let _ = builder
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let file_cache = Arc::new(builder.build().unwrap());
LruCacheLayer::new(Arc::new(file_cache.clone()), 32)
.await
.unwrap()
};
let store = store.layer(cache_layer.clone());
test_object_crud(&store).await?;
test_object_list(&store).await?;
assert_eq!(cache_layer.read_cache_stat().await, (4, 0));
Ok(())
}
async fn assert_lru_cache<C: Accessor + Clone>(
cache_layer: &LruCacheLayer<C>,
file_names: &[&str],
) {
for file_name in file_names {
assert!(cache_layer.lru_contains_key(file_name).await);
assert!(cache_layer.contains_file(file_name).await);
}
}
@@ -265,11 +308,11 @@ async fn test_object_store_cache_policy() -> Result<()> {
let _ = builder
.root(&cache_dir.path().to_string_lossy())
.atomic_write_dir(&cache_dir.path().to_string_lossy());
let cache_accessor = Arc::new(builder.build().unwrap());
let cache_store = OperatorBuilder::new(cache_accessor.clone()).finish();
let file_cache = Arc::new(builder.build().unwrap());
let cache_store = OperatorBuilder::new(file_cache.clone()).finish();
// create operator for cache dir to verify cache file
let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor.clone()), 3)
let cache_layer = LruCacheLayer::new(Arc::new(file_cache.clone()), 38)
.await
.unwrap();
let store = store.layer(cache_layer.clone());
@@ -281,13 +324,14 @@ async fn test_object_store_cache_policy() -> Result<()> {
store.write(p1, "Hello, object1!").await.unwrap();
store.write(p2, "Hello, object2!").await.unwrap();
// create cache by read object
// Try to read p1 and p2
let _ = store.read_with(p1).range(0..).await?;
let _ = store.read(p1).await?;
let _ = store.read_with(p2).range(0..).await?;
let _ = store.read_with(p2).range(7..).await?;
let _ = store.read(p2).await?;
assert_eq!(cache_layer.read_cache_stat().await, (3, 38));
assert_cache_files(
&cache_store,
&[
@@ -302,13 +346,16 @@ async fn test_object_store_cache_policy() -> Result<()> {
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
"ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
],
)
.await;
// Delete p2 file
store.delete(p2).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (1, 15));
assert_cache_files(
&cache_store,
&["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
@@ -321,12 +368,17 @@ async fn test_object_store_cache_policy() -> Result<()> {
)
.await;
assert!(store.read(p2).await.is_err());
let p3 = "test_file3";
store.write(p3, "Hello, object3!").await.unwrap();
// Try to read p3
let _ = store.read(p3).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();
// The entry count is 4, because we have the p2 `NotFound` cache.
assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
assert_cache_files(
&cache_store,
&[
@@ -347,6 +399,33 @@ async fn test_object_store_cache_policy() -> Result<()> {
)
.await;
// try to read p1, p2, p3
let _ = store.read(p3).await.unwrap();
let _ = store.read_with(p3).range(0..5).await.unwrap();
assert!(store.read(p2).await.is_err());
// Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
let _ = store.read_with(p1).range(1..15).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
assert_cache_files(
&cache_store,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
&["ello, object1!", "Hello, object3!", "Hello"],
)
.await?;
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],
)
.await;
let handle = metric::try_handle().unwrap();
let metric_text = handle.render();
@@ -354,14 +433,15 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert!(metric_text.contains("object_store_lru_cache_miss"));
drop(cache_layer);
let cache_layer = LruCacheLayer::new(Arc::new(cache_accessor), 3)
.await
.unwrap();
// Test recover
let cache_layer = LruCacheLayer::new(Arc::new(file_cache), 38).await.unwrap();
// The p2 `NotFound` cache will not be recovered
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
assert_lru_cache(
&cache_layer,
&[
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
"6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
"a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
],

View File

@@ -206,7 +206,7 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
let mut s3_config = s3_test_config();
if *store_type == StorageType::S3WithCache {
s3_config.cache_path = Some("/tmp/greptimedb_cache".to_string());
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string());
}
let mut builder = S3::default();

View File

@@ -174,6 +174,7 @@ async fn has_route_cache(instance: &Arc<Instance>, table_id: TableId) -> bool {
cache
.get(TableRouteKey::new(table_id).as_raw_key().as_slice())
.await
.is_some()
}