mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-28 00:23:00 +00:00
Add UUID header to mgmt API (#3708)
## Describe your changes ## Issue ticket number and link #3479 ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section.
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -4505,6 +4505,7 @@ dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"criterion",
|
||||
"futures",
|
||||
"git-version",
|
||||
"heapless",
|
||||
"hex",
|
||||
@@ -4534,6 +4535,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -4840,7 +4842,6 @@ dependencies = [
|
||||
"either",
|
||||
"fail",
|
||||
"futures",
|
||||
"futures-channel",
|
||||
"futures-executor",
|
||||
"futures-util",
|
||||
"hashbrown 0.12.3",
|
||||
|
||||
@@ -13,6 +13,7 @@ bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
heapless.workspace = true
|
||||
hyper = { workspace = true, features = ["full"] }
|
||||
futures = { workspace = true}
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
@@ -39,7 +40,7 @@ pq_proto.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
uuid = { version = "1.2", features = ["v4", "serde"] }
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
@@ -8,8 +8,7 @@ use hyper::{Method, StatusCode};
|
||||
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
|
||||
use once_cell::sync::Lazy;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::RequestInfo;
|
||||
use routerify::{Middleware, Router, RouterBuilder, RouterService};
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService};
|
||||
use tokio::task::JoinError;
|
||||
use tracing;
|
||||
|
||||
@@ -27,14 +26,35 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static X_REQUEST_ID_HEADER_STR: &str = "x-request-id";
|
||||
|
||||
static X_REQUEST_ID_HEADER: HeaderName = HeaderName::from_static(X_REQUEST_ID_HEADER_STR);
|
||||
#[derive(Debug, Default, Clone)]
|
||||
struct RequestId(String);
|
||||
|
||||
async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
|
||||
let request_id = info.context::<RequestId>().unwrap_or_default().0;
|
||||
|
||||
// cannot factor out the Level to avoid the repetition
|
||||
// because tracing can only work with const Level
|
||||
// which is not the case here
|
||||
|
||||
if info.method() == Method::GET && res.status() == StatusCode::OK {
|
||||
tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status());
|
||||
tracing::debug!(
|
||||
"{} {} {} {}",
|
||||
info.method(),
|
||||
info.uri().path(),
|
||||
request_id,
|
||||
res.status()
|
||||
);
|
||||
} else {
|
||||
tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status());
|
||||
tracing::info!(
|
||||
"{} {} {} {}",
|
||||
info.method(),
|
||||
info.uri().path(),
|
||||
request_id,
|
||||
res.status()
|
||||
);
|
||||
}
|
||||
Ok(res)
|
||||
}
|
||||
@@ -63,9 +83,52 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
) -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
|
||||
Some(request_id) => request_id
|
||||
.to_str()
|
||||
.expect("extract request id value")
|
||||
.to_owned(),
|
||||
None => {
|
||||
let request_id = uuid::Uuid::new_v4();
|
||||
request_id.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
if req.method() == Method::GET {
|
||||
tracing::debug!("{} {} {}", req.method(), req.uri().path(), request_id);
|
||||
} else {
|
||||
tracing::info!("{} {} {}", req.method(), req.uri().path(), request_id);
|
||||
}
|
||||
req.set_context(RequestId(request_id));
|
||||
|
||||
Ok(req)
|
||||
})
|
||||
}
|
||||
|
||||
async fn add_request_id_header_to_response(
|
||||
mut res: Response<Body>,
|
||||
req_info: RequestInfo,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
if let Some(request_id) = req_info.context::<RequestId>() {
|
||||
if let Ok(request_header_value) = HeaderValue::from_str(&request_id.0) {
|
||||
res.headers_mut()
|
||||
.insert(&X_REQUEST_ID_HEADER, request_header_value);
|
||||
};
|
||||
};
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
|
||||
Router::builder()
|
||||
.middleware(add_request_id_middleware())
|
||||
.middleware(Middleware::post_with_info(logger))
|
||||
.middleware(Middleware::post_with_info(
|
||||
add_request_id_header_to_response,
|
||||
))
|
||||
.get("/metrics", prometheus_metrics_handler)
|
||||
.err_handler(error::handler)
|
||||
}
|
||||
@@ -231,3 +294,48 @@ where
|
||||
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures::future::poll_fn;
|
||||
use hyper::service::Service;
|
||||
use routerify::RequestServiceBuilder;
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_request_id_returned() {
|
||||
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();
|
||||
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
|
||||
let mut service = builder.build(remote_addr);
|
||||
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
|
||||
panic!("request service is not ready: {:?}", e);
|
||||
}
|
||||
|
||||
let mut req: Request<Body> = Request::default();
|
||||
req.headers_mut()
|
||||
.append(&X_REQUEST_ID_HEADER, HeaderValue::from_str("42").unwrap());
|
||||
|
||||
let resp: Response<hyper::body::Body> = service.call(req).await.unwrap();
|
||||
|
||||
let header_val = resp.headers().get(&X_REQUEST_ID_HEADER).unwrap();
|
||||
|
||||
assert!(header_val == "42", "response header mismatch");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_request_id_empty() {
|
||||
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();
|
||||
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
|
||||
let mut service = builder.build(remote_addr);
|
||||
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
|
||||
panic!("request service is not ready: {:?}", e);
|
||||
}
|
||||
|
||||
let req: Request<Body> = Request::default();
|
||||
let resp: Response<hyper::body::Body> = service.call(req).await.unwrap();
|
||||
|
||||
let header_val = resp.headers().get(&X_REQUEST_ID_HEADER);
|
||||
|
||||
assert_ne!(header_val, None, "response header should NOT be empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ crossbeam-utils = { version = "0.8" }
|
||||
either = { version = "1" }
|
||||
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
|
||||
futures = { version = "0.3" }
|
||||
futures-channel = { version = "0.3", features = ["sink"] }
|
||||
futures-executor = { version = "0.3" }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
hashbrown = { version = "0.12", features = ["raw"] }
|
||||
|
||||
Reference in New Issue
Block a user