From d19c5248c960baffcd5ae096e8f18bbd5dfd45e0 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Wed, 1 Mar 2023 18:09:08 +0200 Subject: [PATCH] Add UUID header to mgmt API (#3708) ## Describe your changes ## Issue ticket number and link #3479 ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --- Cargo.lock | 3 +- libs/utils/Cargo.toml | 3 +- libs/utils/src/http/endpoint.rs | 116 ++++++++++++++++++++++++++++++-- workspace_hack/Cargo.toml | 1 - 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6b23144182..02b03e02fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4505,6 +4505,7 @@ dependencies = [ "byteorder", "bytes", "criterion", + "futures", "git-version", "heapless", "hex", @@ -4534,6 +4535,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "uuid", "workspace_hack", ] @@ -4840,7 +4842,6 @@ dependencies = [ "either", "fail", "futures", - "futures-channel", "futures-executor", "futures-util", "hashbrown 0.12.3", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 92e805ac58..6acdb6fa53 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -13,6 +13,7 @@ bincode.workspace = true bytes.workspace = true heapless.workspace = true hyper = { workspace = true, features = ["full"] } +futures = { workspace = true} routerify.workspace = true serde.workspace = true serde_json.workspace = true @@ -39,7 +40,7 @@ pq_proto.workspace = true workspace_hack.workspace = true url.workspace = true - +uuid = { version = "1.2", features = ["v4", "serde"] } [dev-dependencies] byteorder.workspace = true bytes.workspace = true diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 9c300de7a7..41975f6944 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -8,8 +8,7 @@ use hyper::{Method, StatusCode}; use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; use once_cell::sync::Lazy; use routerify::ext::RequestExt; -use routerify::RequestInfo; -use routerify::{Middleware, Router, RouterBuilder, RouterService}; +use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService}; use tokio::task::JoinError; use tracing; @@ -27,14 +26,35 @@ static SERVE_METRICS_COUNT: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +static X_REQUEST_ID_HEADER_STR: &str = "x-request-id"; + +static X_REQUEST_ID_HEADER: HeaderName = HeaderName::from_static(X_REQUEST_ID_HEADER_STR); +#[derive(Debug, Default, Clone)] +struct RequestId(String); + async fn logger(res: Response, info: RequestInfo) -> Result, ApiError> { + let request_id = info.context::().unwrap_or_default().0; + // cannot factor out the Level to avoid the repetition // because tracing can only work with const Level // which is not the case here + if info.method() == Method::GET && res.status() == StatusCode::OK { - tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status()); + tracing::debug!( + "{} {} {} {}", + info.method(), + info.uri().path(), + request_id, + res.status() + ); } else { - tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status()); + tracing::info!( + "{} {} {} {}", + info.method(), + info.uri().path(), + request_id, + res.status() + ); } Ok(res) } @@ -63,9 +83,52 @@ async fn prometheus_metrics_handler(_req: Request) -> Result( +) -> Middleware { + Middleware::pre(move |req| async move { + let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) { + Some(request_id) => request_id + .to_str() + .expect("extract request id value") + .to_owned(), + None => { + let request_id = uuid::Uuid::new_v4(); + request_id.to_string() + } + }; + + if req.method() == Method::GET { + tracing::debug!("{} {} {}", req.method(), req.uri().path(), request_id); + } else { + tracing::info!("{} {} {}", req.method(), req.uri().path(), request_id); + } + req.set_context(RequestId(request_id)); + + Ok(req) + }) +} + +async fn add_request_id_header_to_response( + mut res: Response, + req_info: RequestInfo, +) -> Result, ApiError> { + if let Some(request_id) = req_info.context::() { + if let Ok(request_header_value) = HeaderValue::from_str(&request_id.0) { + res.headers_mut() + .insert(&X_REQUEST_ID_HEADER, request_header_value); + }; + }; + + Ok(res) +} + pub fn make_router() -> RouterBuilder { Router::builder() + .middleware(add_request_id_middleware()) .middleware(Middleware::post_with_info(logger)) + .middleware(Middleware::post_with_info( + add_request_id_header_to_response, + )) .get("/metrics", prometheus_metrics_handler) .err_handler(error::handler) } @@ -231,3 +294,48 @@ where Ok(()) } +#[cfg(test)] +mod tests { + use super::*; + use futures::future::poll_fn; + use hyper::service::Service; + use routerify::RequestServiceBuilder; + use std::net::{IpAddr, SocketAddr}; + + #[tokio::test] + async fn test_request_id_returned() { + let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap(); + let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80); + let mut service = builder.build(remote_addr); + if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await { + panic!("request service is not ready: {:?}", e); + } + + let mut req: Request = Request::default(); + req.headers_mut() + .append(&X_REQUEST_ID_HEADER, HeaderValue::from_str("42").unwrap()); + + let resp: Response = service.call(req).await.unwrap(); + + let header_val = resp.headers().get(&X_REQUEST_ID_HEADER).unwrap(); + + assert!(header_val == "42", "response header mismatch"); + } + + #[tokio::test] + async fn test_request_id_empty() { + let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap(); + let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80); + let mut service = builder.build(remote_addr); + if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await { + panic!("request service is not ready: {:?}", e); + } + + let req: Request = Request::default(); + let resp: Response = service.call(req).await.unwrap(); + + let header_val = resp.headers().get(&X_REQUEST_ID_HEADER); + + assert_ne!(header_val, None, "response header should NOT be empty"); + } +} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index c0cf3c5611..bd21095fff 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -21,7 +21,6 @@ crossbeam-utils = { version = "0.8" } either = { version = "1" } fail = { version = "0.5", default-features = false, features = ["failpoints"] } futures = { version = "0.3" } -futures-channel = { version = "0.3", features = ["sink"] } futures-executor = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hashbrown = { version = "0.12", features = ["raw"] }