From 5a5737278e637245d0b7b89a20b47040d2572a0e Mon Sep 17 00:00:00 2001
From: Dmitry Rodionov
Date: Wed, 25 May 2022 23:10:44 +0300
Subject: [PATCH] add simple metrics for remote storage operations

track number of operations and number of their failures
---
 Cargo.lock                           |   2 +
 libs/remote_storage/Cargo.toml       |  11 ++-
 libs/remote_storage/src/s3_bucket.rs | 109 +++++++++++++++++++++++++--
 3 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6acad6dac8..840953f645 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2394,6 +2394,8 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "metrics",
+ "once_cell",
  "rusoto_core",
  "rusoto_s3",
  "serde",
diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml
index 291f6e50ac..5c62e28fda 100644
--- a/libs/remote_storage/Cargo.toml
+++ b/libs/remote_storage/Cargo.toml
@@ -5,14 +5,17 @@ edition = "2021"
 
 [dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
-tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
-tokio-util = { version = "0.7", features = ["io"] }
-tracing = "0.1.27"
+async-trait = "0.1"
+
+metrics = { version = "0.1", path = "../metrics" }
+once_cell = "1.8.0"
 rusoto_core = "0.48"
 rusoto_s3 = "0.48"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1"
-async-trait = "0.1"
+tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
+tokio-util = { version = "0.7", features = ["io"] }
+tracing = "0.1.27"
 
 workspace_hack = { version = "0.1", path = "../../workspace_hack" }
 
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index 01aaf7ca7e..80d6966494 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -23,6 +23,71 @@ use crate::{strip_path_prefix, RemoteStorage, S3Config};
 
 use super::StorageMetadata;
 
+pub(super) mod metrics {
+    use metrics::{register_int_counter_vec, IntCounterVec};
+    use once_cell::sync::Lazy;
+
+    static S3_REQUESTS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
+        register_int_counter_vec!(
+            "remote_storage_s3_requests_count",
+            "Number of s3 requests of particular type",
+            &["request_type"],
+        )
+        .expect("failed to define a metric")
+    });
+
+    static S3_REQUESTS_FAIL_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
+        register_int_counter_vec!(
+            "remote_storage_s3_failures_count",
+            "Number of failed s3 requests of particular type",
+            &["request_type"],
+        )
+        .expect("failed to define a metric")
+    });
+
+    pub fn inc_get_object() {
+        S3_REQUESTS_COUNT.with_label_values(&["get_object"]).inc();
+    }
+
+    pub fn inc_get_object_fail() {
+        S3_REQUESTS_FAIL_COUNT
+            .with_label_values(&["get_object"])
+            .inc();
+    }
+
+    pub fn inc_put_object() {
+        S3_REQUESTS_COUNT.with_label_values(&["put_object"]).inc();
+    }
+
+    pub fn inc_put_object_fail() {
+        S3_REQUESTS_FAIL_COUNT
+            .with_label_values(&["put_object"])
+            .inc();
+    }
+
+    pub fn inc_delete_object() {
+        S3_REQUESTS_COUNT
+            .with_label_values(&["delete_object"])
+            .inc();
+    }
+
+    pub fn inc_delete_object_fail() {
+        S3_REQUESTS_FAIL_COUNT
+            .with_label_values(&["delete_object"])
+            .inc();
+    }
+
+    pub fn inc_list_objects() {
+        S3_REQUESTS_COUNT.with_label_values(&["list_objects"]).inc();
+    }
+
+    pub fn inc_list_objects_fail() {
+        S3_REQUESTS_FAIL_COUNT
+            .with_label_values(&["list_objects"])
+            .inc();
+    }
+}
+
 const S3_PREFIX_SEPARATOR: char = '/';
 
 #[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
@@ -152,6 +217,9 @@ impl RemoteStorage for S3Bucket {
                 .acquire()
                 .await
                 .context("Concurrency limiter semaphore got closed during S3 list")?;
+
+            metrics::inc_list_objects();
+
             let fetch_response = self
                 .client
                 .list_objects_v2(ListObjectsV2Request {
@@ -160,7 +228,11 @@ impl RemoteStorage for S3Bucket {
                     continuation_token,
                     ..ListObjectsV2Request::default()
                 })
-                .await?;
+                .await
+                .map_err(|e| {
+                    metrics::inc_list_objects_fail();
+                    e
+                })?;
             document_keys.extend(
                 fetch_response
                     .contents
@@ -190,6 +262,8 @@ impl RemoteStorage for S3Bucket {
             .acquire()
             .await
             .context("Concurrency limiter semaphore got closed during S3 upload")?;
+
+        metrics::inc_put_object();
         self.client
             .put_object(PutObjectRequest {
                 body: Some(StreamingBody::new_with_size(
@@ -201,7 +275,11 @@ impl RemoteStorage for S3Bucket {
                 metadata: metadata.map(|m| m.0),
                 ..PutObjectRequest::default()
             })
-            .await?;
+            .await
+            .map_err(|e| {
+                metrics::inc_put_object_fail();
+                e
+            })?;
         Ok(())
     }
 
@@ -215,6 +293,9 @@ impl RemoteStorage for S3Bucket {
             .acquire()
             .await
             .context("Concurrency limiter semaphore got closed during S3 download")?;
+
+        metrics::inc_get_object();
+
         let object_output = self
             .client
             .get_object(GetObjectRequest {
@@ -222,7 +303,11 @@ impl RemoteStorage for S3Bucket {
                 key: from.key().to_owned(),
                 ..GetObjectRequest::default()
             })
-            .await?;
+            .await
+            .map_err(|e| {
+                metrics::inc_get_object_fail();
+                e
+            })?;
 
         if let Some(body) = object_output.body {
             let mut from = io::BufReader::new(body.into_async_read());
@@ -251,6 +336,9 @@ impl RemoteStorage for S3Bucket {
             .acquire()
             .await
             .context("Concurrency limiter semaphore got closed during S3 range download")?;
+
+        metrics::inc_get_object();
+
         let object_output = self
             .client
             .get_object(GetObjectRequest {
@@ -259,7 +347,11 @@ impl RemoteStorage for S3Bucket {
                 range,
                 ..GetObjectRequest::default()
             })
-            .await?;
+            .await
+            .map_err(|e| {
+                metrics::inc_get_object_fail();
+                e
+            })?;
 
         if let Some(body) = object_output.body {
             let mut from = io::BufReader::new(body.into_async_read());
@@ -275,13 +367,20 @@ impl RemoteStorage for S3Bucket {
             .acquire()
             .await
             .context("Concurrency limiter semaphore got closed during S3 delete")?;
+
+        metrics::inc_delete_object();
+
         self.client
             .delete_object(DeleteObjectRequest {
                 bucket: self.bucket_name.clone(),
                 key: path.key().to_owned(),
                 ..DeleteObjectRequest::default()
             })
-            .await?;
+            .await
+            .map_err(|e| {
+                metrics::inc_delete_object_fail();
+                e
+            })?;
         Ok(())
     }
 }