diff --git a/Cargo.lock b/Cargo.lock index 4d59f400de..c18bddfbcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3253,6 +3253,7 @@ dependencies = [ "metrics", "once_cell", "pin-project-lite", + "scopeguard", "serde", "serde_json", "tempfile", diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index 0877a38dd9..a4adae6146 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -20,6 +20,7 @@ tokio = { workspace = true, features = ["sync", "fs", "io-util"] } tokio-util.workspace = true toml_edit.workspace = true tracing.workspace = true +scopeguard.workspace = true metrics.workspace = true utils.workspace = true pin-project-lite.workspace = true diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 37a6bf23e8..b79d8566bb 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -22,6 +22,7 @@ use aws_sdk_s3::{ }; use aws_smithy_http::body::SdkBody; use hyper::Body; +use scopeguard::ScopeGuard; use tokio::{ io::{self, AsyncRead}, sync::Semaphore, @@ -36,82 +37,9 @@ use crate::{ const MAX_DELETE_OBJECTS_REQUEST_SIZE: usize = 1000; -pub(super) mod metrics { - use metrics::{register_int_counter_vec, IntCounterVec}; - use once_cell::sync::Lazy; +pub(super) mod metrics; - static S3_REQUESTS_COUNT: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "remote_storage_s3_requests_count", - "Number of s3 requests of particular type", - &["request_type"], - ) - .expect("failed to define a metric") - }); - - static S3_REQUESTS_FAIL_COUNT: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "remote_storage_s3_failures_count", - "Number of failed s3 requests of particular type", - &["request_type"], - ) - .expect("failed to define a metric") - }); - - pub fn inc_get_object() { - S3_REQUESTS_COUNT.with_label_values(&["get_object"]).inc(); - } - - pub fn inc_get_object_fail() { - S3_REQUESTS_FAIL_COUNT - .with_label_values(&["get_object"]) - .inc(); - } - - pub fn inc_put_object() { - S3_REQUESTS_COUNT.with_label_values(&["put_object"]).inc(); - } - - pub fn inc_put_object_fail() { - S3_REQUESTS_FAIL_COUNT - .with_label_values(&["put_object"]) - .inc(); - } - - pub fn inc_delete_object() { - S3_REQUESTS_COUNT - .with_label_values(&["delete_object"]) - .inc(); - } - - pub fn inc_delete_objects(count: u64) { - S3_REQUESTS_COUNT - .with_label_values(&["delete_object"]) - .inc_by(count); - } - - pub fn inc_delete_object_fail() { - S3_REQUESTS_FAIL_COUNT - .with_label_values(&["delete_object"]) - .inc(); - } - - pub fn inc_delete_objects_fail(count: u64) { - S3_REQUESTS_FAIL_COUNT - .with_label_values(&["delete_object"]) - .inc_by(count); - } - - pub fn inc_list_objects() { - S3_REQUESTS_COUNT.with_label_values(&["list_objects"]).inc(); - } - - pub fn inc_list_objects_fail() { - S3_REQUESTS_FAIL_COUNT - .with_label_values(&["list_objects"]) - .inc(); - } -} +use self::metrics::{AttemptOutcome, RequestKind}; /// AWS S3 storage. pub struct S3Bucket { @@ -213,17 +141,46 @@ impl S3Bucket { } } - async fn download_object(&self, request: GetObjectRequest) -> Result { + async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> { + let started_at = start_counting_cancelled_wait(kind); + let permit = self + .concurrency_limiter + .acquire() + .await + .expect("semaphore is never closed"); + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .wait_seconds + .observe_elapsed(kind, started_at); + + permit + } + + async fn owned_permit(&self, kind: RequestKind) -> tokio::sync::OwnedSemaphorePermit { + let started_at = start_counting_cancelled_wait(kind); let permit = self .concurrency_limiter .clone() .acquire_owned() .await - .context("Concurrency limiter semaphore got closed during S3 download") - .map_err(DownloadError::Other)?; + .expect("semaphore is never closed"); + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .wait_seconds + .observe_elapsed(kind, started_at); + permit + } + + async fn download_object(&self, request: GetObjectRequest) -> Result { + let kind = RequestKind::Get; + let permit = self.owned_permit(kind).await; metrics::inc_get_object(); + let started_at = start_measuring_requests(kind); + let get_object = self .client .get_object() @@ -233,26 +190,34 @@ impl S3Bucket { .send() .await; + let started_at = ScopeGuard::into_inner(started_at); + + if get_object.is_err() { + metrics::inc_get_object_fail(); + metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Err, + started_at, + ); + } + match get_object { Ok(object_output) => { let metadata = object_output.metadata().cloned().map(StorageMetadata); Ok(Download { metadata, - download_stream: Box::pin(io::BufReader::new(RatelimitedAsyncRead::new( - permit, - object_output.body.into_async_read(), + download_stream: Box::pin(io::BufReader::new(TimedDownload::new( + started_at, + RatelimitedAsyncRead::new(permit, object_output.body.into_async_read()), ))), }) } Err(SdkError::ServiceError(e)) if matches!(e.err(), GetObjectError::NoSuchKey(_)) => { Err(DownloadError::NotFound) } - Err(e) => { - metrics::inc_get_object_fail(); - Err(DownloadError::Other(anyhow::anyhow!( - "Failed to download S3 object: {e}" - ))) - } + Err(e) => Err(DownloadError::Other( + anyhow::Error::new(e).context("download s3 object"), + )), } } } @@ -283,6 +248,54 @@ impl AsyncRead for RatelimitedAsyncRead { } } +pin_project_lite::pin_project! { + /// Times and tracks the outcome of the request. + struct TimedDownload { + started_at: std::time::Instant, + outcome: metrics::AttemptOutcome, + #[pin] + inner: S + } + + impl PinnedDrop for TimedDownload { + fn drop(mut this: Pin<&mut Self>) { + metrics::BUCKET_METRICS.req_seconds.observe_elapsed(RequestKind::Get, this.outcome, this.started_at); + } + } +} + +impl TimedDownload { + fn new(started_at: std::time::Instant, inner: S) -> Self { + TimedDownload { + started_at, + outcome: metrics::AttemptOutcome::Cancelled, + inner, + } + } +} + +impl AsyncRead for TimedDownload { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.project(); + let before = buf.filled().len(); + let read = std::task::ready!(this.inner.poll_read(cx, buf)); + + let read_eof = buf.filled().len() == before; + + match read { + Ok(()) if read_eof => *this.outcome = AttemptOutcome::Ok, + Ok(()) => { /* still in progress */ } + Err(_) => *this.outcome = AttemptOutcome::Err, + } + + std::task::Poll::Ready(read) + } +} + #[async_trait::async_trait] impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` @@ -291,6 +304,8 @@ impl RemoteStorage for S3Bucket { &self, prefix: Option<&RemotePath>, ) -> Result, DownloadError> { + let kind = RequestKind::List; + // get the passed prefix or if it is not set use prefix_in_bucket value let list_prefix = prefix .map(|p| self.relative_path_to_s3_object(p)) @@ -307,15 +322,11 @@ impl RemoteStorage for S3Bucket { let mut document_keys = Vec::new(); let mut continuation_token = None; - loop { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 list") - .map_err(DownloadError::Other)?; + loop { + let _guard = self.permit(kind).await; metrics::inc_list_objects(); + let started_at = start_measuring_requests(kind); let fetch_response = self .client @@ -332,7 +343,15 @@ impl RemoteStorage for S3Bucket { e }) .context("Failed to list S3 prefixes") - .map_err(DownloadError::Other)?; + .map_err(DownloadError::Other); + + let started_at = ScopeGuard::into_inner(started_at); + + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &fetch_response, started_at); + + let fetch_response = fetch_response?; document_keys.extend( fetch_response @@ -342,10 +361,10 @@ impl RemoteStorage for S3Bucket { .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))), ); - match fetch_response.next_continuation_token { - Some(new_token) => continuation_token = Some(new_token), + continuation_token = match fetch_response.next_continuation_token { + Some(new_token) => Some(new_token), None => break, - } + }; } Ok(document_keys) @@ -353,6 +372,8 @@ impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_files` async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + let kind = RequestKind::List; + let folder_name = folder .map(|p| self.relative_path_to_s3_object(p)) .or_else(|| self.prefix_in_bucket.clone()); @@ -361,12 +382,9 @@ impl RemoteStorage for S3Bucket { let mut continuation_token = None; let mut all_files = vec![]; loop { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 list_files")?; + let _guard = self.permit(kind).await; metrics::inc_list_objects(); + let started_at = start_measuring_requests(kind); let response = self .client @@ -381,7 +399,14 @@ impl RemoteStorage for S3Bucket { metrics::inc_list_objects_fail(); e }) - .context("Failed to list files in S3 bucket")?; + .context("Failed to list files in S3 bucket"); + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &response, started_at); + + let response = response?; for object in response.contents().unwrap_or_default() { let object_path = object.key().expect("response does not contain a key"); @@ -403,18 +428,17 @@ impl RemoteStorage for S3Bucket { to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 upload")?; + let kind = RequestKind::Put; + let _guard = self.permit(kind).await; metrics::inc_put_object(); + let started_at = start_measuring_requests(kind); let body = Body::wrap_stream(ReaderStream::new(from)); let bytes_stream = ByteStream::new(SdkBody::from(body)); - self.client + let res = self + .client .put_object() .bucket(self.bucket_name.clone()) .key(self.relative_path_to_s3_object(to)) @@ -426,7 +450,15 @@ impl RemoteStorage for S3Bucket { .map_err(|e| { metrics::inc_put_object_fail(); e - })?; + }); + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + + res?; + Ok(()) } @@ -463,11 +495,8 @@ impl RemoteStorage for S3Bucket { .await } async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 delete")?; + let kind = RequestKind::Delete; + let _guard = self.permit(kind).await; let mut delete_objects = Vec::with_capacity(paths.len()); for path in paths { @@ -479,6 +508,7 @@ impl RemoteStorage for S3Bucket { for chunk in delete_objects.chunks(MAX_DELETE_OBJECTS_REQUEST_SIZE) { metrics::inc_delete_objects(chunk.len() as u64); + let started_at = start_measuring_requests(kind); let resp = self .client @@ -488,6 +518,11 @@ impl RemoteStorage for S3Bucket { .send() .await; + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &resp, started_at); + match resp { Ok(resp) => { if let Some(errors) = resp.errors { @@ -508,15 +543,14 @@ impl RemoteStorage for S3Bucket { } async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { - let _guard = self - .concurrency_limiter - .acquire() - .await - .context("Concurrency limiter semaphore got closed during S3 delete")?; + let kind = RequestKind::Delete; + let _guard = self.permit(kind).await; metrics::inc_delete_object(); + let started_at = start_measuring_requests(kind); - self.client + let res = self + .client .delete_object() .bucket(self.bucket_name.clone()) .key(self.relative_path_to_s3_object(path)) @@ -525,11 +559,41 @@ impl RemoteStorage for S3Bucket { .map_err(|e| { metrics::inc_delete_object_fail(); e - })?; + }); + + let started_at = ScopeGuard::into_inner(started_at); + metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + + res?; + Ok(()) } } +/// On drop (cancellation) count towards [`metrics::BucketMetrics::cancelled_waits`]. +fn start_counting_cancelled_wait( + kind: RequestKind, +) -> ScopeGuard { + scopeguard::guard_on_success(std::time::Instant::now(), move |_| { + metrics::BUCKET_METRICS.cancelled_waits.get(kind).inc() + }) +} + +/// On drop (cancellation) add time to [`metrics::BucketMetrics::req_seconds`]. +fn start_measuring_requests( + kind: RequestKind, +) -> ScopeGuard { + scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| { + metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Cancelled, + started_at, + ) + }) +} + #[cfg(test)] mod tests { use std::num::NonZeroUsize; diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/s3_bucket/metrics.rs new file mode 100644 index 0000000000..4b7562867a --- /dev/null +++ b/libs/remote_storage/src/s3_bucket/metrics.rs @@ -0,0 +1,243 @@ +use metrics::{register_histogram_vec, register_int_counter_vec, Histogram, IntCounter}; +use once_cell::sync::Lazy; + +pub(super) static BUCKET_METRICS: Lazy = Lazy::new(Default::default); + +#[derive(Clone, Copy, Debug)] +pub(super) enum RequestKind { + Get = 0, + Put = 1, + Delete = 2, + List = 3, +} + +use RequestKind::*; + +impl RequestKind { + const fn as_str(&self) -> &'static str { + match self { + Get => "get_object", + Put => "put_object", + Delete => "delete_object", + List => "list_objects", + } + } + const fn as_index(&self) -> usize { + *self as usize + } +} + +pub(super) struct RequestTyped([C; 4]); + +impl RequestTyped { + pub(super) fn get(&self, kind: RequestKind) -> &C { + &self.0[kind.as_index()] + } + + fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self { + use RequestKind::*; + let mut it = [Get, Put, Delete, List].into_iter(); + let arr = std::array::from_fn::(|index| { + let next = it.next().unwrap(); + assert_eq!(index, next.as_index()); + f(next) + }); + + if let Some(next) = it.next() { + panic!("unexpected {next:?}"); + } + + RequestTyped(arr) + } +} + +impl RequestTyped { + pub(super) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) { + self.get(kind).observe(started_at.elapsed().as_secs_f64()) + } +} + +pub(super) struct PassFailCancelledRequestTyped { + success: RequestTyped, + fail: RequestTyped, + cancelled: RequestTyped, +} + +#[derive(Debug, Clone, Copy)] +pub(super) enum AttemptOutcome { + Ok, + Err, + Cancelled, +} + +impl From<&Result> for AttemptOutcome { + fn from(value: &Result) -> Self { + match value { + Ok(_) => AttemptOutcome::Ok, + Err(_) => AttemptOutcome::Err, + } + } +} + +impl AttemptOutcome { + pub(super) fn as_str(&self) -> &'static str { + match self { + AttemptOutcome::Ok => "ok", + AttemptOutcome::Err => "err", + AttemptOutcome::Cancelled => "cancelled", + } + } +} + +impl PassFailCancelledRequestTyped { + pub(super) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C { + let target = match outcome { + AttemptOutcome::Ok => &self.success, + AttemptOutcome::Err => &self.fail, + AttemptOutcome::Cancelled => &self.cancelled, + }; + target.get(kind) + } + + fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self { + let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok)); + let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err)); + let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled)); + + PassFailCancelledRequestTyped { + success, + fail, + cancelled, + } + } +} + +impl PassFailCancelledRequestTyped { + pub(super) fn observe_elapsed( + &self, + kind: RequestKind, + outcome: impl Into, + started_at: std::time::Instant, + ) { + self.get(kind, outcome.into()) + .observe(started_at.elapsed().as_secs_f64()) + } +} + +pub(super) struct BucketMetrics { + /// Total requests attempted + // TODO: remove after next release and migrate dashboards to `sum by (result) (remote_storage_s3_requests_count)` + requests: RequestTyped, + /// Subset of attempted requests failed + // TODO: remove after next release and migrate dashboards to `remote_storage_s3_requests_count{result="err"}` + failed: RequestTyped, + + pub(super) req_seconds: PassFailCancelledRequestTyped, + pub(super) wait_seconds: RequestTyped, + + /// Track how many semaphore awaits were cancelled per request type. + /// + /// This is in case cancellations are happening more than expected. + pub(super) cancelled_waits: RequestTyped, +} + +impl Default for BucketMetrics { + fn default() -> Self { + let requests = register_int_counter_vec!( + "remote_storage_s3_requests_count", + "Number of s3 requests of particular type", + &["request_type"], + ) + .expect("failed to define a metric"); + let requests = + RequestTyped::build_with(|kind| requests.with_label_values(&[kind.as_str()])); + + let failed = register_int_counter_vec!( + "remote_storage_s3_failures_count", + "Number of failed s3 requests of particular type", + &["request_type"], + ) + .expect("failed to define a metric"); + let failed = RequestTyped::build_with(|kind| failed.with_label_values(&[kind.as_str()])); + + let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0]; + + let req_seconds = register_histogram_vec!( + "remote_storage_s3_request_seconds", + "Seconds to complete a request", + &["request_type", "result"], + buckets.to_vec(), + ) + .unwrap(); + let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| { + req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()]) + }); + + let wait_seconds = register_histogram_vec!( + "remote_storage_s3_wait_seconds", + "Seconds rate limited", + &["request_type"], + buckets.to_vec(), + ) + .unwrap(); + let wait_seconds = + RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()])); + + let cancelled_waits = register_int_counter_vec!( + "remote_storage_s3_cancelled_waits_total", + "Times a semaphore wait has been cancelled per request type", + &["request_type"], + ) + .unwrap(); + let cancelled_waits = + RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()])); + + Self { + requests, + failed, + req_seconds, + wait_seconds, + cancelled_waits, + } + } +} + +pub fn inc_get_object() { + BUCKET_METRICS.requests.get(Get).inc() +} + +pub fn inc_get_object_fail() { + BUCKET_METRICS.failed.get(Get).inc() +} + +pub fn inc_put_object() { + BUCKET_METRICS.requests.get(Put).inc() +} + +pub fn inc_put_object_fail() { + BUCKET_METRICS.failed.get(Put).inc() +} + +pub fn inc_delete_object() { + BUCKET_METRICS.requests.get(Delete).inc() +} + +pub fn inc_delete_objects(count: u64) { + BUCKET_METRICS.requests.get(Delete).inc_by(count) +} + +pub fn inc_delete_object_fail() { + BUCKET_METRICS.failed.get(Delete).inc() +} + +pub fn inc_delete_objects_fail(count: u64) { + BUCKET_METRICS.failed.get(Delete).inc_by(count) +} + +pub fn inc_list_objects() { + BUCKET_METRICS.requests.get(List).inc() +} + +pub fn inc_list_objects_fail() { + BUCKET_METRICS.failed.get(List).inc() +} diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f5d4ca3f20..2d6ac6e29f 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -476,7 +476,7 @@ class NeonEnvBuilder: # Prepare the default branch to start the postgres on later. # Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API. - log.info( + log.debug( f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline" ) initial_tenant, initial_timeline = env.neon_cli.create_tenant( diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index ca737ac02d..1e8dd36206 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -272,6 +272,23 @@ def test_delete_timeline_exercise_crash_safety_failpoints( wait_timeline_detail_404( ps_http, env.initial_tenant, timeline_id, iterations=iterations ) + + if failpoint == "timeline-delete-after-index-delete": + m = ps_http.get_metrics() + assert ( + m.query_one( + "remote_storage_s3_request_seconds_count", + filter={"request_type": "get_object", "result": "err"}, + ).value + == 1 + ) + assert ( + m.query_one( + "remote_storage_s3_request_seconds_count", + filter={"request_type": "get_object", "result": "ok"}, + ).value + == 1 + ) elif check is Check.RETRY_WITHOUT_RESTART: # this should succeed # this also checks that delete can be retried even when timeline is in Broken state