storage_controller: add metrics (#7178)

## Problem
Storage controller had basically no metrics.

## Summary of changes
1. Migrate the existing metrics to use Conrad's
[`measured`](https://docs.rs/measured/0.0.14/measured/) crate.
2. Add metrics for incoming http requests
3. Add metrics for outgoing http requests to the pageserver
4. Add metrics for outgoing pass through requests to the pageserver
5. Add metrics for database queries

Note that the metrics response for the attachment service does not use
chunked encoding like the rest of the metrics endpoints. Conrad has
kindly extended the crate such that it can now be done. Let's leave it
for a follow-up since the payload shouldn't be that big at this point.

Fixes https://github.com/neondatabase/neon/issues/6875
This commit is contained in:
Vlad Lazar
2024-03-21 12:00:20 +00:00
committed by GitHub
parent 5ec6862bcf
commit c75b584430
17 changed files with 1004 additions and 198 deletions

33
Cargo.lock generated
View File

@@ -277,6 +277,7 @@ dependencies = [
"anyhow",
"aws-config",
"aws-sdk-secretsmanager",
"bytes",
"camino",
"clap",
"control_plane",
@@ -288,6 +289,8 @@ dependencies = [
"hex",
"humantime",
"hyper",
"lasso",
"measured",
"metrics",
"once_cell",
"pageserver_api",
@@ -295,6 +298,7 @@ dependencies = [
"postgres_connection",
"r2d2",
"reqwest",
"routerify",
"serde",
"serde_json",
"thiserror",
@@ -2880,6 +2884,35 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "measured"
version = "0.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f246648d027839a34b420e27c7de1165ace96e19ef894985d0a6ff89a7840a9f"
dependencies = [
"bytes",
"hashbrown 0.14.0",
"itoa",
"lasso",
"measured-derive",
"memchr",
"parking_lot 0.12.1",
"rustc-hash",
"ryu",
]
[[package]]
name = "measured-derive"
version = "0.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edaa5cc22d99d5d6d7d99c3b5b5f7e7f8034c22f1b5d62a1adecd2ed005d9b80"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "memchr"
version = "2.6.4"

View File

@@ -101,6 +101,7 @@ lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.13", features=["default", "lasso"] }
memoffset = "0.8"
native-tls = "0.2"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }

View File

@@ -17,6 +17,7 @@ testing = []
anyhow.workspace = true
aws-config.workspace = true
aws-sdk-secretsmanager.workspace = true
bytes.workspace = true
camino.workspace = true
clap.workspace = true
fail.workspace = true
@@ -25,17 +26,20 @@ git-version.workspace = true
hex.workspace = true
hyper.workspace = true
humantime.workspace = true
lasso.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
reqwest.workspace = true
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
measured.workspace = true
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel_migrations = { version = "2.1.0" }

View File

@@ -1,5 +1,11 @@
use crate::metrics::{
HttpRequestLatencyLabelGroup, HttpRequestStatusLabelGroup, PageserverRequestLabelGroup,
METRICS_REGISTRY,
};
use crate::reconciler::ReconcileError;
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use futures::Future;
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{
@@ -34,6 +40,8 @@ use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
use control_plane::storage_controller::{AttachHookRequest, InspectRequest};
use routerify::Middleware;
/// State available to HTTP request handlers
#[derive(Clone)]
pub struct HttpState {
@@ -313,7 +321,7 @@ async fn handle_tenant_timeline_passthrough(
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
// Find the node that holds shard zero
let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id)?;
// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
@@ -322,12 +330,39 @@ async fn handle_tenant_timeline_passthrough(
let tenant_shard_str = format!("{}", tenant_shard_id);
let path = path.replace(&tenant_str, &tenant_shard_str);
let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_latency;
// This is a bit awkward. We remove the param from the request
// and join the words by '_' to get a label for the request.
let just_path = path.replace(&tenant_shard_str, "");
let path_label = just_path
.split('/')
.filter(|token| !token.is_empty())
.collect::<Vec<_>>()
.join("_");
let labels = PageserverRequestLabelGroup {
pageserver_id: &node.get_id().to_string(),
path: &path_label,
method: crate::metrics::Method::Get,
};
let _timer = latency.start_timer(labels.clone());
let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
let resp = client.get_raw(path).await.map_err(|_e|
// FIXME: give APiError a proper Unavailable variant. We return 503 here because
// if we can't successfully send a request to the pageserver, we aren't available.
ApiError::ShuttingDown)?;
if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_error;
error_counter.inc(labels);
}
// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder()
.status(resp.status())
@@ -498,7 +533,11 @@ impl From<ReconcileError> for ApiError {
/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
/// be allowed to run if Service has finished its initial reconciliation.
async fn tenant_service_handler<R, H>(request: Request<Body>, handler: H) -> R::Output
async fn tenant_service_handler<R, H>(
request: Request<Body>,
handler: H,
request_name: RequestName,
) -> R::Output
where
R: std::future::Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Arc<Service>, Request<Body>) -> R + Send + Sync + 'static,
@@ -518,9 +557,10 @@ where
));
}
request_span(
named_request_span(
request,
|request| async move { handler(service, request).await },
request_name,
)
.await
}
@@ -531,11 +571,98 @@ fn check_permissions(request: &Request<Body>, required_scope: Scope) -> Result<(
})
}
#[derive(Clone, Debug)]
struct RequestMeta {
method: hyper::http::Method,
at: Instant,
}
fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let meta = RequestMeta {
method: req.method().clone(),
at: Instant::now(),
};
req.set_context(meta);
Ok(req)
})
}
fn epilogue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::post_with_info(move |resp, req_info| async move {
let request_name = match req_info.context::<RequestName>() {
Some(name) => name,
None => {
return Ok(resp);
}
};
if let Some(meta) = req_info.context::<RequestMeta>() {
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_http_request_status;
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_http_request_latency;
status.inc(HttpRequestStatusLabelGroup {
path: request_name.0,
method: meta.method.clone().into(),
status: crate::metrics::StatusCode(resp.status()),
});
latency.observe(
HttpRequestLatencyLabelGroup {
path: request_name.0,
method: meta.method.into(),
},
meta.at.elapsed().as_secs_f64(),
);
}
Ok(resp)
})
}
pub async fn measured_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
pub const TEXT_FORMAT: &str = "text/plain; version=0.0.4";
let payload = crate::metrics::METRICS_REGISTRY.encode();
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, TEXT_FORMAT)
.body(payload.into())
.unwrap();
Ok(response)
}
#[derive(Clone)]
struct RequestName(&'static str);
async fn named_request_span<R, H>(
request: Request<Body>,
handler: H,
name: RequestName,
) -> R::Output
where
R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
{
request.set_context(name);
request_span(request, handler).await
}
pub fn make_router(
service: Arc<Service>,
auth: Option<Arc<SwappableJwtAuth>>,
) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router();
let mut router = endpoint::make_router()
.middleware(prologue_metrics_middleware())
.middleware(epilogue_metrics_middleware());
if auth.is_some() {
router = router.middleware(auth_middleware(|request| {
let state = get_state(request);
@@ -544,99 +671,166 @@ pub fn make_router(
} else {
state.auth.as_deref()
}
}))
}));
}
router
.data(Arc::new(HttpState::new(service, auth)))
.get("/metrics", |r| {
named_request_span(r, measured_metrics_handler, RequestName("metrics"))
})
// Non-prefixed generic endpoints (status, metrics)
.get("/status", |r| request_span(r, handle_status))
.get("/ready", |r| request_span(r, handle_ready))
.get("/status", |r| {
named_request_span(r, handle_status, RequestName("status"))
})
.get("/ready", |r| {
named_request_span(r, handle_ready, RequestName("ready"))
})
// Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
.post("/upcall/v1/re-attach", |r| {
request_span(r, handle_re_attach)
named_request_span(r, handle_re_attach, RequestName("upcall_v1_reattach"))
})
.post("/upcall/v1/validate", |r| {
named_request_span(r, handle_validate, RequestName("upcall_v1_validate"))
})
.post("/upcall/v1/validate", |r| request_span(r, handle_validate))
// Test/dev/debug endpoints
.post("/debug/v1/attach-hook", |r| {
request_span(r, handle_attach_hook)
named_request_span(r, handle_attach_hook, RequestName("debug_v1_attach_hook"))
})
.post("/debug/v1/inspect", |r| {
named_request_span(r, handle_inspect, RequestName("debug_v1_inspect"))
})
.post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
.post("/debug/v1/tenant/:tenant_id/drop", |r| {
request_span(r, handle_tenant_drop)
named_request_span(r, handle_tenant_drop, RequestName("debug_v1_tenant_drop"))
})
.post("/debug/v1/node/:node_id/drop", |r| {
request_span(r, handle_node_drop)
named_request_span(r, handle_node_drop, RequestName("debug_v1_node_drop"))
})
.get("/debug/v1/tenant", |r| {
named_request_span(r, handle_tenants_dump, RequestName("debug_v1_tenant"))
})
.get("/debug/v1/tenant", |r| request_span(r, handle_tenants_dump))
.get("/debug/v1/tenant/:tenant_id/locate", |r| {
tenant_service_handler(r, handle_tenant_locate)
tenant_service_handler(
r,
handle_tenant_locate,
RequestName("debug_v1_tenant_locate"),
)
})
.get("/debug/v1/scheduler", |r| {
request_span(r, handle_scheduler_dump)
named_request_span(r, handle_scheduler_dump, RequestName("debug_v1_scheduler"))
})
.post("/debug/v1/consistency_check", |r| {
request_span(r, handle_consistency_check)
named_request_span(
r,
handle_consistency_check,
RequestName("debug_v1_consistency_check"),
)
})
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
// Node operations
.post("/control/v1/node", |r| {
request_span(r, handle_node_register)
named_request_span(r, handle_node_register, RequestName("control_v1_node"))
})
.get("/control/v1/node", |r| {
named_request_span(r, handle_node_list, RequestName("control_v1_node"))
})
.get("/control/v1/node", |r| request_span(r, handle_node_list))
.put("/control/v1/node/:node_id/config", |r| {
request_span(r, handle_node_configure)
named_request_span(
r,
handle_node_configure,
RequestName("control_v1_node_config"),
)
})
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
tenant_service_handler(r, handle_tenant_shard_migrate)
tenant_service_handler(
r,
handle_tenant_shard_migrate,
RequestName("control_v1_tenant_migrate"),
)
})
.put("/control/v1/tenant/:tenant_id/shard_split", |r| {
tenant_service_handler(r, handle_tenant_shard_split)
tenant_service_handler(
r,
handle_tenant_shard_split,
RequestName("control_v1_tenant_shard_split"),
)
})
.get("/control/v1/tenant/:tenant_id", |r| {
tenant_service_handler(r, handle_tenant_describe)
tenant_service_handler(
r,
handle_tenant_describe,
RequestName("control_v1_tenant_describe"),
)
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
.post("/v1/tenant", |r| {
tenant_service_handler(r, handle_tenant_create)
tenant_service_handler(r, handle_tenant_create, RequestName("v1_tenant"))
})
.delete("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(r, handle_tenant_delete)
tenant_service_handler(r, handle_tenant_delete, RequestName("v1_tenant"))
})
.put("/v1/tenant/config", |r| {
tenant_service_handler(r, handle_tenant_config_set)
tenant_service_handler(r, handle_tenant_config_set, RequestName("v1_tenant_config"))
})
.get("/v1/tenant/:tenant_id/config", |r| {
tenant_service_handler(r, handle_tenant_config_get)
tenant_service_handler(r, handle_tenant_config_get, RequestName("v1_tenant_config"))
})
.put("/v1/tenant/:tenant_shard_id/location_config", |r| {
tenant_service_handler(r, handle_tenant_location_config)
tenant_service_handler(
r,
handle_tenant_location_config,
RequestName("v1_tenant_location_config"),
)
})
.put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
tenant_service_handler(
r,
handle_tenant_time_travel_remote_storage,
RequestName("v1_tenant_time_travel_remote_storage"),
)
})
.post("/v1/tenant/:tenant_id/secondary/download", |r| {
tenant_service_handler(r, handle_tenant_secondary_download)
tenant_service_handler(
r,
handle_tenant_secondary_download,
RequestName("v1_tenant_secondary_download"),
)
})
// Timeline operations
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(r, handle_tenant_timeline_delete)
tenant_service_handler(
r,
handle_tenant_timeline_delete,
RequestName("v1_tenant_timeline"),
)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
tenant_service_handler(r, handle_tenant_timeline_create)
tenant_service_handler(
r,
handle_tenant_timeline_create,
RequestName("v1_tenant_timeline"),
)
})
// Tenant detail GET passthrough to shard zero
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(r, handle_tenant_timeline_passthrough)
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
RequestName("v1_tenant_passthrough"),
)
})
// Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
// timeline GET APIs will be implicitly included.
.get("/v1/tenant/:tenant_id/timeline*", |r| {
tenant_service_handler(r, handle_tenant_timeline_passthrough)
tenant_service_handler(
r,
handle_tenant_timeline_passthrough,
RequestName("v1_tenant_timeline_passthrough"),
)
})
}

