fix: remove path label for cache store (#4336)

* fix: remove path label for cache store

* fix: ignore path label for intermediatemanager

* refactor: remove unused object store
This commit is contained in:
Ning Sun
2024-07-15 11:34:19 +08:00
committed by GitHub
parent 4b8b04ffa2
commit b8bd8456f0
8 changed files with 46 additions and 29 deletions

View File

@@ -31,7 +31,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
.expect("input error level must be valid"),
)
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish();
Ok(object_store)
}

View File

@@ -94,7 +94,7 @@ pub fn build_s3_backend(
.expect("input error level must be valid"),
)
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.finish())
}

View File

@@ -60,7 +60,7 @@ pub(crate) async fn new_object_store(
object_store
};
let store = with_instrument_layers(object_store);
let store = with_instrument_layers(object_store, true);
Ok(store)
}

View File

@@ -208,18 +208,15 @@ pub(crate) struct SstWriteRequest {
pub(crate) fulltext_index_config: FulltextIndexConfig,
}
/// Creates a fs object store with atomic write dir.
pub(crate) async fn new_fs_object_store(root: &str) -> Result<ObjectStore> {
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
let atomic_write_dir = join_dir(root, ".tmp/");
clean_dir(&atomic_write_dir).await?;
let mut builder = Fs::default();
builder.root(root).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
// Add layers.
let object_store = with_instrument_layers(object_store);
Ok(object_store)
Ok(with_instrument_layers(store, false))
}
/// Clean the directory.

View File

@@ -24,7 +24,7 @@ use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use crate::access_layer::{new_fs_object_store, SstWriteRequest};
use crate::access_layer::{new_fs_cache_store, SstWriteRequest};
use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue};
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
@@ -86,7 +86,7 @@ impl WriteCache {
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
let local_store = new_fs_object_store(cache_dir).await?;
let local_store = new_fs_cache_store(cache_dir).await?;
Self::new(
local_store,
object_store_manager,

View File

@@ -19,7 +19,7 @@ use object_store::util::{self, normalize_dir};
use store_api::storage::{ColumnId, RegionId};
use uuid::Uuid;
use crate::access_layer::new_fs_object_store;
use crate::access_layer::new_fs_cache_store;
use crate::error::Result;
use crate::sst::file::FileId;
use crate::sst::index::store::InstrumentedStore;
@@ -37,7 +37,7 @@ impl IntermediateManager {
/// Create a new `IntermediateManager` with the given root path.
/// It will clean up all garbage intermediate files from previous runs.
pub async fn init_fs(aux_path: impl AsRef<str>) -> Result<Self> {
let store = new_fs_object_store(&normalize_dir(aux_path.as_ref())).await?;
let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?;
let store = InstrumentedStore::new(store);
// Remove all garbage intermediate files from previous runs.

View File

@@ -84,7 +84,15 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
///
/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration.
#[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer;
pub struct PrometheusMetricsLayer {
pub path_label: bool,
}
impl PrometheusMetricsLayer {
pub fn new(path_label: bool) -> Self {
Self { path_label }
}
}
impl<A: Access> Layer<A> for PrometheusMetricsLayer {
type LayeredAccess = PrometheusAccess<A>;
@@ -96,6 +104,7 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
PrometheusAccess {
inner,
scheme: scheme.to_string(),
path_label: self.path_label,
}
}
}
@@ -104,6 +113,17 @@ impl<A: Access> Layer<A> for PrometheusMetricsLayer {
pub struct PrometheusAccess<A: Access> {
inner: A,
scheme: String,
path_label: bool,
}
impl<A: Access> PrometheusAccess<A> {
fn get_path_label<'a>(&self, path: &'a str) -> &'a str {
if self.path_label {
extract_parent_path(path)
} else {
""
}
}
}
impl<A: Access> Debug for PrometheusAccess<A> {
@@ -128,7 +148,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
.inc();
@@ -146,7 +166,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
.inc();
@@ -176,7 +196,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
.inc();
@@ -206,7 +226,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
.inc();
@@ -223,7 +243,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
.inc();
@@ -241,7 +261,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
.inc();
@@ -277,7 +297,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
.inc();
@@ -295,7 +315,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
@@ -322,7 +342,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
@@ -363,7 +383,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
@@ -404,7 +424,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
@@ -429,7 +449,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,
@@ -455,7 +475,7 @@ impl<A: Access> LayeredAccess for PrometheusAccess<A> {
}
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
let path_label = extract_parent_path(path);
let path_label = self.get_path_label(path);
REQUESTS_TOTAL
.with_label_values(&[
&self.scheme,

View File

@@ -138,7 +138,7 @@ pub(crate) fn extract_parent_path(path: &str) -> &str {
}
/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore {
pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
object_store
.layer(
LoggingLayer::default()
@@ -148,7 +148,7 @@ pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore {
.expect("input error level must be valid"),
)
.layer(TracingLayer)
.layer(PrometheusMetricsLayer)
.layer(PrometheusMetricsLayer::new(path_label))
}
#[cfg(test)]