mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
add simple metrics for remote storage operations
track number of operations and number of their failures
This commit is contained in:
committed by
Dmitry Rodionov
parent
06f5e017a1
commit
5a5737278e
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -2394,6 +2394,8 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"rusoto_core",
|
||||
"rusoto_s3",
|
||||
"serde",
|
||||
|
||||
@@ -5,14 +5,17 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
tracing = "0.1.27"
|
||||
async-trait = "0.1"
|
||||
|
||||
metrics = { version = "0.1", path = "../metrics" }
|
||||
once_cell = "1.8.0"
|
||||
rusoto_core = "0.48"
|
||||
rusoto_s3 = "0.48"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
async-trait = "0.1"
|
||||
tokio = { version = "1.17", features = ["sync", "macros", "fs", "io-util"] }
|
||||
tokio-util = { version = "0.7", features = ["io"] }
|
||||
tracing = "0.1.27"
|
||||
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
|
||||
@@ -23,6 +23,71 @@ use crate::{strip_path_prefix, RemoteStorage, S3Config};
|
||||
|
||||
use super::StorageMetadata;
|
||||
|
||||
pub(super) mod metrics {
|
||||
use metrics::{register_int_counter_vec, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
static S3_REQUESTS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"remote_storage_s3_requests_count",
|
||||
"Number of s3 requests of particular type",
|
||||
&["request_type"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static S3_REQUESTS_FAIL_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"remote_storage_s3_failures_count",
|
||||
"Number of failed s3 requests of particular type",
|
||||
&["request_type"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn inc_get_object() {
|
||||
S3_REQUESTS_COUNT.with_label_values(&["get_object"]).inc();
|
||||
}
|
||||
|
||||
pub fn inc_get_object_fail() {
|
||||
S3_REQUESTS_FAIL_COUNT
|
||||
.with_label_values(&["get_object"])
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn inc_put_object() {
|
||||
S3_REQUESTS_COUNT.with_label_values(&["put_object"]).inc();
|
||||
}
|
||||
|
||||
pub fn inc_put_object_fail() {
|
||||
S3_REQUESTS_FAIL_COUNT
|
||||
.with_label_values(&["put_object"])
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn inc_delete_object() {
|
||||
S3_REQUESTS_COUNT
|
||||
.with_label_values(&["delete_object"])
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn inc_delete_object_fail() {
|
||||
S3_REQUESTS_FAIL_COUNT
|
||||
.with_label_values(&["delete_object"])
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub fn inc_list_objects() {
|
||||
S3_REQUESTS_COUNT.with_label_values(&["list_objects"]).inc();
|
||||
}
|
||||
|
||||
pub fn inc_list_objects_fail() {
|
||||
S3_REQUESTS_FAIL_COUNT
|
||||
.with_label_values(&["list_objects"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
const S3_PREFIX_SEPARATOR: char = '/';
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
|
||||
@@ -152,6 +217,9 @@ impl RemoteStorage for S3Bucket {
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 list")?;
|
||||
|
||||
metrics::inc_list_objects();
|
||||
|
||||
let fetch_response = self
|
||||
.client
|
||||
.list_objects_v2(ListObjectsV2Request {
|
||||
@@ -160,7 +228,11 @@ impl RemoteStorage for S3Bucket {
|
||||
continuation_token,
|
||||
..ListObjectsV2Request::default()
|
||||
})
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_list_objects_fail();
|
||||
e
|
||||
})?;
|
||||
document_keys.extend(
|
||||
fetch_response
|
||||
.contents
|
||||
@@ -190,6 +262,8 @@ impl RemoteStorage for S3Bucket {
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 upload")?;
|
||||
|
||||
metrics::inc_put_object();
|
||||
self.client
|
||||
.put_object(PutObjectRequest {
|
||||
body: Some(StreamingBody::new_with_size(
|
||||
@@ -201,7 +275,11 @@ impl RemoteStorage for S3Bucket {
|
||||
metadata: metadata.map(|m| m.0),
|
||||
..PutObjectRequest::default()
|
||||
})
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_put_object_fail();
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -215,6 +293,9 @@ impl RemoteStorage for S3Bucket {
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 download")?;
|
||||
|
||||
metrics::inc_get_object();
|
||||
|
||||
let object_output = self
|
||||
.client
|
||||
.get_object(GetObjectRequest {
|
||||
@@ -222,7 +303,11 @@ impl RemoteStorage for S3Bucket {
|
||||
key: from.key().to_owned(),
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_get_object_fail();
|
||||
e
|
||||
})?;
|
||||
|
||||
if let Some(body) = object_output.body {
|
||||
let mut from = io::BufReader::new(body.into_async_read());
|
||||
@@ -251,6 +336,9 @@ impl RemoteStorage for S3Bucket {
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 range download")?;
|
||||
|
||||
metrics::inc_get_object();
|
||||
|
||||
let object_output = self
|
||||
.client
|
||||
.get_object(GetObjectRequest {
|
||||
@@ -259,7 +347,11 @@ impl RemoteStorage for S3Bucket {
|
||||
range,
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_get_object_fail();
|
||||
e
|
||||
})?;
|
||||
|
||||
if let Some(body) = object_output.body {
|
||||
let mut from = io::BufReader::new(body.into_async_read());
|
||||
@@ -275,13 +367,20 @@ impl RemoteStorage for S3Bucket {
|
||||
.acquire()
|
||||
.await
|
||||
.context("Concurrency limiter semaphore got closed during S3 delete")?;
|
||||
|
||||
metrics::inc_delete_object();
|
||||
|
||||
self.client
|
||||
.delete_object(DeleteObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: path.key().to_owned(),
|
||||
..DeleteObjectRequest::default()
|
||||
})
|
||||
.await?;
|
||||
.await
|
||||
.map_err(|e| {
|
||||
metrics::inc_delete_object_fail();
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user