View File

@@ -8,6 +8,7 @@ pub mod http;
mod id_lock_map;
pub mod metrics;
mod node;
mod pageserver_client;
pub mod persistence;
mod reconciler;
mod scheduler;

View File

@@ -1,32 +1,284 @@
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
//!
//! This module provides metric definitions for the storage controller.
//!
//! All metrics are grouped in [`StorageControllerMetricGroup`]. [`StorageControllerMetrics`] holds
//! the mentioned metrics and their encoder. It's globally available via the [`METRICS_REGISTRY`]
//! constant.
//!
//! The rest of the code defines label group types and deals with converting outer types to labels.
//!
use bytes::Bytes;
use measured::{
label::{LabelValue, StaticLabelSet},
FixedCardinalityLabel, MetricGroup,
};
use once_cell::sync::Lazy;
use std::sync::Mutex;
pub(crate) struct ReconcilerMetrics {
pub(crate) spawned: IntCounter,
pub(crate) complete: IntCounterVec,
}
use crate::persistence::{DatabaseError, DatabaseOperation};
impl ReconcilerMetrics {
// Labels used on [`Self::complete`]
pub(crate) const SUCCESS: &'static str = "ok";
pub(crate) const ERROR: &'static str = "success";
pub(crate) const CANCEL: &'static str = "cancel";
}
pub(crate) static RECONCILER: Lazy<ReconcilerMetrics> = Lazy::new(|| ReconcilerMetrics {
spawned: register_int_counter!(
"storage_controller_reconcile_spawn",
"Count of how many times we spawn a reconcile task",
)
.expect("failed to define a metric"),
complete: register_int_counter_vec!(
"storage_controller_reconcile_complete",
"Reconciler tasks completed, broken down by success/failure/cancelled",
&["status"],
)
.expect("failed to define a metric"),
});
pub(crate) static METRICS_REGISTRY: Lazy<StorageControllerMetrics> =
Lazy::new(StorageControllerMetrics::default);
pub fn preinitialize_metrics() {
Lazy::force(&RECONCILER);
Lazy::force(&METRICS_REGISTRY);
}
pub(crate) struct StorageControllerMetrics {
pub(crate) metrics_group: StorageControllerMetricGroup,
encoder: Mutex<measured::text::TextEncoder>,
}
#[derive(measured::MetricGroup)]
pub(crate) struct StorageControllerMetricGroup {
/// Count of how many times we spawn a reconcile task
pub(crate) storage_controller_reconcile_spawn: measured::Counter,
/// Reconciler tasks completed, broken down by success/failure/cancelled
pub(crate) storage_controller_reconcile_complete:
measured::CounterVec<ReconcileCompleteLabelGroupSet>,
/// HTTP request status counters for handled requests
pub(crate) storage_controller_http_request_status:
measured::CounterVec<HttpRequestStatusLabelGroupSet>,
/// HTTP request handler latency across all status codes
pub(crate) storage_controller_http_request_latency:
measured::HistogramVec<HttpRequestLatencyLabelGroupSet, 5>,
/// Count of HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of pass-through HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
pub(crate) storage_controller_passthrough_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of errors in database queries, broken down by error type and operation.
pub(crate) storage_controller_database_query_error:
measured::CounterVec<DatabaseQueryErrorLabelGroupSet>,
/// Latency of database queries, broken down by operation.
pub(crate) storage_controller_database_query_latency:
measured::HistogramVec<DatabaseQueryLatencyLabelGroupSet, 5>,
}
impl StorageControllerMetrics {
pub(crate) fn encode(&self) -> Bytes {
let mut encoder = self.encoder.lock().unwrap();
self.metrics_group.collect_into(&mut *encoder);
encoder.finish()
}
}
impl Default for StorageControllerMetrics {
fn default() -> Self {
Self {
metrics_group: StorageControllerMetricGroup::new(),
encoder: Mutex::new(measured::text::TextEncoder::new()),
}
}
}
impl StorageControllerMetricGroup {
pub(crate) fn new() -> Self {
Self {
storage_controller_reconcile_spawn: measured::Counter::new(),
storage_controller_reconcile_complete: measured::CounterVec::new(
ReconcileCompleteLabelGroupSet {
status: StaticLabelSet::new(),
},
),
storage_controller_http_request_status: measured::CounterVec::new(
HttpRequestStatusLabelGroupSet {
path: lasso::ThreadedRodeo::new(),
method: StaticLabelSet::new(),
status: StaticLabelSet::new(),
},
),
storage_controller_http_request_latency: measured::HistogramVec::new(
measured::metric::histogram::Thresholds::exponential_buckets(0.1, 2.0),
),
storage_controller_pageserver_request_error: measured::CounterVec::new(
PageserverRequestLabelGroupSet {
pageserver_id: lasso::ThreadedRodeo::new(),
path: lasso::ThreadedRodeo::new(),
method: StaticLabelSet::new(),
},
),
storage_controller_pageserver_request_latency: measured::HistogramVec::new(
measured::metric::histogram::Thresholds::exponential_buckets(0.1, 2.0),
),
storage_controller_passthrough_request_error: measured::CounterVec::new(
PageserverRequestLabelGroupSet {
pageserver_id: lasso::ThreadedRodeo::new(),
path: lasso::ThreadedRodeo::new(),
method: StaticLabelSet::new(),
},
),
storage_controller_passthrough_request_latency: measured::HistogramVec::new(
measured::metric::histogram::Thresholds::exponential_buckets(0.1, 2.0),
),
storage_controller_database_query_error: measured::CounterVec::new(
DatabaseQueryErrorLabelGroupSet {
operation: StaticLabelSet::new(),
error_type: StaticLabelSet::new(),
},
),
storage_controller_database_query_latency: measured::HistogramVec::new(
measured::metric::histogram::Thresholds::exponential_buckets(0.1, 2.0),
),
}
}
}
#[derive(measured::LabelGroup)]
#[label(set = ReconcileCompleteLabelGroupSet)]
pub(crate) struct ReconcileCompleteLabelGroup {
pub(crate) status: ReconcileOutcome,
}
#[derive(measured::LabelGroup)]
#[label(set = HttpRequestStatusLabelGroupSet)]
pub(crate) struct HttpRequestStatusLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo)]
pub(crate) path: &'a str,
pub(crate) method: Method,
pub(crate) status: StatusCode,
}
#[derive(measured::LabelGroup)]
#[label(set = HttpRequestLatencyLabelGroupSet)]
pub(crate) struct HttpRequestLatencyLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo)]
pub(crate) path: &'a str,
pub(crate) method: Method,
}
impl Default for HttpRequestLatencyLabelGroupSet {
fn default() -> Self {
Self {
path: lasso::ThreadedRodeo::new(),
method: StaticLabelSet::new(),
}
}
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = PageserverRequestLabelGroupSet)]
pub(crate) struct PageserverRequestLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo)]
pub(crate) pageserver_id: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo)]
pub(crate) path: &'a str,
pub(crate) method: Method,
}
impl Default for PageserverRequestLabelGroupSet {
fn default() -> Self {
Self {
pageserver_id: lasso::ThreadedRodeo::new(),
path: lasso::ThreadedRodeo::new(),
method: StaticLabelSet::new(),
}
}
}
#[derive(measured::LabelGroup)]
#[label(set = DatabaseQueryErrorLabelGroupSet)]
pub(crate) struct DatabaseQueryErrorLabelGroup {
pub(crate) error_type: DatabaseErrorLabel,
pub(crate) operation: DatabaseOperation,
}
#[derive(measured::LabelGroup)]
#[label(set = DatabaseQueryLatencyLabelGroupSet)]
pub(crate) struct DatabaseQueryLatencyLabelGroup {
pub(crate) operation: DatabaseOperation,
}
#[derive(FixedCardinalityLabel)]
pub(crate) enum ReconcileOutcome {
#[label(rename = "ok")]
Success,
Error,
Cancel,
}
#[derive(FixedCardinalityLabel, Clone)]
pub(crate) enum Method {
Get,
Put,
Post,
Delete,
Other,
}
impl From<hyper::Method> for Method {
fn from(value: hyper::Method) -> Self {
if value == hyper::Method::GET {
Method::Get
} else if value == hyper::Method::PUT {
Method::Put
} else if value == hyper::Method::POST {
Method::Post
} else if value == hyper::Method::DELETE {
Method::Delete
} else {
Method::Other
}
}
}
pub(crate) struct StatusCode(pub(crate) hyper::http::StatusCode);
impl LabelValue for StatusCode {
fn visit<V: measured::label::LabelVisitor>(&self, v: V) -> V::Output {
v.write_int(self.0.as_u16() as u64)
}
}
impl FixedCardinalityLabel for StatusCode {
fn cardinality() -> usize {
(100..1000).len()
}
fn encode(&self) -> usize {
self.0.as_u16() as usize
}
fn decode(value: usize) -> Self {
Self(hyper::http::StatusCode::from_u16(u16::try_from(value).unwrap()).unwrap())
}
}
#[derive(FixedCardinalityLabel)]
pub(crate) enum DatabaseErrorLabel {
Query,
Connection,
ConnectionPool,
Logical,
}
impl DatabaseError {
pub(crate) fn error_label(&self) -> DatabaseErrorLabel {
match self {
Self::Query(_) => DatabaseErrorLabel::Query,
Self::Connection(_) => DatabaseErrorLabel::Connection,
Self::ConnectionPool(_) => DatabaseErrorLabel::ConnectionPool,
Self::Logical(_) => DatabaseErrorLabel::Logical,
}
}
}

