mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-07 05:42:57 +00:00)

chore: Downgrade opendal for releasing 0.11.1

Revert "feat: bump opendal and switch prometheus layer to the upstream impl (#5179)"

This reverts commit 422d18da8b.

Cargo.lock (generated, 25 changes)
@@ -896,6 +896,18 @@ dependencies = [
  "rand",
 ]
 
+[[package]]
+name = "backon"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0"
+dependencies = [
+ "fastrand",
+ "futures-core",
+ "pin-project",
+ "tokio",
+]
+
 [[package]]
 name = "backon"
 version = "1.2.0"

@@ -2252,7 +2264,7 @@ version = "0.12.0"
 dependencies = [
  "async-stream",
  "async-trait",
- "backon",
+ "backon 1.2.0",
  "common-base",
  "common-error",
  "common-macro",

@@ -7469,13 +7481,13 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9"
 
 [[package]]
 name = "opendal"
-version = "0.50.2"
+version = "0.49.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44"
+checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a"
 dependencies = [
  "anyhow",
  "async-trait",
- "backon",
+ "backon 0.4.4",
  "base64 0.22.1",
  "bytes",
  "chrono",

@@ -7488,7 +7500,6 @@ dependencies = [
  "md-5",
  "once_cell",
  "percent-encoding",
- "prometheus",
  "quick-xml 0.36.2",
  "reqsign",
  "reqwest",

@@ -9504,9 +9515,9 @@ dependencies = [
 
 [[package]]
 name = "reqsign"
-version = "0.16.1"
+version = "0.16.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149"
+checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa"
 dependencies = [
  "anyhow",
  "async-trait",
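Note that after this revert Cargo.lock pins two major versions of backon side by side: the workspace keeps backon 1.2.0 for its own retry logic, while the downgraded opendal 0.49 pulls in backon 0.4.4. That is why the dependency entries above switch from the bare "backon" to the version-qualified "backon 1.2.0" and "backon 0.4.4".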
@@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
         DefaultLoggingInterceptor,
     ))
     .layer(object_store::layers::TracingLayer)
-    .layer(object_store::layers::build_prometheus_metrics_layer(true))
+    .layer(object_store::layers::PrometheusMetricsLayer::new(true))
     .finish();
     Ok(object_store)
 }

@@ -89,7 +89,7 @@ pub fn build_s3_backend(
         DefaultLoggingInterceptor,
     ))
     .layer(object_store::layers::TracingLayer)
-    .layer(object_store::layers::build_prometheus_metrics_layer(true))
+    .layer(object_store::layers::PrometheusMetricsLayer::new(true))
     .finish())
 }
@@ -544,7 +544,7 @@ mod tests {
 use common_test_util::temp_dir::create_temp_dir;
 use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
-use object_store::{EntryMode, ObjectStore};
+use object_store::ObjectStore;
 use tokio::sync::mpsc;
 
 use super::*;

@@ -578,11 +578,7 @@
 ) {
     let dir = proc_path!(procedure_store, "{procedure_id}/");
     let lister = object_store.list(&dir).await.unwrap();
-    let mut files_in_dir: Vec<_> = lister
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .map(|de| de.name().to_string())
-        .collect();
+    let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
     files_in_dir.sort_unstable();
     assert_eq!(files, files_in_dir);
 }
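Taken together with the manifest-manager hunks further down that drop the "/" entry from expected file lists, these test edits point at a list-semantics difference between the two opendal versions: the 0.50 lister also yields directory entries, including the listed directory itself, hence the EntryMode::FILE filters on the removed lines, while 0.49 returns only the objects under the prefix, so the filters become dead weight and are dropped.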
@@ -193,14 +193,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to build http client"))]
-    BuildHttpClient {
-        #[snafu(implicit)]
-        location: Location,
-        #[snafu(source)]
-        error: reqwest::Error,
-    },
-
     #[snafu(display("Missing required field: {}", name))]
     MissingRequiredField {
         name: String,

@@ -414,10 +406,9 @@ impl ErrorExt for Error {
             | MissingKvBackend { .. }
             | TomlFormat { .. } => StatusCode::InvalidArguments,
 
-            PayloadNotExist { .. }
-            | Unexpected { .. }
-            | WatchAsyncTaskChange { .. }
-            | BuildHttpClient { .. } => StatusCode::Unexpected,
+            PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
+                StatusCode::Unexpected
+            }
 
             AsyncTaskExecute { source, .. } => source.status_code(),
@@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O
 use snafu::prelude::*;
 
 use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
-use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};
+use crate::error::{self, CreateDirSnafu, Result};
 
 pub(crate) async fn new_raw_object_store(
     store: &ObjectStoreConfig,

@@ -236,8 +236,7 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient>
         builder.timeout(config.timeout)
     };
 
-    let client = http_builder.build().context(BuildHttpClientSnafu)?;
-    Ok(HttpClient::with(client))
+    HttpClient::build(http_builder).context(error::InitBackendSnafu)
 }
 struct PrintDetailedError;
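The restored line relies on opendal 0.49's HttpClient::build, which consumes a reqwest ClientBuilder and reports failures through opendal's own error type; that is why the dedicated BuildHttpClient error variant and its reqwest::Error source can be deleted above. The reverted 0.50 code had to build the reqwest client itself and wrap it with HttpClient::with, surfacing the failure as a separate variant.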
@@ -46,7 +46,7 @@ impl FileRegionManifest {
     pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
         let path = &region_manifest_path(region_dir);
         let exist = object_store
-            .exists(path)
+            .is_exist(path)
             .await
             .context(CheckObjectSnafu { path })?;
         ensure!(!exist, ManifestExistsSnafu { path });

@@ -130,7 +130,7 @@ mod tests {
         assert_eq!(region.metadata.primary_key, vec![1]);
 
         assert!(object_store
-            .exists("create_region_dir/manifest/_file_manifest")
+            .is_exist("create_region_dir/manifest/_file_manifest")
             .await
             .unwrap());

@@ -198,13 +198,13 @@ mod tests {
             .unwrap();
 
         assert!(object_store
-            .exists("drop_region_dir/manifest/_file_manifest")
+            .is_exist("drop_region_dir/manifest/_file_manifest")
            .await
            .unwrap());
 
         FileRegion::drop(&region, &object_store).await.unwrap();
         assert!(!object_store
-            .exists("drop_region_dir/manifest/_file_manifest")
+            .is_exist("drop_region_dir/manifest/_file_manifest")
            .await
            .unwrap());

@@ -313,12 +313,12 @@ mod test {
         let region_dir = "test_metric_region";
         // assert metadata region's dir
         let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
-        let exist = object_store.exists(&metadata_region_dir).await.unwrap();
+        let exist = object_store.is_exist(&metadata_region_dir).await.unwrap();
         assert!(exist);
 
         // assert data region's dir
         let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR);
-        let exist = object_store.exists(&data_region_dir).await.unwrap();
+        let exist = object_store.is_exist(&data_region_dir).await.unwrap();
         assert!(exist);
 
         // check mito engine
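Most of the remaining hunks are the same mechanical rename: opendal 0.50 exposes the existence check as `exists`, while the 0.49 API restored here calls it `is_exist`, so the revert has to touch every call site. A minimal sketch of the restored call shape, assuming object_store::ObjectStore (GreptimeDB's re-export of opendal::Operator) and a hypothetical helper name:

    // Sketch only: `manifest_present` is illustrative, not a function in this diff.
    async fn manifest_present(store: &object_store::ObjectStore, path: &str) -> opendal::Result<bool> {
        // opendal 0.49 spells this `is_exist`; the reverted 0.50 code used `exists`.
        store.is_exist(path).await
    }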
src/mito2/src/cache/file_cache.rs (4 changes)
@@ -286,7 +286,7 @@ impl FileCache {
     }
 
     async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
-        if self.local_store.exists(file_path).await? {
+        if self.local_store.is_exist(file_path).await? {
             Ok(Some(self.local_store.reader(file_path).await?))
         } else {
             Ok(None)

@@ -480,7 +480,7 @@ mod tests {
         cache.memory_index.run_pending_tasks().await;
 
         // The file also not exists.
-        assert!(!local_store.exists(&file_path).await.unwrap());
+        assert!(!local_store.is_exist(&file_path).await.unwrap());
         assert_eq!(0, cache.memory_index.weighted_size());
     }
@@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() {
     assert!(object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(region_dir)
+        .is_exist(region_dir)
         .await
         .unwrap());
     assert!(!object_store_manager
         .default_object_store()
-        .exists(region_dir)
+        .is_exist(region_dir)
         .await
         .unwrap());
 }

@@ -71,7 +71,7 @@ async fn test_engine_drop_region() {
     assert!(!env
         .get_object_store()
         .unwrap()
-        .exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
+        .is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
         .await
         .unwrap());

@@ -93,7 +93,7 @@
     listener.wait().await;
 
     let object_store = env.get_object_store().unwrap();
-    assert!(!object_store.exists(&region_dir).await.unwrap());
+    assert!(!object_store.is_exist(&region_dir).await.unwrap());
 }
 
 #[tokio::test]

@@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() {
     assert!(object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(&custom_region_dir)
+        .is_exist(&custom_region_dir)
         .await
         .unwrap());
     assert!(object_store_manager
         .find("default")
         .unwrap()
-        .exists(&global_region_dir)
+        .is_exist(&global_region_dir)
         .await
         .unwrap());

@@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() {
     assert!(!object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(&custom_region_dir)
+        .is_exist(&custom_region_dir)
         .await
         .unwrap());
     assert!(object_store_manager
         .find("default")
         .unwrap()
-        .exists(&global_region_dir)
+        .is_exist(&global_region_dir)
         .await
         .unwrap());
 }

@@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() {
     let object_store_manager = env.get_object_store_manager().unwrap();
     assert!(!object_store_manager
         .default_object_store()
-        .exists(region.access_layer.region_dir())
+        .is_exist(region.access_layer.region_dir())
         .await
         .unwrap());
     assert!(object_store_manager
         .find("Gcs")
         .unwrap()
-        .exists(region.access_layer.region_dir())
+        .is_exist(region.access_layer.region_dir())
         .await
         .unwrap());
 }
@@ -84,7 +84,6 @@ async fn manager_without_checkpoint() {
 
     // check files
     let mut expected = vec![
-        "/",
         "00000000000000000010.json",
         "00000000000000000009.json",
         "00000000000000000008.json",

@@ -131,7 +130,6 @@ async fn manager_with_checkpoint_distance_1() {
 
     // check files
     let mut expected = vec![
-        "/",
         "00000000000000000009.checkpoint",
         "00000000000000000010.checkpoint",
         "00000000000000000010.json",
@@ -185,7 +185,7 @@ mod tests {
 
         scheduler.stop(true).await.unwrap();
 
-        assert!(!object_store.exists(&path).await.unwrap());
+        assert!(!object_store.is_exist(&path).await.unwrap());
     }
 
     #[tokio::test]

@@ -247,7 +247,7 @@ mod tests {
 
         scheduler.stop(true).await.unwrap();
 
-        assert!(!object_store.exists(&path).await.unwrap());
-        assert!(!object_store.exists(&index_path).await.unwrap());
+        assert!(!object_store.is_exist(&path).await.unwrap());
+        assert!(!object_store.is_exist(&index_path).await.unwrap());
     }
 }
@@ -51,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         // Check if this region is pending drop. And clean the entire dir if so.
         if !self.dropping_regions.is_region_exists(region_id)
             && object_store
-                .exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
+                .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
                 .await
                 .context(OpenDalSnafu)?
         {
@@ -17,9 +17,8 @@ futures.workspace = true
 lazy_static.workspace = true
 md5 = "0.7"
 moka = { workspace = true, features = ["future"] }
-opendal = { version = "0.50", features = [
+opendal = { version = "0.49", features = [
   "layers-tracing",
-  "layers-prometheus",
   "services-azblob",
   "services-fs",
   "services-gcs",
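With the metrics layer moving back in-tree (see the restored src/object-store/src/layers/prometheus.rs below), opendal's optional layers-prometheus feature is no longer needed, so it disappears from the feature list.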
@@ -13,37 +13,8 @@
 // limitations under the License.
 
 mod lru_cache;
+mod prometheus;
 
 pub use lru_cache::*;
 pub use opendal::layers::*;
-pub use prometheus::build_prometheus_metrics_layer;
-
-mod prometheus {
-    use std::sync::{Mutex, OnceLock};
-
-    use opendal::layers::PrometheusLayer;
-
-    static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();
-
-    pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
-        PROMETHEUS_LAYER
-            .get_or_init(|| {
-                // This logical tries to extract parent path from the object storage operation
-                // the function also relies on assumption that the region path is built from
-                // pattern `<data|index>/catalog/schema/table_id/....`
-                //
-                // We'll get the data/catalog/schema from path.
-                let path_level = if with_path_label { 3 } else { 0 };
-
-                let layer = PrometheusLayer::builder()
-                    .path_label(path_level)
-                    .register_default()
-                    .unwrap();
-
-                Mutex::new(layer)
-            })
-            .lock()
-            .unwrap()
-            .clone()
-    }
-}
+pub use prometheus::PrometheusMetricsLayer;
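The public API therefore flips back from a helper that cloned a process-wide upstream PrometheusLayer to a plain constructor on the in-tree layer. A minimal sketch of building an instrumented store with the restored API, assuming opendal 0.49's mutate-in-place service builders (the function name and the local Fs backend are illustrative only):

    use object_store::layers::PrometheusMetricsLayer;
    use object_store::ObjectStore;
    use opendal::services::Fs;

    fn instrumented_fs_store(root: &str) -> opendal::Result<ObjectStore> {
        // opendal 0.49 builders are configured through &mut self setters.
        let mut builder = Fs::default();
        builder.root(root);
        Ok(ObjectStore::new(builder)?
            // `true` enables the per-path label, trimmed to three levels by
            // `extract_parent_path` (see the util.rs hunk further down).
            .layer(PrometheusMetricsLayer::new(true))
            .finish())
    }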
@@ -156,12 +156,9 @@ impl<C: Access> ReadCache<C> {
             let size = entry.metadata().content_length();
             OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
             OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
-            // ignore root path
-            if entry.path() != "/" {
-                self.mem_cache
-                    .insert(read_key.to_string(), ReadResult::Success(size as u32))
-                    .await;
-            }
+            self.mem_cache
+                .insert(read_key.to_string(), ReadResult::Success(size as u32))
+                .await;
         }
 
         Ok(self.cache_stat().await)
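The root-path guard removed here looks like the same 0.50 artifact as the EntryMode filters earlier: once the 0.49 lister no longer yields a "/" entry for the directory itself, there is nothing to skip when repopulating the read cache.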
src/object-store/src/layers/prometheus.rs (new file, 584 lines)
@@ -0,0 +1,584 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! code originally from <https://github.com/apache/incubator-opendal/blob/main/core/src/layers/prometheus.rs>, make a tiny change to avoid crash in multi thread env
+
+use std::fmt::{Debug, Formatter};
+
+use common_telemetry::debug;
+use lazy_static::lazy_static;
+use opendal::raw::*;
+use opendal::{Buffer, ErrorKind};
+use prometheus::{
+    exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec,
+    Histogram, HistogramTimer, HistogramVec, IntCounterVec,
+};
+
+use crate::util::extract_parent_path;
+
+type Result<T> = std::result::Result<T, opendal::Error>;
+
+lazy_static! {
+    static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
+        "opendal_requests_total",
+        "Total times of all kinds of operation being called",
+        &["scheme", "operation", "path"],
+    )
+    .unwrap();
+    static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!(
+        histogram_opts!(
+            "opendal_requests_duration_seconds",
+            "Histogram of the time spent on specific operation",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        ),
+        &["scheme", "operation", "path"]
+    )
+    .unwrap();
+    static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!(
+        histogram_opts!(
+            "opendal_bytes_total",
+            "Total size of sync or async Read/Write",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        ),
+        &["scheme", "operation", "path"]
+    )
+    .unwrap();
+}
+
+#[inline]
+fn increment_errors_total(op: Operation, kind: ErrorKind) {
+    debug!(
+        "Prometheus statistics metrics error, operation {} error {}",
+        op.into_static(),
+        kind.into_static()
+    );
+}
+
+/// Please refer to [prometheus](https://docs.rs/prometheus) for every operation.
+///
+/// # Prometheus Metrics
+///
+/// In this section, we will introduce three metrics that are currently being exported by opendal. These metrics are essential for understanding the behavior and performance of opendal.
+///
+///
+/// | Metric Name | Type | Description | Labels |
+/// |-----------------------------------|-----------|------------------------------------------------------|---------------------|
+/// | opendal_requests_total | Counter | Total times of all kinds of operation being called | scheme, operation |
+/// | opendal_requests_duration_seconds | Histogram | Histogram of the time spent on specific operation | scheme, operation |
+/// | opendal_bytes_total | Histogram | Total size of sync or async Read/Write | scheme, operation |
+///
+/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/).
+///
+/// # Histogram Configuration
+///
+/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration.
+#[derive(Default, Debug, Clone)]
+pub struct PrometheusMetricsLayer {
+    pub path_label: bool,
+}
+
+impl PrometheusMetricsLayer {
+    pub fn new(path_label: bool) -> Self {
+        Self { path_label }
+    }
+}
+
+impl<A: Access> Layer<A> for PrometheusMetricsLayer {
+    type LayeredAccess = PrometheusAccess<A>;
+
+    fn layer(&self, inner: A) -> Self::LayeredAccess {
+        let meta = inner.info();
+        let scheme = meta.scheme();
+
+        PrometheusAccess {
+            inner,
+            scheme: scheme.to_string(),
+            path_label: self.path_label,
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct PrometheusAccess<A: Access> {
+    inner: A,
+    scheme: String,
+    path_label: bool,
+}
+
+impl<A: Access> PrometheusAccess<A> {
+    fn get_path_label<'a>(&self, path: &'a str) -> &'a str {
+        if self.path_label {
+            extract_parent_path(path)
+        } else {
+            ""
+        }
+    }
+}
+
+impl<A: Access> Debug for PrometheusAccess<A> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("PrometheusAccessor")
+            .field("inner", &self.inner)
+            .finish_non_exhaustive()
+    }
+}
+
+impl<A: Access> LayeredAccess for PrometheusAccess<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Lister = A::Lister;
+    type BlockingLister = A::BlockingLister;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label])
+            .start_timer();
+        let create_res = self.inner.create_dir(path, args).await;
+
+        timer.observe_duration();
+        create_res.inspect_err(|e| {
+            increment_errors_total(Operation::CreateDir, e.kind());
+        })
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label])
+            .start_timer();
+
+        let (rp, r) = self.inner.read(path, args).await.inspect_err(|e| {
+            increment_errors_total(Operation::Read, e.kind());
+        })?;
+
+        Ok((
+            rp,
+            PrometheusMetricWrapper::new(
+                r,
+                Operation::Read,
+                BYTES_TOTAL.with_label_values(&[
+                    &self.scheme,
+                    Operation::Read.into_static(),
+                    path_label,
+                ]),
+                timer,
+            ),
+        ))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label])
+            .start_timer();
+
+        let (rp, r) = self.inner.write(path, args).await.inspect_err(|e| {
+            increment_errors_total(Operation::Write, e.kind());
+        })?;
+
+        Ok((
+            rp,
+            PrometheusMetricWrapper::new(
+                r,
+                Operation::Write,
+                BYTES_TOTAL.with_label_values(&[
+                    &self.scheme,
+                    Operation::Write.into_static(),
+                    path_label,
+                ]),
+                timer,
+            ),
+        ))
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
+            .inc();
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label])
+            .start_timer();
+
+        let stat_res = self.inner.stat(path, args).await;
+        timer.observe_duration();
+        stat_res.inspect_err(|e| {
+            increment_errors_total(Operation::Stat, e.kind());
+        })
+    }
+
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label])
+            .start_timer();
+
+        let delete_res = self.inner.delete(path, args).await;
+        timer.observe_duration();
+        delete_res.inspect_err(|e| {
+            increment_errors_total(Operation::Delete, e.kind());
+        })
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label])
+            .start_timer();
+
+        let list_res = self.inner.list(path, args).await;
+
+        timer.observe_duration();
+        list_res.inspect_err(|e| {
+            increment_errors_total(Operation::List, e.kind());
+        })
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""])
+            .start_timer();
+        let result = self.inner.batch(args).await;
+
+        timer.observe_duration();
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::Batch, e.kind());
+        })
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label])
+            .start_timer();
+        let result = self.inner.presign(path, args).await;
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::Presign, e.kind());
+        })
+    }
+
+    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingCreateDir.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingCreateDir.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_create_dir(path, args);
+
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingCreateDir, e.kind());
+        })
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingRead.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingRead.into_static(),
+                path_label,
+            ])
+            .start_timer();
+
+        self.inner
+            .blocking_read(path, args)
+            .map(|(rp, r)| {
+                (
+                    rp,
+                    PrometheusMetricWrapper::new(
+                        r,
+                        Operation::BlockingRead,
+                        BYTES_TOTAL.with_label_values(&[
+                            &self.scheme,
+                            Operation::BlockingRead.into_static(),
+                            path_label,
+                        ]),
+                        timer,
+                    ),
+                )
+            })
+            .inspect_err(|e| {
+                increment_errors_total(Operation::BlockingRead, e.kind());
+            })
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingWrite.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingWrite.into_static(),
+                path_label,
+            ])
+            .start_timer();
+
+        self.inner
+            .blocking_write(path, args)
+            .map(|(rp, r)| {
+                (
+                    rp,
+                    PrometheusMetricWrapper::new(
+                        r,
+                        Operation::BlockingWrite,
+                        BYTES_TOTAL.with_label_values(&[
+                            &self.scheme,
+                            Operation::BlockingWrite.into_static(),
+                            path_label,
+                        ]),
+                        timer,
+                    ),
+                )
+            })
+            .inspect_err(|e| {
+                increment_errors_total(Operation::BlockingWrite, e.kind());
+            })
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingStat.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingStat.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_stat(path, args);
+        timer.observe_duration();
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingStat, e.kind());
+        })
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingDelete.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingDelete.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_delete(path, args);
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingDelete, e.kind());
+        })
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
+        let path_label = self.get_path_label(path);
+        REQUESTS_TOTAL
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingList.into_static(),
+                path_label,
+            ])
+            .inc();
+
+        let timer = REQUESTS_DURATION_SECONDS
+            .with_label_values(&[
+                &self.scheme,
+                Operation::BlockingList.into_static(),
+                path_label,
+            ])
+            .start_timer();
+        let result = self.inner.blocking_list(path, args);
+        timer.observe_duration();
+
+        result.inspect_err(|e| {
+            increment_errors_total(Operation::BlockingList, e.kind());
+        })
+    }
+}
+
+pub struct PrometheusMetricWrapper<R> {
+    inner: R,
+
+    op: Operation,
+    bytes_counter: Histogram,
+    _requests_duration_timer: HistogramTimer,
+    bytes: u64,
+}
+
+impl<R> Drop for PrometheusMetricWrapper<R> {
+    fn drop(&mut self) {
+        self.bytes_counter.observe(self.bytes as f64);
+    }
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    fn new(
+        inner: R,
+        op: Operation,
+        bytes_counter: Histogram,
+        requests_duration_timer: HistogramTimer,
+    ) -> Self {
+        Self {
+            inner,
+            op,
+            bytes_counter,
+            _requests_duration_timer: requests_duration_timer,
+            bytes: 0,
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    async fn read(&mut self) -> Result<Buffer> {
+        self.inner.read().await.inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    fn read(&mut self) -> opendal::Result<Buffer> {
+        self.inner.read().inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
+
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        let bytes = bs.len();
+        match self.inner.write(bs).await {
+            Ok(_) => {
+                self.bytes += bytes as u64;
+                Ok(())
+            }
+            Err(err) => {
+                increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await.inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.inner.close().await.inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    fn write(&mut self, bs: Buffer) -> Result<()> {
+        let bytes = bs.len();
+        self.inner
+            .write(bs)
+            .map(|_| {
+                self.bytes += bytes as u64;
+            })
+            .inspect_err(|err| {
+                increment_errors_total(self.op, err.kind());
+            })
+    }
+
+    fn close(&mut self) -> Result<()> {
+        self.inner.close().inspect_err(|err| {
+            increment_errors_total(self.op, err.kind());
+        })
+    }
+}
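The module doc above flags a "tiny change to avoid crash in multi thread env". Judging from the code, the divergence from upstream appears to be the lazy_static metric vectors: REQUESTS_TOTAL, REQUESTS_DURATION_SECONDS, and BYTES_TOTAL are registered with the default registry exactly once per process, so constructing the layer several times, or from several threads, cannot re-register the collectors and panic the way a per-layer registration could.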
@@ -15,12 +15,19 @@
 use std::fmt::Display;
 
 use common_telemetry::{debug, error, trace};
+use futures::TryStreamExt;
 use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer};
 use opendal::raw::{AccessorInfo, Operation};
-use opendal::ErrorKind;
+use opendal::{Entry, ErrorKind, Lister};
 
+use crate::layers::PrometheusMetricsLayer;
 use crate::ObjectStore;
 
+/// Collect all entries from the [Lister].
+pub async fn collect(stream: Lister) -> Result<Vec<Entry>, opendal::Error> {
+    stream.try_collect::<Vec<_>>().await
+}
+
 /// Join two paths and normalize the output dir.
 ///
 /// The output dir is always ends with `/`. e.g.

@@ -120,12 +127,26 @@ pub fn normalize_path(path: &str) -> String {
     p
 }
 
+// This logical tries to extract parent path from the object storage operation
+// the function also relies on assumption that the region path is built from
+// pattern `<data|index>/catalog/schema/table_id/....`
+//
+// this implementation tries to extract at most 3 levels of parent path
+pub(crate) fn extract_parent_path(path: &str) -> &str {
+    // split the path into `catalog`, `schema` and others
+    path.char_indices()
+        .filter(|&(_, c)| c == '/')
+        // we get the data/catalog/schema from path, split at the 3rd /
+        .nth(2)
+        .map_or(path, |(i, _)| &path[..i])
+}
+
 /// Attaches instrument layers to the object store.
 pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore {
     object_store
         .layer(LoggingLayer::new(DefaultLoggingInterceptor))
         .layer(TracingLayer)
-        .layer(crate::layers::build_prometheus_metrics_layer(path_label))
+        .layer(PrometheusMetricsLayer::new(path_label))
 }
 
 static LOGGING_TARGET: &str = "opendal::services";

@@ -242,4 +263,28 @@ mod tests {
         assert_eq!("/abc", join_path("//", "/abc"));
         assert_eq!("abc/def", join_path("abc/", "//def"));
     }
+
+    #[test]
+    fn test_path_extraction() {
+        assert_eq!(
+            "data/greptime/public",
+            extract_parent_path("data/greptime/public/1024/1024_0000000000/")
+        );
+
+        assert_eq!(
+            "data/greptime/public",
+            extract_parent_path("data/greptime/public/1/")
+        );
+
+        assert_eq!(
+            "data/greptime/public",
+            extract_parent_path("data/greptime/public")
+        );
+
+        assert_eq!("data/greptime/", extract_parent_path("data/greptime/"));
+
+        assert_eq!("data/", extract_parent_path("data/"));
+
+        assert_eq!("/", extract_parent_path("/"));
+    }
 }
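For orientation, a hedged sketch of how the restored `collect` utility is meant to be used (the helper name `entries_under` is hypothetical; `collect` comes from the hunk above, and `Operator::lister` is opendal 0.49's streaming variant of `list`):

    use object_store::util::collect;
    use object_store::ObjectStore;
    use opendal::Entry;

    // Hypothetical helper: stream every entry under `prefix` and gather it
    // into a Vec with the `collect` utility restored above.
    async fn entries_under(store: &ObjectStore, prefix: &str) -> Result<Vec<Entry>, opendal::Error> {
        let lister = store.lister(prefix).await?;
        collect(lister).await
    }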
@@ -65,38 +65,23 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> {
     store.write(p3, "Hello, object3!").await?;
 
     // List objects
-    let entries = store
-        .list("/")
-        .await?
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .collect::<Vec<_>>();
+    let entries = store.list("/").await?;
     assert_eq!(3, entries.len());
 
     store.delete(p1).await?;
     store.delete(p3).await?;
 
     // List objects again
-    // Only o2 and root exist
-    let entries = store
-        .list("/")
-        .await?
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .collect::<Vec<_>>();
+    // Only o2 is exists
+    let entries = store.list("/").await?;
     assert_eq!(1, entries.len());
-    assert_eq!(p2, entries[0].path());
+    assert_eq!(p2, entries.first().unwrap().path());
 
     let content = store.read(p2).await?;
     assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?);
 
     store.delete(p2).await?;
-    let entries = store
-        .list("/")
-        .await?
-        .into_iter()
-        .filter(|x| x.metadata().mode() == EntryMode::FILE)
-        .collect::<Vec<_>>();
+    let entries = store.list("/").await?;
     assert!(entries.is_empty());
 
     assert!(store.read(p1).await.is_err());

@@ -267,7 +252,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
 
 async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
     for file_name in file_names {
-        assert!(cache_layer.contains_file(file_name).await, "{file_name}");
+        assert!(cache_layer.contains_file(file_name).await);
     }
 }

@@ -279,9 +264,7 @@ async fn assert_cache_files<C: Access>(
     let (_, mut lister) = store.list("/", OpList::default()).await?;
     let mut objects = vec![];
     while let Some(e) = lister.next().await? {
-        if e.mode() == EntryMode::FILE {
-            objects.push(e);
-        }
+        objects.push(e);
     }
 
     // compare the cache file with the expected cache file; ignore orders

@@ -349,9 +332,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert_cache_files(
         &cache_store,
         &[
-            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
-            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
-            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
+            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
+            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
+            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
         ],
         &["Hello, object1!", "object2!", "Hello, object2!"],
     )

@@ -359,9 +342,9 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert_lru_cache(
         &cache_layer,
         &[
-            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
-            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-",
-            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-",
+            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
+            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14",
+            "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14",
        ],
     )
     .await;

@@ -372,13 +355,13 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert_eq!(cache_layer.read_cache_stat().await, (1, 15));
     assert_cache_files(
         &cache_store,
-        &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
+        &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"],
         &["Hello, object1!"],
     )
     .await?;
     assert_lru_cache(
         &cache_layer,
-        &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"],
+        &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"],
     )
     .await;

@@ -405,8 +388,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert_cache_files(
         &cache_store,
         &[
-            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
-            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
+            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
+            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
             "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
         ],
         &["Hello, object1!", "Hello, object3!", "Hello"],

@@ -415,8 +398,8 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert_lru_cache(
         &cache_layer,
         &[
-            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-",
-            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
+            "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14",
+            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
             "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
         ],
     )

@@ -433,7 +416,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
         &cache_store,
         &[
             "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
-            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
+            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
             "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
         ],
         &["ello, object1!", "Hello, object3!", "Hello"],

@@ -443,7 +426,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
         &cache_layer,
         &[
             "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
-            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
+            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
             "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
         ],
     )

@@ -465,7 +448,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
         &cache_layer,
         &[
             "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14",
-            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-",
+            "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14",
             "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4",
         ],
     )