Split utils::http to separate crate (#10753)

Avoids compiling the crate and its dependencies into binaries that don't
need them. Shrinks the compute_ctl binary from about 31MB to 28MB in the
release-line-debug-size-lto profile.
This commit is contained in:
Heikki Linnakangas
2025-02-12 00:06:53 +02:00
committed by GitHub
parent 9491154eae
commit 635b67508b
42 changed files with 238 additions and 162 deletions

View File

@@ -10,7 +10,7 @@ use jsonwebtoken::{
};
use serde::{Deserialize, Serialize};
use crate::{http::error::ApiError, id::TenantId};
use crate::id::TenantId;
/// Algorithm to use. We require EdDSA.
const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
@@ -90,15 +90,6 @@ impl Display for AuthError {
}
}
impl From<AuthError> for ApiError {
fn from(_value: AuthError) -> Self {
// Don't pass on the value of the AuthError as a precautionary measure.
// Being intentionally vague in public error communication hurts debugability
// but it is more secure.
ApiError::Forbidden("JWT authentication error".to_string())
}
}
pub struct JwtAuth {
decoding_keys: Vec<DecodingKey>,
validation: Validation,

View File

@@ -1,13 +1,6 @@
//! Failpoint support code shared between pageserver and safekeepers.
use crate::http::{
error::ApiError,
json::{json_request, json_response},
};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
@@ -184,45 +177,3 @@ fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point
#[derive(Debug, Serialize, Deserialize)]
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
pub actions: String,
}
/// Configure failpoints through http.
pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
}
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
)));
}
}
json_response(StatusCode::OK, ())
}

View File