View File

@@ -12,7 +12,9 @@ use serde::Serialize;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId};
use crate::{persistence::NodePersistence, scheduler::MaySchedule};
use crate::{
pageserver_client::PageserverClient, persistence::NodePersistence, scheduler::MaySchedule,
};
/// Represents the in-memory description of a Node.
///
@@ -202,7 +204,7 @@ impl Node {
cancel: &CancellationToken,
) -> Option<mgmt_api::Result<T>>
where
O: FnMut(mgmt_api::Client) -> F,
O: FnMut(PageserverClient) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
@@ -224,8 +226,12 @@ impl Node {
.build()
.expect("Failed to construct HTTP client");
let client =
mgmt_api::Client::from_client(http_client, self.base_url(), jwt.as_deref());
let client = PageserverClient::from_client(
self.get_id(),
http_client,
self.base_url(),
jwt.as_deref(),
);
let node_cancel_fut = self.cancel.cancelled();

View File

@@ -0,0 +1,203 @@
use pageserver_api::{
models::{
LocationConfig, LocationConfigListResponse, PageserverUtilization, SecondaryProgress,
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api::{Client, Result};
use reqwest::StatusCode;
use utils::id::{NodeId, TimelineId};
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
#[derive(Debug, Clone)]
pub(crate) struct PageserverClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = crate::metrics::PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}
res
}};
}
impl PageserverClient {
pub(crate) fn new(node_id: NodeId, mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<&str>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
measured_request!(
"tenant",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.tenant_delete(tenant_shard_id).await
)
}
pub(crate) async fn tenant_time_travel_remote_storage(
&self,
tenant_shard_id: TenantShardId,
timestamp: &str,
done_if_after: &str,
) -> Result<()> {
measured_request!(
"tenant_time_travel_remote_storage",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.tenant_time_travel_remote_storage(tenant_shard_id, timestamp, done_if_after)
.await
)
}
pub(crate) async fn tenant_secondary_download(
&self,
tenant_id: TenantShardId,
wait: Option<std::time::Duration>,
) -> Result<(StatusCode, SecondaryProgress)> {
measured_request!(
"tenant_secondary_download",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.tenant_secondary_download(tenant_id, wait).await
)
}
pub(crate) async fn location_config(
&self,
tenant_shard_id: TenantShardId,
config: LocationConfig,
flush_ms: Option<std::time::Duration>,
lazy: bool,
) -> Result<()> {
measured_request!(
"location_config",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.location_config(tenant_shard_id, config, flush_ms, lazy)
.await
)
}
pub(crate) async fn list_location_config(&self) -> Result<LocationConfigListResponse> {
measured_request!(
"location_configs",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.list_location_config().await
)
}
pub(crate) async fn get_location_config(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Option<LocationConfig>> {
measured_request!(
"location_config",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.get_location_config(tenant_shard_id).await
)
}
pub(crate) async fn timeline_create(
&self,
tenant_shard_id: TenantShardId,
req: &TimelineCreateRequest,
) -> Result<TimelineInfo> {
measured_request!(
"timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.timeline_create(tenant_shard_id, req).await
)
}
pub(crate) async fn timeline_delete(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<StatusCode> {
measured_request!(
"timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner
.timeline_delete(tenant_shard_id, timeline_id)
.await
)
}
pub(crate) async fn tenant_shard_split(
&self,
tenant_shard_id: TenantShardId,
req: TenantShardSplitRequest,
) -> Result<TenantShardSplitResponse> {
measured_request!(
"tenant_shard_split",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner.tenant_shard_split(tenant_shard_id, req).await
)
}
pub(crate) async fn timeline_list(
&self,
tenant_shard_id: &TenantShardId,
) -> Result<Vec<TimelineInfo>> {
measured_request!(
"timelines",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.timeline_list(tenant_shard_id).await
)
}
pub(crate) async fn get_utilization(&self) -> Result<PageserverUtilization> {
measured_request!(
"utilization",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.get_utilization().await
)
}
}

View File

@@ -19,6 +19,9 @@ use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};
use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
};
use crate::node::Node;
/// ## What do we store?
@@ -75,6 +78,25 @@ pub(crate) enum DatabaseError {
Logical(String),
}
#[derive(measured::FixedCardinalityLabel, Clone)]
pub(crate) enum DatabaseOperation {
InsertNode,
UpdateNode,
DeleteNode,
ListNodes,
BeginShardSplit,
CompleteShardSplit,
AbortShardSplit,
Detach,
ReAttach,
IncrementGeneration,
ListTenantShards,
InsertTenantShards,
UpdateTenantShard,
DeleteTenant,
UpdateTenantConfig,
}
#[must_use]
pub(crate) enum AbortShardSplitStatus {
/// We aborted the split in the database by reverting to the parent shards
@@ -115,6 +137,34 @@ impl Persistence {
}
}
/// Wraps `with_conn` in order to collect latency and error metrics
async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
where
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_database_query_latency;
let _timer = latency.start_timer(DatabaseQueryLatencyLabelGroup {
operation: op.clone(),
});
let res = self.with_conn(func).await;
if let Err(err) = &res {
let error_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_database_query_error;
error_counter.inc(DatabaseQueryErrorLabelGroup {
error_type: err.error_label(),
operation: op,
})
}
res
}
/// Call the provided function in a tokio blocking thread, with a Diesel database connection.
async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
where
@@ -130,21 +180,27 @@ impl Persistence {
/// When a node is first registered, persist it before using it for anything
pub(crate) async fn insert_node(&self, node: &Node) -> DatabaseResult<()> {
let np = node.to_persistent();
self.with_conn(move |conn| -> DatabaseResult<()> {
diesel::insert_into(crate::schema::nodes::table)
.values(&np)
.execute(conn)?;
Ok(())
})
self.with_measured_conn(
DatabaseOperation::InsertNode,
move |conn| -> DatabaseResult<()> {
diesel::insert_into(crate::schema::nodes::table)
.values(&np)
.execute(conn)?;
Ok(())
},
)
.await
}
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_nodes(&self) -> DatabaseResult<Vec<NodePersistence>> {
let nodes: Vec<NodePersistence> = self
.with_conn(move |conn| -> DatabaseResult<_> {
Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
})
.with_measured_conn(
DatabaseOperation::ListNodes,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::nodes::table.load::<NodePersistence>(conn)?)
},
)
.await?;
tracing::info!("list_nodes: loaded {} nodes", nodes.len());
@@ -159,7 +215,7 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
let updated = self
.with_conn(move |conn| {
.with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.set((scheduling_policy.eq(String::from(input_scheduling)),))
@@ -181,9 +237,12 @@ impl Persistence {
/// be enriched at runtime with state discovered on pageservers.
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
let loaded = self
.with_conn(move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
})
.with_measured_conn(
DatabaseOperation::ListTenantShards,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::tenant_shards::table.load::<TenantShardPersistence>(conn)?)
},
)
.await?;
if loaded.is_empty() {
@@ -260,17 +319,20 @@ impl Persistence {
shards: Vec<TenantShardPersistence>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
for tenant in &shards {
diesel::insert_into(tenant_shards)
.values(tenant)
.execute(conn)?;
}
self.with_measured_conn(
DatabaseOperation::InsertTenantShards,
move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
for tenant in &shards {
diesel::insert_into(tenant_shards)
.values(tenant)
.execute(conn)?;
}
Ok(())
})?;
Ok(())
})?;
Ok(())
})
},
)
.await
}
@@ -278,25 +340,31 @@ impl Persistence {
/// the tenant from memory on this server.
pub(crate) async fn delete_tenant(&self, del_tenant_id: TenantId) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
diesel::delete(tenant_shards)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.execute(conn)?;
self.with_measured_conn(
DatabaseOperation::DeleteTenant,
move |conn| -> DatabaseResult<()> {
diesel::delete(tenant_shards)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.execute(conn)?;
Ok(())
})
Ok(())
},
)
.await
}
pub(crate) async fn delete_node(&self, del_node_id: NodeId) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
diesel::delete(nodes)
.filter(node_id.eq(del_node_id.0 as i64))
.execute(conn)?;
self.with_measured_conn(
DatabaseOperation::DeleteNode,
move |conn| -> DatabaseResult<()> {
diesel::delete(nodes)
.filter(node_id.eq(del_node_id.0 as i64))
.execute(conn)?;
Ok(())
})
Ok(())
},
)
.await
}
@@ -310,7 +378,7 @@ impl Persistence {
) -> DatabaseResult<HashMap<TenantShardId, Generation>> {
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_conn(move |conn| {
.with_measured_conn(DatabaseOperation::ReAttach, move |conn| {
let rows_updated = diesel::update(tenant_shards)
.filter(generation_pageserver.eq(node_id.0 as i64))
.set(generation.eq(generation + 1))
@@ -360,7 +428,7 @@ impl Persistence {
) -> anyhow::Result<Generation> {
use crate::schema::tenant_shards::dsl::*;
let updated = self
.with_conn(move |conn| {
.with_measured_conn(DatabaseOperation::IncrementGeneration, move |conn| {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
@@ -404,7 +472,7 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| {
self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
let query = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
@@ -445,7 +513,7 @@ impl Persistence {
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| {
self.with_measured_conn(DatabaseOperation::UpdateTenantConfig, move |conn| {
diesel::update(tenant_shards)
.filter(tenant_id.eq(input_tenant_id.to_string()))
.set((config.eq(serde_json::to_string(&input_config).unwrap()),))
@@ -460,7 +528,7 @@ impl Persistence {
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| {
self.with_measured_conn(DatabaseOperation::Detach, move |conn| {
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
@@ -490,7 +558,7 @@ impl Persistence {
parent_to_children: Vec<(TenantShardId, Vec<TenantShardPersistence>)>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
self.with_measured_conn(DatabaseOperation::BeginShardSplit, move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> DatabaseResult<()> {
// Mark parent shards as splitting
@@ -554,26 +622,29 @@ impl Persistence {
old_shard_count: ShardCount,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
// Drop parent shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.literal() as i32))
.execute(conn)?;
self.with_measured_conn(
DatabaseOperation::CompleteShardSplit,
move |conn| -> DatabaseResult<()> {
conn.transaction(|conn| -> QueryResult<()> {
// Drop parent shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(old_shard_count.literal() as i32))
.execute(conn)?;
// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.set((splitting.eq(0),))
.execute(conn)?;
debug_assert!(updated > 0);
// Clear sharding flag
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.set((splitting.eq(0),))
.execute(conn)?;
debug_assert!(updated > 0);
Ok(())
})?;
Ok(())
})?;
Ok(())
})
},
)
.await
}
@@ -585,40 +656,44 @@ impl Persistence {
new_shard_count: ShardCount,
) -> DatabaseResult<AbortShardSplitStatus> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted = conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;
self.with_measured_conn(
DatabaseOperation::AbortShardSplit,
move |conn| -> DatabaseResult<AbortShardSplitStatus> {
let aborted =
conn.transaction(|conn| -> DatabaseResult<AbortShardSplitStatus> {
// Clear the splitting state on parent shards
let updated = diesel::update(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.ne(new_shard_count.literal() as i32))
.set((splitting.eq(0),))
.execute(conn)?;
// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}
// Parent shards are already gone: we cannot abort.
if updated == 0 {
return Ok(AbortShardSplitStatus::Complete);
}
// Sanity check: if parent shards were present, their cardinality should
// be less than the number of child shards.
if updated >= new_shard_count.count() as usize {
return Err(DatabaseError::Logical(format!(
"Unexpected parent shard count {updated} while aborting split to \
// Sanity check: if parent shards were present, their cardinality should
// be less than the number of child shards.
if updated >= new_shard_count.count() as usize {
return Err(DatabaseError::Logical(format!(
"Unexpected parent shard count {updated} while aborting split to \
count {new_shard_count:?} on tenant {split_tenant_id}"
)));
}
)));
}
// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;
// Erase child shards
diesel::delete(tenant_shards)
.filter(tenant_id.eq(split_tenant_id.to_string()))
.filter(shard_count.eq(new_shard_count.literal() as i32))
.execute(conn)?;
Ok(AbortShardSplitStatus::Aborted)
})?;
Ok(AbortShardSplitStatus::Aborted)
})?;
Ok(aborted)
})
Ok(aborted)
},
)
.await
}
}

View File

@@ -1,3 +1,4 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::service;
use hyper::StatusCode;
@@ -243,8 +244,11 @@ impl Reconciler {
tenant_shard_id: TenantShardId,
node: &Node,
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let client =
mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.jwt_token.as_deref(),
);
let timelines = client.timeline_list(&tenant_shard_id).await?;
Ok(timelines

View File

@@ -27,6 +27,7 @@ use pageserver_api::{
models::{SecondaryProgress, TenantConfigRequest},
};
use crate::pageserver_client::PageserverClient;
use pageserver_api::{
models::{
self, LocationConfig, LocationConfigListResponse, LocationConfigMode,
@@ -551,7 +552,11 @@ impl Service {
break;
}
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
);
match client
.location_config(
tenant_shard_id,
@@ -2096,8 +2101,11 @@ impl Service {
})
.collect::<Vec<_>>();
for tenant_shard_id in shard_ids {
let client =
mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
);
tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
@@ -2149,7 +2157,11 @@ impl Service {
// Issue concurrent requests to all shards' locations
let mut futs = FuturesUnordered::new();
for (tenant_shard_id, node) in targets {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
);
futs.push(async move {
let result = client
.tenant_secondary_download(tenant_shard_id, wait)
@@ -2242,7 +2254,11 @@ impl Service {
// Phase 1: delete on the pageservers
let mut any_pending = false;
for (tenant_shard_id, node) in targets {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
);
// TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
// surface immediately as an error to our caller.
let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
@@ -2354,7 +2370,7 @@ impl Service {
tenant_shard_id,
create_req.new_timeline_id,
);
let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
client
.timeline_create(tenant_shard_id, &create_req)
@@ -2478,7 +2494,7 @@ impl Service {
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
let client = PageserverClient::new(node.get_id(), node.base_url(), jwt.as_deref());
client
.timeline_delete(tenant_shard_id, timeline_id)
.await
@@ -2519,11 +2535,11 @@ impl Service {
}
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
/// function looks it up and returns the url. If the tenant isn't found, returns Err(ApiError::NotFound)
pub(crate) fn tenant_shard0_baseurl(
/// function looks up and returns node. If the tenant isn't found, returns Err(ApiError::NotFound)
pub(crate) fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(String, TenantShardId), ApiError> {
) -> Result<(Node, TenantShardId), ApiError> {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, shard)) = locked
.tenants
@@ -2555,7 +2571,7 @@ impl Service {
)));
};
Ok((node.base_url(), *tenant_shard_id))
Ok((node.clone(), *tenant_shard_id))
}
pub(crate) fn tenant_locate(
@@ -3215,7 +3231,11 @@ impl Service {
node,
child_ids,
} = target;
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
);
let response = client
.tenant_shard_split(
*parent_id,

View File

@@ -4,7 +4,10 @@ use std::{
time::Duration,
};
use crate::{metrics, persistence::TenantShardPersistence};
use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
};
use pageserver_api::controller_api::PlacementPolicy;
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
@@ -718,7 +721,10 @@ impl TenantState {
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
tenant_id=%reconciler.tenant_shard_id.tenant_id,
shard_id=%reconciler.tenant_shard_id.shard_slug());
metrics::RECONCILER.spawned.inc();
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_spawn
.inc();
let result_tx = result_tx.clone();
let join_handle = tokio::task::spawn(
async move {
@@ -736,10 +742,12 @@ impl TenantState {
// TODO: wrap all remote API operations in cancellation check
// as well.
if reconciler.cancel.is_cancelled() {
metrics::RECONCILER
.complete
.with_label_values(&[metrics::ReconcilerMetrics::CANCEL])
.inc();
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_complete
.inc(ReconcileCompleteLabelGroup {
status: ReconcileOutcome::Cancel,
});
return;
}
@@ -754,18 +762,18 @@ impl TenantState {
}
// Update result counter
match &result {
Ok(_) => metrics::RECONCILER
.complete
.with_label_values(&[metrics::ReconcilerMetrics::SUCCESS]),
Err(ReconcileError::Cancel) => metrics::RECONCILER
.complete
.with_label_values(&[metrics::ReconcilerMetrics::CANCEL]),
Err(_) => metrics::RECONCILER
.complete
.with_label_values(&[metrics::ReconcilerMetrics::ERROR]),
}
.inc();
let outcome_label = match &result {
Ok(_) => ReconcileOutcome::Success,
Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
Err(_) => ReconcileOutcome::Error,
};
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_reconcile_complete
.inc(ReconcileCompleteLabelGroup {
status: outcome_label,
});
result_tx
.send(ReconcileResult {

View File

@@ -245,7 +245,7 @@ impl std::io::Write for ChannelWriter {
}
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();
let started_at = std::time::Instant::now();
@@ -367,7 +367,6 @@ pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
.middleware(Middleware::post_with_info(
add_request_id_header_to_response,
))
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.err_handler(route_error_handler)
}

View File

@@ -36,6 +36,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::prometheus_metrics_handler;
use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
@@ -2266,6 +2267,7 @@ pub fn make_router(
Ok(router
.data(state)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/v1/status", |r| api_handler(r, status_handler))
.put("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)

View File

@@ -2,14 +2,21 @@ use anyhow::{anyhow, bail};
use hyper::{Body, Request, Response, StatusCode};
use std::{convert::Infallible, net::TcpListener};
use tracing::info;
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
use utils::http::{
endpoint::{self, prometheus_metrics_handler, request_span},
error::ApiError,
json::json_response,
RouterBuilder, RouterService,
};
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, "")
}
fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
endpoint::make_router().get("/v1/status", status_handler)
endpoint::make_router()
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/v1/status", status_handler)
}
pub async fn task_main(http_listener: TcpListener) -> anyhow::Result<Infallible> {

View File

@@ -20,7 +20,7 @@ use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info_span, Instrument};
use utils::http::endpoint::{request_span, ChannelWriter};
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
@@ -515,6 +515,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
router
.data(Arc::new(conf))
.data(auth)
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/v1/status", |r| request_span(r, status_handler))
.put("/v1/failpoints", |r| {
request_span(r, move |r| async {

View File

@@ -278,18 +278,14 @@ def test_sharding_split_smoke(
# Check that no cancelled or errored reconciliations occurred: this test does no
# failure injection and should run clean.
assert (
env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "cancel"}
)
is None
cancelled_reconciles = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "cancel"}
)
assert (
env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "error"}
)
is None
errored_reconciles = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "error"}
)
assert cancelled_reconciles is not None and int(cancelled_reconciles) == 0
assert errored_reconciles is not None and int(errored_reconciles) == 0
env.storage_controller.consistency_check()