mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-09 06:42:57 +00:00
feat: refactoring LruCacheLayer with list_with_metakey and concurrent_stat_in_list (#4596)
* use list_with_metakey and concurrent_stat_in_list * change concurrent in recover_cache like before * remove stat funcation * use 8 concurrent * use const value * fmt code * Apply suggestions from code review --------- Co-authored-by: ozewr <l19ht@google.com> Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
@@ -18,15 +18,17 @@ use common_telemetry::debug;
|
||||
use futures::FutureExt;
|
||||
use moka::future::Cache;
|
||||
use moka::notification::ListenerFuture;
|
||||
use opendal::raw::oio::{List, Read, Reader, Write};
|
||||
use opendal::raw::{Access, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
|
||||
use opendal::{Error as OpendalError, ErrorKind, Result};
|
||||
use opendal::raw::oio::{Read, Reader, Write};
|
||||
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
|
||||
use opendal::{Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};
|
||||
|
||||
use crate::metrics::{
|
||||
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
|
||||
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
|
||||
};
|
||||
|
||||
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
|
||||
|
||||
/// Cache value for read file
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
|
||||
enum ReadResult {
|
||||
@@ -142,19 +144,16 @@ impl<C: Access> ReadCache<C> {
|
||||
/// Recover existing cache items from `file_cache` to `mem_cache`.
|
||||
/// Return entry count and total approximate entry size in bytes.
|
||||
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
|
||||
let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?;
|
||||
let op = OperatorBuilder::new(self.file_cache.clone()).finish();
|
||||
let mut entries = op
|
||||
.list_with("/")
|
||||
.metakey(Metakey::ContentLength | Metakey::ContentType)
|
||||
.concurrent(RECOVER_CACHE_LIST_CONCURRENT)
|
||||
.await?;
|
||||
|
||||
while let Some(entry) = pager.next().await? {
|
||||
while let Some(entry) = entries.pop() {
|
||||
let read_key = entry.path();
|
||||
|
||||
// We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly,
|
||||
// because it's private field.
|
||||
let size = {
|
||||
let stat = self.file_cache.stat(read_key, OpStat::default()).await?;
|
||||
|
||||
stat.into_metadata().content_length()
|
||||
};
|
||||
|
||||
let size = entry.metadata().content_length();
|
||||
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
|
||||
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
|
||||
self.mem_cache
|
||||
|
||||
Reference in New Issue
Block a user