@@ -1,762 +0,0 @@
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
use crate::http::request::{get_query_param, parse_query_param};
use crate::pprof;
use ::pprof::protos::Message as _;
use ::pprof::ProfilerGuardBuilder;
use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
use hyper::http::HeaderValue;
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{debug, info, info_span, warn, Instrument};
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
"Number of metric requests made"
)
.expect("failed to define a metric")
});
static X_REQUEST_ID_HEADER_STR: &str = "x-request-id";
static X_REQUEST_ID_HEADER: HeaderName = HeaderName::from_static(X_REQUEST_ID_HEADER_STR);
#[derive(Debug, Default, Clone)]
struct RequestId(String);
/// Adds a tracing info_span! instrumentation around the handler events,
/// logs the request start and end events for non-GET requests and non-200 responses.
///
/// Usage: Replace `my_handler` with `|r| request_span(r, my_handler)`
///
/// Use this to distinguish between logs of different HTTP requests: every request handler wrapped
/// with this will get request info logged in the wrapping span, including the unique request ID.
///
/// This also handles errors, logging them and converting them to an HTTP error response.
///
/// NB: If the client disconnects, Hyper will drop the Future, without polling it to
/// completion. In other words, the handler must be async cancellation safe! request_span
/// prints a warning to the log when that happens, so that you have some trace of it in
/// the log.
///
///
/// There could be other ways to implement similar functionality:
///
/// * procmacros placed on top of all handler methods
/// With all the drawbacks of procmacros, brings no difference implementation-wise,
/// and little code reduction compared to the existing approach.
///
/// * Another `TraitExt` with e.g. the `get_with_span`, `post_with_span` methods to do similar logic,
/// implemented for [`RouterBuilder`].
/// Could be simpler, but we don't want to depend on [`routerify`] more, targeting to use other library later.
///
/// * In theory, a span guard could've been created in a pre-request middleware and placed into a global collection, to be dropped
/// later, in a post-response middleware.
/// Due to suspendable nature of the futures, would give contradictive results which is exactly the opposite of what `tracing-futures`
/// tries to achive with its `.instrument` used in the current approach.
///
/// If needed, a declarative macro to substitute the |r| ... closure boilerplate could be introduced.
pub async fn request_span<R, H>(request: Request<Body>, handler: H) -> R::Output
where
R: Future<Output = Result<Response<Body>, ApiError>> + Send + 'static,
H: FnOnce(Request<Body>) -> R + Send + Sync + 'static,
{
let request_id = request.context::<RequestId>().unwrap_or_default().0;
let method = request.method();
let path = request.uri().path();
let request_span = info_span!("request", %method, %path, %request_id);
let log_quietly = method == Method::GET;
async move {
let cancellation_guard = RequestCancelled::warn_when_dropped_without_responding();
if log_quietly {
debug!("Handling request");
} else {
info!("Handling request");
}
// No special handling for panics here. There's a `tracing_panic_hook` from another
// module to do that globally.
let res = handler(request).await;
cancellation_guard.disarm();
// Log the result if needed.
//
// We also convert any errors into an Ok response with HTTP error code here.
// `make_router` sets a last-resort error handler that would do the same, but
// we prefer to do it here, before we exit the request span, so that the error
// is still logged with the span.
//
// (Because we convert errors to Ok response, we never actually return an error,
// and we could declare the function to return the never type (`!`). However,
// using `routerify::RouterBuilder` requires a proper error type.)
match res {
Ok(response) => {
let response_status = response.status();
if log_quietly && response_status.is_success() {
debug!("Request handled, status: {response_status}");
} else {
info!("Request handled, status: {response_status}");
}
Ok(response)
}
Err(err) => Ok(api_error_handler(err)),
}
}
.instrument(request_span)
.await
}
/// Drop guard to WARN in case the request was dropped before completion.
struct RequestCancelled {
warn: Option<tracing::Span>,
}
impl RequestCancelled {
/// Create the drop guard using the [`tracing::Span::current`] as the span.
fn warn_when_dropped_without_responding() -> Self {
RequestCancelled {
warn: Some(tracing::Span::current()),
}
}
/// Consume the drop guard without logging anything.
fn disarm(mut self) {
self.warn = None;
}
}
impl Drop for RequestCancelled {
fn drop(&mut self) {
if std::thread::panicking() {
// we are unwinding due to panicking, assume we are not dropped for cancellation
} else if let Some(span) = self.warn.take() {
// the span has all of the info already, but the outer `.instrument(span)` has already
// been dropped, so we need to manually re-enter it for this message.
//
// this is what the instrument would do before polling so it is fine.
let _g = span.entered();
warn!("request was dropped before completing");
}
}
}
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
pub struct ChannelWriter {
buffer: BytesMut,
pub tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
/// Time spent waiting for the channel to make progress. It is not the same as time to upload a
/// buffer because we cannot know anything about that, but this should allow us to understand
/// the actual time taken without the time spent `std::thread::park`ed.
wait_time: std::time::Duration,
}
impl ChannelWriter {
pub fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
wait_time: std::time::Duration::ZERO,
}
}
pub fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
let wait_started_at = std::time::Instant::now();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
self.wait_time += wait_started_at.elapsed();
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
pub fn flushed_bytes(&self) -> usize {
self.written
}
pub fn wait_time(&self) -> std::time::Duration {
self.wait_time
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
let body = Body::wrap_stream(ReceiverStream::new(rx));
let mut writer = ChannelWriter::new(128 * 1024, tx);
let encoder = TextEncoder::new();
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(body)
.unwrap();
let span = info_span!("blocking");
tokio::task::spawn_blocking(move || {
// there are situations where we lose scraped metrics under load, try to gather some clues
// since all nodes are queried this, keep the message count low.
let spawned_at = std::time::Instant::now();
let _span = span.entered();
let metrics = metrics::gather();
let gathered_at = std::time::Instant::now();
let res = encoder
.encode(&metrics, &mut writer)
.and_then(|_| writer.flush().map_err(|e| e.into()));
// this instant is not when we finally got the full response sent, sending is done by hyper
// in another task.
let encoded_at = std::time::Instant::now();
let spawned_in = spawned_at - started_at;
let collected_in = gathered_at - spawned_at;
// remove the wait time here in case the tcp connection was clogged
let encoded_in = encoded_at - gathered_at - writer.wait_time();
let total = encoded_at - started_at;
match res {
Ok(()) => {
tracing::info!(
bytes = writer.flushed_bytes(),
total_ms = total.as_millis(),
spawning_ms = spawned_in.as_millis(),
collection_ms = collected_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
"responded /metrics"
);
}
Err(e) => {
// there is a chance that this error is not the BrokenPipe we generate in the writer
// for "closed connection", but it is highly unlikely.
tracing::warn!(
after_bytes = writer.flushed_bytes(),
total_ms = total.as_millis(),
spawning_ms = spawned_in.as_millis(),
collection_ms = collected_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
"failed to write out /metrics response: {e:?}"
);
// semantics of this error are quite... unclear. we want to error the stream out to
// abort the response to somehow notify the client that we failed.
//
// though, most likely the reason for failure is that the receiver is already gone.
drop(
writer
.tx
.blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
);
}
}
});
Ok(response)
}
/// Generates CPU profiles.
pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
enum Format {
Pprof,
Svg,
}
// Parameters.
let format = match get_query_param(&req, "format")?.as_deref() {
None => Format::Pprof,
Some("pprof") => Format::Pprof,
Some("svg") => Format::Svg,
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
};
let seconds = match parse_query_param(&req, "seconds")? {
None => 5,
Some(seconds @ 1..=60) => seconds,
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-60 secs"))),
};
let frequency_hz = match parse_query_param(&req, "frequency")? {
None => 99,
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
Some(frequency) => frequency,
};
let force: bool = parse_query_param(&req, "force")?.unwrap_or_default();
// Take the profile.
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
static PROFILE_CANCEL: Lazy<Notify> = Lazy::new(Notify::new);
let report = {
// Only allow one profiler at a time. If force is true, cancel a running profile (e.g. a
// Grafana continuous profile). We use a try_lock() loop when cancelling instead of waiting
// for a lock(), to avoid races where the notify isn't currently awaited.
let _lock = loop {
match PROFILE_LOCK.try_lock() {
Ok(lock) => break lock,
Err(_) if force => PROFILE_CANCEL.notify_waiters(),
Err(_) => {
return Err(ApiError::Conflict(
"profiler already running (use ?force=true to cancel it)".into(),
))
}
}
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
};
let guard = ProfilerGuardBuilder::default()
.frequency(frequency_hz)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|err| ApiError::InternalServerError(err.into()))?;
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(seconds)) => {},
_ = PROFILE_CANCEL.notified() => {},
};
guard
.report()
.build()
.map_err(|err| ApiError::InternalServerError(err.into()))?
};
// Return the report in the requested format.
match format {
Format::Pprof => {
let mut body = Vec::new();
report
.pprof()
.map_err(|err| ApiError::InternalServerError(err.into()))?
.write_to_vec(&mut body)
.map_err(|err| ApiError::InternalServerError(err.into()))?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_DISPOSITION, "attachment; filename=\"profile.pb\"")
.body(Body::from(body))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
Format::Svg => {
let mut body = Vec::new();
report
.flamegraph(&mut body)
.map_err(|err| ApiError::InternalServerError(err.into()))?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "image/svg+xml")
.body(Body::from(body))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
}
}
/// Generates heap profiles.
///
/// This only works with jemalloc on Linux.
pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
enum Format {
Jemalloc,
Pprof,
Svg,
}
// Parameters.
let format = match get_query_param(&req, "format")?.as_deref() {
None => Format::Pprof,
Some("jemalloc") => Format::Jemalloc,
Some("pprof") => Format::Pprof,
Some("svg") => Format::Svg,
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
};
// Functions and mappings to strip when symbolizing pprof profiles. If true,
// also remove child frames.
static STRIP_FUNCTIONS: Lazy<Vec<(Regex, bool)>> = Lazy::new(|| {
vec![
(Regex::new("^__rust").unwrap(), false),
(Regex::new("^_start$").unwrap(), false),
(Regex::new("^irallocx_prof").unwrap(), true),
(Regex::new("^prof_alloc_prep").unwrap(), true),
(Regex::new("^std::rt::lang_start").unwrap(), false),
(Regex::new("^std::sys::backtrace::__rust").unwrap(), false),
]
});
const STRIP_MAPPINGS: &[&str] = &["libc", "libgcc", "pthread", "vdso"];
// Obtain profiler handle.
let mut prof_ctl = jemalloc_pprof::PROF_CTL
.as_ref()
.ok_or(ApiError::InternalServerError(anyhow!(
"heap profiling not enabled"
)))?
.lock()
.await;
if !prof_ctl.activated() {
return Err(ApiError::InternalServerError(anyhow!(
"heap profiling not enabled"
)));
}
// Take and return the profile.
match format {
Format::Jemalloc => {
// NB: file is an open handle to a tempfile that's already deleted.
let file = tokio::task::spawn_blocking(move || prof_ctl.dump())
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
let stream = ReaderStream::new(tokio::fs::File::from_std(file));
Response::builder()
.status(200)
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_DISPOSITION, "attachment; filename=\"heap.dump\"")
.body(Body::wrap_stream(stream))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
Format::Pprof => {
let data = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
// Symbolize the profile.
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
// serialization roundtrip.
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
pprof::encode(&profile)
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "application/octet-stream")
.header(CONTENT_DISPOSITION, "attachment; filename=\"heap.pb\"")
.body(Body::from(data))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
Format::Svg => {
let body = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
let mut opts = inferno::flamegraph::Options::default();
opts.title = "Heap inuse".to_string();
opts.count_name = "bytes".to_string();
pprof::flamegraph(profile, &mut opts)
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "image/svg+xml")
.body(Body::from(body))
.map_err(|err| ApiError::InternalServerError(err.into()))
}
}
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
Some(request_id) => request_id
.to_str()
.expect("extract request id value")
.to_owned(),
None => {
let request_id = uuid::Uuid::new_v4();
request_id.to_string()
}
};
req.set_context(RequestId(request_id));
Ok(req)
})
}
async fn add_request_id_header_to_response(
mut res: Response<Body>,
req_info: RequestInfo,
) -> Result<Response<Body>, ApiError> {
if let Some(request_id) = req_info.context::<RequestId>() {
if let Ok(request_header_value) = HeaderValue::from_str(&request_id.0) {
res.headers_mut()
.insert(&X_REQUEST_ID_HEADER, request_header_value);
};
};
Ok(res)
}
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
Router::builder()
.middleware(add_request_id_middleware())
.middleware(Middleware::post_with_info(
add_request_id_header_to_response,
))
.err_handler(route_error_handler)
}
pub fn attach_openapi_ui(
router_builder: RouterBuilder<hyper::Body, ApiError>,
spec: &'static [u8],
spec_mount_path: &'static str,
ui_mount_path: &'static str,
) -> RouterBuilder<hyper::Body, ApiError> {
router_builder
.get(spec_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(spec)).unwrap())
})
)
.get(ui_mount_path,
move |r| request_span(r, move |_| async move {
Ok(Response::builder().body(Body::from(format!(r#"
<!DOCTYPE html>
<html lang="en">
<head>
<title>rweb</title>
<link href="https://cdn.jsdelivr.net/npm/swagger-ui-dist@3/swagger-ui.css" rel="stylesheet">
</head>
<body>
<div id="swagger-ui"></div>
<script src="https://cdn.jsdelivr.net/npm/swagger-ui-dist@3/swagger-ui-bundle.js" charset="UTF-8"> </script>
<script>
window.onload = function() {{
const ui = SwaggerUIBundle({{
"dom_id": "\#swagger-ui",
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIBundle.SwaggerUIStandalonePreset
],
layout: "BaseLayout",
deepLinking: true,
showExtensions: true,
showCommonExtensions: true,
url: "{}",
}})
window.ui = ui;
}};
</script>
</body>
</html>
"#, spec_mount_path))).unwrap())
})
)
}
fn parse_token(header_value: &str) -> Result<&str, ApiError> {
// header must be in form Bearer <token>
let (prefix, token) = header_value
.split_once(' ')
.ok_or_else(|| ApiError::Unauthorized("malformed authorization header".to_string()))?;
if prefix != "Bearer" {
return Err(ApiError::Unauthorized(
"malformed authorization header".to_string(),
));
}
Ok(token)
}
pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
provide_auth: fn(&Request<Body>) -> Option<&SwappableJwtAuth>,
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
if let Some(auth) = provide_auth(&req) {
match req.headers().get(AUTHORIZATION) {
Some(value) => {
let header_value = value.to_str().map_err(|_| {
ApiError::Unauthorized("malformed authorization header".to_string())
})?;
let token = parse_token(header_value)?;
let data = auth.decode(token).map_err(|err| {
warn!("Authentication error: {err}");
// Rely on From<AuthError> for ApiError impl
err
})?;
req.set_context(data.claims);
}
None => {
return Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
))
}
}
}
Ok(req)
})
}
pub fn add_response_header_middleware<B>(
header: &str,
value: &str,
) -> anyhow::Result<Middleware<B, ApiError>>
where
B: hyper::body::HttpBody + Send + Sync + 'static,
{
let name =
HeaderName::from_str(header).with_context(|| format!("invalid header name: {header}"))?;
let value =
HeaderValue::from_str(value).with_context(|| format!("invalid header value: {value}"))?;
Ok(Middleware::post_with_info(
move |mut response, request_info| {
let name = name.clone();
let value = value.clone();
async move {
let headers = response.headers_mut();
if headers.contains_key(&name) {
warn!(
"{} response already contains header {:?}",
request_info.uri(),
&name,
);
} else {
headers.insert(name, value);
}
Ok(response)
}
},
))
}
pub fn check_permission_with(
req: &Request<Body>,
check_permission: impl Fn(&Claims) -> Result<(), AuthError>,
) -> Result<(), ApiError> {
match req.context::<Claims>() {
Some(claims) => Ok(check_permission(&claims)
.map_err(|_err| ApiError::Forbidden("JWT authentication error".to_string()))?),
None => Ok(()), // claims is None because auth is disabled
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::future::poll_fn;
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use std::net::{IpAddr, SocketAddr};
#[tokio::test]
async fn test_request_id_returned() {
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
}
let mut req: Request<Body> = Request::default();
req.headers_mut()
.append(&X_REQUEST_ID_HEADER, HeaderValue::from_str("42").unwrap());
let resp: Response<hyper::body::Body> = service.call(req).await.unwrap();
let header_val = resp.headers().get(&X_REQUEST_ID_HEADER).unwrap();
assert!(header_val == "42", "response header mismatch");
}
#[tokio::test]
async fn test_request_id_empty() {
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();
let remote_addr = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 80);
let mut service = builder.build(remote_addr);
if let Err(e) = poll_fn(|ctx| service.poll_ready(ctx)).await {
panic!("request service is not ready: {:?}", e);
}
let req: Request<Body> = Request::default();
let resp: Response<hyper::body::Body> = service.call(req).await.unwrap();
let header_val = resp.headers().get(&X_REQUEST_ID_HEADER);
assert_ne!(header_val, None, "response header should NOT be empty");
}
}

View File

@@ -1,155 +0,0 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use thiserror::Error;
use tracing::{error, info, warn};
#[derive(Debug, Error)]
pub enum ApiError {
#[error("Bad request: {0:#?}")]
BadRequest(anyhow::Error),
#[error("Forbidden: {0}")]
Forbidden(String),
#[error("Unauthorized: {0}")]
Unauthorized(String),
#[error("NotFound: {0}")]
NotFound(Box<dyn StdError + Send + Sync + 'static>),
#[error("Conflict: {0}")]
Conflict(String),
#[error("Precondition failed: {0}")]
PreconditionFailed(Box<str>),
#[error("Resource temporarily unavailable: {0}")]
ResourceUnavailable(Cow<'static, str>),
#[error("Too many requests: {0}")]
TooManyRequests(Cow<'static, str>),
#[error("Shutting down")]
ShuttingDown,
#[error("Timeout")]
Timeout(Cow<'static, str>),
#[error("Request cancelled")]
Cancelled,
#[error(transparent)]
InternalServerError(anyhow::Error),
}
impl ApiError {
pub fn into_response(self) -> Response<Body> {
match self {
ApiError::BadRequest(err) => HttpErrorBody::response_from_msg_and_status(
format!("{err:#?}"), // use debug printing so that we give the cause
StatusCode::BAD_REQUEST,
),
ApiError::Forbidden(_) => {
HttpErrorBody::response_from_msg_and_status(self.to_string(), StatusCode::FORBIDDEN)
}
ApiError::Unauthorized(_) => HttpErrorBody::response_from_msg_and_status(
self.to_string(),
StatusCode::UNAUTHORIZED,
),
ApiError::NotFound(_) => {
HttpErrorBody::response_from_msg_and_status(self.to_string(), StatusCode::NOT_FOUND)
}
ApiError::Conflict(_) => {
HttpErrorBody::response_from_msg_and_status(self.to_string(), StatusCode::CONFLICT)
}
ApiError::PreconditionFailed(_) => HttpErrorBody::response_from_msg_and_status(
self.to_string(),
StatusCode::PRECONDITION_FAILED,
),
ApiError::ShuttingDown => HttpErrorBody::response_from_msg_and_status(
"Shutting down".to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::ResourceUnavailable(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::TooManyRequests(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::TOO_MANY_REQUESTS,
),
ApiError::Timeout(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::REQUEST_TIMEOUT,
),
ApiError::Cancelled => HttpErrorBody::response_from_msg_and_status(
self.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
format!("{err:#}"), // use alternative formatting so that we give the cause without backtrace
StatusCode::INTERNAL_SERVER_ERROR,
),
}
}
}
#[derive(Serialize, Deserialize)]
pub struct HttpErrorBody {
pub msg: String,
}
impl HttpErrorBody {
pub fn from_msg(msg: String) -> Self {
HttpErrorBody { msg }
}
pub fn response_from_msg_and_status(msg: String, status: StatusCode) -> Response<Body> {
HttpErrorBody { msg }.to_response(status)
}
pub fn to_response(&self, status: StatusCode) -> Response<Body> {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/json")
// we do not have nested maps with non string keys so serialization shouldn't fail
.body(Body::from(serde_json::to_string(self).unwrap()))
.unwrap()
}
}
pub async fn route_error_handler(err: routerify::RouteError) -> Response<Body> {
match err.downcast::<ApiError>() {
Ok(api_error) => api_error_handler(*api_error),
Err(other_error) => {
// We expect all the request handlers to return an ApiError, so this should
// not be reached. But just in case.
error!("Error processing HTTP request: {other_error:?}");
HttpErrorBody::response_from_msg_and_status(
other_error.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub fn api_error_handler(api_error: ApiError) -> Response<Body> {
// Print a stack trace for Internal Server errors
match api_error {
ApiError::Forbidden(_) | ApiError::Unauthorized(_) => {
warn!("Error processing HTTP request: {api_error:#}")
}
ApiError::ResourceUnavailable(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::NotFound(_) => info!("Error processing HTTP request: {api_error:#}"),
ApiError::InternalServerError(_) => error!("Error processing HTTP request: {api_error:?}"),
ApiError::ShuttingDown => info!("Shut down while processing HTTP request"),
ApiError::Timeout(_) => info!("Timeout while processing HTTP request: {api_error:#}"),
ApiError::Cancelled => info!("Request cancelled while processing HTTP request"),
_ => info!("Error processing HTTP request: {api_error:#}"),
}
api_error.into_response()
}

View File

@@ -1,65 +0,0 @@
use anyhow::Context;
use bytes::Buf;
use hyper::{header, Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use super::error::ApiError;
/// Parse a json request body and deserialize it to the type `T`.
pub async fn json_request<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
let body = hyper::body::aggregate(request.body_mut())
.await
.context("Failed to read request body")
.map_err(ApiError::BadRequest)?;
if body.remaining() == 0 {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"missing request body"
)));
}
let mut deser = serde_json::de::Deserializer::from_reader(body.reader());
serde_path_to_error::deserialize(&mut deser)
// intentionally stringify because the debug version is not helpful in python logs
.map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}"))
.map_err(ApiError::BadRequest)
}
/// Parse a json request body and deserialize it to the type `T`. If the body is empty, return `T::default`.
pub async fn json_request_maybe<T: for<'de> Deserialize<'de> + Default>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
let body = hyper::body::aggregate(request.body_mut())
.await
.context("Failed to read request body")
.map_err(ApiError::BadRequest)?;
if body.remaining() == 0 {
return Ok(T::default());
}
let mut deser = serde_json::de::Deserializer::from_reader(body.reader());
serde_path_to_error::deserialize(&mut deser)
// intentionally stringify because the debug version is not helpful in python logs
.map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}"))
.map_err(ApiError::BadRequest)
}
pub fn json_response<T: Serialize>(
status: StatusCode,
data: T,
) -> Result<Response<Body>, ApiError> {
let json = serde_json::to_string(&data)
.context("Failed to serialize JSON response")
.map_err(ApiError::InternalServerError)?;
let response = Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/json")
.body(Body::from(json))
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Ok(response)
}

View File

@@ -1,8 +0,0 @@
pub mod endpoint;
pub mod error;
pub mod json;
pub mod request;
/// Current fast way to apply simple http routing in various Neon binaries.
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
pub use routerify::{ext::RequestExt, RouterBuilder, RouterService};

View File

@@ -1,91 +0,0 @@
use core::fmt;
use std::{borrow::Cow, str::FromStr};
use super::error::ApiError;
use anyhow::anyhow;
use hyper::{body::HttpBody, Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<&'a str, ApiError> {
match request.param(param_name) {
Some(arg) => Ok(arg),
None => Err(ApiError::BadRequest(anyhow!(
"no {param_name} specified in path param",
))),
}
}
pub fn parse_request_param<T: FromStr>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
match get_request_param(request, param_name)?.parse() {
Ok(v) => Ok(v),
Err(_) => Err(ApiError::BadRequest(anyhow!(
"failed to parse {param_name}",
))),
}
}
pub fn get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Option<Cow<'a, str>>, ApiError> {
let query = match request.uri().query() {
Some(q) => q,
None => return Ok(None),
};
let mut values = url::form_urlencoded::parse(query.as_bytes())
.filter_map(|(k, v)| if k == param_name { Some(v) } else { None })
// we call .next() twice below. If it's None the first time, .fuse() ensures it's None afterwards
.fuse();
let value1 = values.next();
if values.next().is_some() {
return Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
)));
}
Ok(value1)
}
pub fn must_get_query_param<'a>(
request: &'a Request<Body>,
param_name: &str,
) -> Result<Cow<'a, str>, ApiError> {
get_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}
pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<Option<T>, ApiError> {
get_query_param(request, param_name)?
.map(|v| {
v.parse().map_err(|e| {
ApiError::BadRequest(anyhow!("cannot parse query param {param_name}: {e}"))
})
})
.transpose()
}
pub fn must_parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
parse_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}
pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),
None => Ok(()),
}
}

View File

@@ -2,8 +2,6 @@
//! between other crates in this repository.
#![deny(clippy::undocumented_unsafe_blocks)]
extern crate hyper0 as hyper;
pub mod backoff;
/// `Lsn` type implements common tasks on Log Sequence Numbers
@@ -33,9 +31,6 @@ pub mod shard;
mod hex;
pub use hex::Hex;
// http endpoint utils
pub mod http;
// definition of the Generation type for pageserver attachment APIs
pub mod generation;
@@ -96,8 +91,6 @@ pub mod circuit_breaker;
pub mod try_rcu;
pub mod pprof;
pub mod guard_arc_swap;
// Re-export used in macro. Avoids adding git-version as dep in target crates.

View File

@@ -1,247 +0,0 @@
use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use once_cell::sync::Lazy;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ffi::c_void;
use std::io::Write as _;
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
let mut gz = GzDecoder::new(Vec::new());
gz.write_all(bytes)?;
Ok(Profile::parse_from_bytes(&gz.finish()?)?)
}
/// Encodes a pprof profile as gzip-compressed Protobuf.
pub fn encode(profile: &Profile) -> anyhow::Result<Vec<u8>> {
let mut gz = GzEncoder::new(Vec::new(), Compression::default());
profile.write_to_writer(&mut gz)?;
Ok(gz.finish()?)
}
/// Symbolizes a pprof profile using the current binary.
pub fn symbolize(mut profile: Profile) -> anyhow::Result<Profile> {
if !profile.function.is_empty() {
return Ok(profile); // already symbolized
}
// Collect function names.
let mut functions: HashMap<String, Function> = HashMap::new();
let mut strings: HashMap<String, i64> = profile
.string_table
.into_iter()
.enumerate()
.map(|(i, s)| (s, i as i64))
.collect();
// Helper to look up or register a string.
let mut string_id = |s: &str| -> i64 {
// Don't use .entry() to avoid unnecessary allocations.
if let Some(id) = strings.get(s) {
return *id;
}
let id = strings.len() as i64;
strings.insert(s.to_string(), id);
id
};
for loc in &mut profile.location {
if !loc.line.is_empty() {
continue;
}
// Resolve the line and function for each location.
backtrace::resolve(loc.address as *mut c_void, |symbol| {
let Some(symname) = symbol.name() else {
return;
};
let mut name = symname.to_string();
// Strip the Rust monomorphization suffix from the symbol name.
static SUFFIX_REGEX: Lazy<Regex> =
Lazy::new(|| Regex::new("::h[0-9a-f]{16}$").expect("invalid regex"));
if let Some(m) = SUFFIX_REGEX.find(&name) {
name.truncate(m.start());
}
let function_id = match functions.get(&name) {
Some(function) => function.id,
None => {
let id = functions.len() as u64 + 1;
let system_name = String::from_utf8_lossy(symname.as_bytes());
let filename = symbol
.filename()
.map(|path| path.to_string_lossy())
.unwrap_or(Cow::Borrowed(""));
let function = Function {
id,
name: string_id(&name),
system_name: string_id(&system_name),
filename: string_id(&filename),
..Default::default()
};
functions.insert(name, function);
id
}
};
loc.line.push(Line {
function_id,
line: symbol.lineno().unwrap_or(0) as i64,
..Default::default()
});
});
}
// Store the resolved functions, and mark the mapping as resolved.
profile.function = functions.into_values().sorted_by_key(|f| f.id).collect();
profile.string_table = strings
.into_iter()
.sorted_by_key(|(_, i)| *i)
.map(|(s, _)| s)
.collect();
for mapping in &mut profile.mapping {
mapping.has_functions = true;
mapping.has_filenames = true;
}
Ok(profile)
}
/// Strips locations (stack frames) matching the given mappings (substring) or function names
/// (regex). The function bool specifies whether child frames should be stripped as well.
///
/// The string definitions are left behind in the profile for simplicity, to avoid rewriting all
/// string references.
pub fn strip_locations(
mut profile: Profile,
mappings: &[&str],
functions: &[(Regex, bool)],
) -> Profile {
// Strip mappings.
let mut strip_mappings: HashSet<u64> = HashSet::new();
profile.mapping.retain(|mapping| {
let Some(name) = profile.string_table.get(mapping.filename as usize) else {
return true;
};
if mappings.iter().any(|substr| name.contains(substr)) {
strip_mappings.insert(mapping.id);
return false;
}
true
});
// Strip functions.
let mut strip_functions: HashMap<u64, bool> = HashMap::new();
profile.function.retain(|function| {
let Some(name) = profile.string_table.get(function.name as usize) else {
return true;
};
for (regex, strip_children) in functions {
if regex.is_match(name) {
strip_functions.insert(function.id, *strip_children);
return false;
}
}
true
});
// Strip locations. The bool specifies whether child frames should be stripped too.
let mut strip_locations: HashMap<u64, bool> = HashMap::new();
profile.location.retain(|location| {
for line in &location.line {
if let Some(strip_children) = strip_functions.get(&line.function_id) {
strip_locations.insert(location.id, *strip_children);
return false;
}
}
if strip_mappings.contains(&location.mapping_id) {
strip_locations.insert(location.id, false);
return false;
}
true
});
// Strip sample locations.
for sample in &mut profile.sample {
// First, find the uppermost function with child removal and truncate the stack.
if let Some(truncate) = sample
.location_id
.iter()
.rposition(|id| strip_locations.get(id) == Some(&true))
{
sample.location_id.drain(..=truncate);
}
// Next, strip any individual frames without child removal.
sample
.location_id
.retain(|id| !strip_locations.contains_key(id));
}
profile
}
/// Generates an SVG flamegraph from a symbolized pprof profile.
pub fn flamegraph(
profile: Profile,
opts: &mut inferno::flamegraph::Options,
) -> anyhow::Result<Vec<u8>> {
if profile.mapping.iter().any(|m| !m.has_functions) {
bail!("profile not symbolized");
}
// Index locations, functions, and strings.
let locations: HashMap<u64, Location> =
profile.location.into_iter().map(|l| (l.id, l)).collect();
let functions: HashMap<u64, Function> =
profile.function.into_iter().map(|f| (f.id, f)).collect();
let strings = profile.string_table;
// Resolve stacks as function names, and sum sample values per stack. Also reverse the stack,
// since inferno expects it bottom-up.
let mut stacks: HashMap<Vec<&str>, i64> = HashMap::new();
for sample in profile.sample {
let mut stack = Vec::with_capacity(sample.location_id.len());
for location in sample.location_id.into_iter().rev() {
let Some(location) = locations.get(&location) else {
bail!("missing location {location}");
};
for line in location.line.iter().rev() {
let Some(function) = functions.get(&line.function_id) else {
bail!("missing function {}", line.function_id);
};
let Some(name) = strings.get(function.name as usize) else {
bail!("missing string {}", function.name);
};
stack.push(name.as_str());
}
}
let Some(&value) = sample.value.first() else {
bail!("missing value");
};
*stacks.entry(stack).or_default() += value;
}
// Construct stack lines for inferno.
let lines = stacks
.into_iter()
.map(|(stack, value)| (stack.into_iter().join(";"), value))
.map(|(stack, value)| format!("{stack} {value}"))
.sorted()
.collect_vec();
// Construct the flamegraph.
let mut bytes = Vec::new();
let lines = lines.iter().map(|line| line.as_str());
inferno::flamegraph::from_lines(opts, lines, &mut bytes)?;
Ok(bytes)
}