diff --git a/Cargo.lock b/Cargo.lock
index 3e9a7198a9..2203ad462c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4478,6 +4478,7 @@ dependencies = [
  "tokio",
  "tokio-io-timeout",
  "tokio-postgres",
+ "tokio-stream",
  "toml_edit",
  "tracing",
  "url",
diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs
index f3f5e95d0b..a9ee2425a4 100644
--- a/libs/utils/src/http/endpoint.rs
+++ b/libs/utils/src/http/endpoint.rs
@@ -14,6 +14,11 @@ use tracing::{self, debug, info, info_span, warn, Instrument};
 use std::future::Future;
 use std::str::FromStr;
 
+use bytes::{Bytes, BytesMut};
+use std::io::Write as _;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+
 static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
     register_int_counter!(
         "libmetrics_metric_handler_requests_total",
@@ -146,94 +151,103 @@ impl Drop for RequestCancelled {
     }
 }
 
+/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
+pub struct ChannelWriter {
+    buffer: BytesMut,
+    pub tx: mpsc::Sender<std::io::Result<Bytes>>,
+    written: usize,
+}
+
+impl ChannelWriter {
+    /// Creates a writer that sends buffered chunks into `tx`.
+    ///
+    /// Only about half of the `buf_len` allocation is used as the initial buffer.
+    ///
+    /// # Panics
+    /// Panics if `buf_len` is zero.
+    pub fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
+        assert_ne!(buf_len, 0);
+        ChannelWriter {
+            // split about half off the buffer from the start, because we flush depending on
+            // capacity. first flush will come sooner than without this, but now resizes will
+            // have better chance of picking up the "other" half. not guaranteed of course.
+            buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
+            tx,
+            written: 0,
+        }
+    }
+
+    /// Sends all buffered bytes to the channel as a single chunk, then waits until the
+    /// channel has capacity again.
+    ///
+    /// Returns the number of bytes sent. Fails with `BrokenPipe` when the receiver is gone.
+    /// Blocks on the current tokio runtime handle, so call it only from blocking code.
+    pub fn flush0(&mut self) -> std::io::Result<usize> {
+        let n = self.buffer.len();
+        if n == 0 {
+            return Ok(0);
+        }
+
+        tracing::trace!(n, "flushing");
+        let ready = self.buffer.split().freeze();
+
+        // not ideal to call from blocking code to block_on, but we are sure that this
+        // operation does not spawn_blocking other tasks
+        let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
+            self.tx.send(Ok(ready)).await.map_err(|_| ())?;
+
+            // throttle sending to allow reuse of our buffer in `write`.
+            self.tx.reserve().await.map_err(|_| ())?;
+
+            // now the response task has picked up the buffer and hopefully started
+            // sending it to the client.
+            Ok(())
+        });
+        if res.is_err() {
+            return Err(std::io::ErrorKind::BrokenPipe.into());
+        }
+        self.written += n;
+        Ok(n)
+    }
+
+    /// Total number of bytes successfully flushed to the channel so far.
+    pub fn flushed_bytes(&self) -> usize {
+        self.written
+    }
+}
+
+impl std::io::Write for ChannelWriter {
+    fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
+        let remaining = self.buffer.capacity() - self.buffer.len();
+
+        let out_of_space = remaining < buf.len();
+
+        let original_len = buf.len();
+
+        if out_of_space {
+            // top up the buffer exactly to its capacity and flush it; the rest of `buf`
+            // goes into the buffer after the flush.
+            let can_still_fit = remaining;
+            self.buffer.extend_from_slice(&buf[..can_still_fit]);
+            buf = &buf[can_still_fit..];
+            self.flush0()?;
+        }
+
+        // assume that this will often under normal operation just move the pointer back to the
+        // beginning of allocation, because previous split off parts are already sent and
+        // dropped.
+        self.buffer.extend_from_slice(buf);
+        Ok(original_len)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.flush0().map(|_| ())
+    }
+}
+
 async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
-    use bytes::{Bytes, BytesMut};
-    use std::io::Write as _;
-    use tokio::sync::mpsc;
-    use tokio_stream::wrappers::ReceiverStream;
-
     SERVE_METRICS_COUNT.inc();
 
-    /// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
-    struct ChannelWriter {
-        buffer: BytesMut,
-        tx: mpsc::Sender<std::io::Result<Bytes>>,
-        written: usize,
-    }
-
-    impl ChannelWriter {
-        fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
-            assert_ne!(buf_len, 0);
-            ChannelWriter {
-                // split about half off the buffer from the start, because we flush depending on
-                // capacity. first flush will come sooner than without this, but now resizes will
-                // have better chance of picking up the "other" half. not guaranteed of course.
-                buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
-                tx,
-                written: 0,
-            }
-        }
-
-        fn flush0(&mut self) -> std::io::Result<usize> {
-            let n = self.buffer.len();
-            if n == 0 {
-                return Ok(0);
-            }
-
-            tracing::trace!(n, "flushing");
-            let ready = self.buffer.split().freeze();
-
-            // not ideal to call from blocking code to block_on, but we are sure that this
-            // operation does not spawn_blocking other tasks
-            let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
-                self.tx.send(Ok(ready)).await.map_err(|_| ())?;
-
-                // throttle sending to allow reuse of our buffer in `write`.
-                self.tx.reserve().await.map_err(|_| ())?;
-
-                // now the response task has picked up the buffer and hopefully started
-                // sending it to the client.
-                Ok(())
-            });
-            if res.is_err() {
-                return Err(std::io::ErrorKind::BrokenPipe.into());
-            }
-            self.written += n;
-            Ok(n)
-        }
-
-        fn flushed_bytes(&self) -> usize {
-            self.written
-        }
-    }
-
-    impl std::io::Write for ChannelWriter {
-        fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
-            let remaining = self.buffer.capacity() - self.buffer.len();
-
-            let out_of_space = remaining < buf.len();
-
-            let original_len = buf.len();
-
-            if out_of_space {
-                let can_still_fit = buf.len() - remaining;
-                self.buffer.extend_from_slice(&buf[..can_still_fit]);
-                buf = &buf[can_still_fit..];
-                self.flush0()?;
-            }
-
-            // assume that this will often under normal operation just move the pointer back to the
-            // beginning of allocation, because previous split off parts are already sent and
-            // dropped.
-            self.buffer.extend_from_slice(buf);
-            Ok(original_len)
-        }
-
-        fn flush(&mut self) -> std::io::Result<()> {
-            self.flush0().map(|_| ())
-        }
-    }
-
     let started_at = std::time::Instant::now();
 
     let (tx, rx) = mpsc::channel(1);
diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml
index 64ef9f6997..30516c0763 100644
--- a/safekeeper/Cargo.toml
+++ b/safekeeper/Cargo.toml
@@ -47,6 +47,7 @@ pq_proto.workspace = true
 remote_storage.workspace = true
 safekeeper_api.workspace = true
 storage_broker.workspace = true
+tokio-stream.workspace = true
 utils.workspace = true
 workspace_hack.workspace = true
 
diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs
index ee9d7118c6..8cbc0aa47f 100644
--- a/safekeeper/src/debug_dump.rs
+++ b/safekeeper/src/debug_dump.rs
@@ -5,6 +5,7 @@ use std::fs::DirEntry;
 use std::io::BufReader;
 use std::io::Read;
 use std::path::PathBuf;
+use std::sync::Arc;
 
 use anyhow::Result;
 use camino::Utf8Path;
@@ -28,7 +29,7 @@ use crate::send_wal::WalSenderState;
 use crate::GlobalTimelines;
 
 /// Various filters that influence the resulting JSON output.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Clone)]
 pub struct Args {
     /// Dump all available safekeeper state. False by default.
     pub dump_all: bool,
@@ -53,15 +54,80 @@ pub struct Args {
 }
 
 /// Response for debug dump request.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize)]
 pub struct Response {
     pub start_time: DateTime<Utc>,
     pub finish_time: DateTime<Utc>,
-    pub timelines: Vec<Timeline>,
+    pub timelines: Vec<TimelineDumpSer>,
     pub timelines_count: usize,
     pub config: Config,
 }
 
+/// A timeline whose dump is built lazily: the state is collected only while this value is
+/// being serialized, by blocking on `runtime`. Serialize it only from a thread that is
+/// allowed to block, never from inside an async task.
+pub struct TimelineDumpSer {
+    pub tli: Arc<crate::timeline::Timeline>,
+    pub args: Args,
+    pub runtime: Arc<tokio::runtime::Runtime>,
+}
+
+impl std::fmt::Debug for TimelineDumpSer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TimelineDumpSer")
+            .field("tli", &self.tli.ttid)
+            .field("args", &self.args)
+            .finish()
+    }
+}
+
+impl Serialize for TimelineDumpSer {
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let dump = self
+            .runtime
+            .block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
+        dump.serialize(serializer)
+    }
+}
+
+/// Collects the parts of the timeline state selected by `args` into a [`Timeline`] dump entry.
+async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
+    let control_file = if args.dump_control_file {
+        let mut state = timeline.get_state().await.1;
+        if !args.dump_term_history {
+            state.acceptor_state.term_history = TermHistory(vec![]);
+        }
+        Some(state)
+    } else {
+        None
+    };
+
+    let memory = if args.dump_memory {
+        Some(timeline.memory_dump().await)
+    } else {
+        None
+    };
+
+    let disk_content = if args.dump_disk_content {
+        // build_disk_content can fail, but we don't want to fail the whole
+        // request because of that.
+        build_disk_content(&timeline.timeline_dir).ok()
+    } else {
+        None
+    };
+
+    Timeline {
+        tenant_id: timeline.ttid.tenant_id,
+        timeline_id: timeline.ttid.timeline_id,
+        control_file,
+        memory,
+        disk_content,
+    }
+}
+
 /// Safekeeper configuration.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Config {
@@ -140,8 +206,10 @@ pub async fn build(args: Args) -> Result<Response> {
         GlobalTimelines::get_all()
     };
 
-    // TODO: return Stream instead of Vec
     let mut timelines = Vec::new();
+    // Runtime for `TimelineDumpSer::serialize` to block on. A build failure is returned to
+    // the caller instead of panicking.
+    let runtime = Arc::new(tokio::runtime::Builder::new_current_thread().build()?);
     for tli in ptrs_snapshot {
         let ttid = tli.ttid;
         if let Some(tenant_id) = args.tenant_id {
@@ -155,38 +223,11 @@ pub async fn build(args: Args) -> Result<Response> {
             }
         }
 
-        let control_file = if args.dump_control_file {
-            let mut state = tli.get_state().await.1;
-            if !args.dump_term_history {
-                state.acceptor_state.term_history = TermHistory(vec![]);
-            }
-            Some(state)
-        } else {
-            None
-        };
-
-        let memory = if args.dump_memory {
-            Some(tli.memory_dump().await)
-        } else {
-            None
-        };
-
-        let disk_content = if args.dump_disk_content {
-            // build_disk_content can fail, but we don't want to fail the whole
-            // request because of that.
-            build_disk_content(&tli.timeline_dir).ok()
-        } else {
-            None
-        };
-
-        let timeline = Timeline {
-            tenant_id: ttid.tenant_id,
-            timeline_id: ttid.timeline_id,
-            control_file,
-            memory,
-            disk_content,
-        };
-        timelines.push(timeline);
+        timelines.push(TimelineDumpSer {
+            tli,
+            args: args.clone(),
+            runtime: runtime.clone(),
+        });
     }
 
     let config = GlobalTimelines::get_global_config();
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 940ac82df6..474d636441 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -13,7 +13,12 @@ use storage_broker::proto::SafekeeperTimelineInfo;
 use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
-use utils::http::endpoint::request_span;
+
+use std::io::Write as _;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use tracing::info_span;
+use utils::http::endpoint::{request_span, ChannelWriter};
 
 use crate::receive_wal::WalReceiverState;
 use crate::safekeeper::Term;
@@ -373,8 +378,53 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
         .await
         .map_err(ApiError::InternalServerError)?;
 
-    // TODO: use streaming response
-    json_response(StatusCode::OK, resp)
+    let started_at = std::time::Instant::now();
+
+    let (tx, rx) = mpsc::channel(1);
+
+    let body = Body::wrap_stream(ReceiverStream::new(rx));
+
+    let mut writer = ChannelWriter::new(128 * 1024, tx);
+
+    let response = Response::builder()
+        .status(200)
+        // the streamed body is the JSON document written by `serde_json::to_writer` below
+        .header(hyper::header::CONTENT_TYPE, "application/json")
+        .body(body)
+        .unwrap();
+
+    let span = info_span!("blocking");
+    tokio::task::spawn_blocking(move || {
+        let _span = span.entered();
+
+        let res = serde_json::to_writer(&mut writer, &resp)
+            .map_err(std::io::Error::from)
+            .and_then(|_| writer.flush());
+
+        match res {
+            Ok(()) => {
+                tracing::info!(
+                    bytes = writer.flushed_bytes(),
+                    elapsed_ms = started_at.elapsed().as_millis(),
+                    "responded /v1/debug_dump"
+                );
+            }
+            Err(e) => {
+                tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
+                // semantics of this error are quite... unclear. we want to error the stream out to
+                // abort the response to somehow notify the client that we failed.
+                //
+                // though, most likely the reason for failure is that the receiver is already gone.
+                drop(
+                    writer
+                        .tx
+                        .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
+                );
+            }
+        }
+    });
+
+    Ok(response)
 }
 
 /// Safekeeper http router.
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
index 1343bba5cc..e2f1b9fcff 100644
--- a/safekeeper/src/pull_timeline.rs
+++ b/safekeeper/src/pull_timeline.rs
@@ -1,3 +1,4 @@
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
 use anyhow::{bail, Context, Result};
@@ -32,6 +33,16 @@ pub struct Response {
     // TODO: add more fields?
 }
 
+/// Response for debug dump request.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DebugDumpResponse {
+    pub start_time: DateTime<Utc>,
+    pub finish_time: DateTime<Utc>,
+    pub timelines: Vec<debug_dump::Timeline>,
+    pub timelines_count: usize,
+    pub config: debug_dump::Config,
+}
+
 /// Find the most advanced safekeeper and pull timeline from it.
 pub async fn handle_request(request: Request) -> Result<Response> {
     let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
@@ -103,7 +114,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
 
     // Implementing our own scp over HTTP.
     // At first, we need to fetch list of files from safekeeper.
-    let dump: debug_dump::Response = client
+    let dump: DebugDumpResponse = client
         .get(format!(
             "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
             host, status.tenant_id, status.timeline_id
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 02faf715da..740c05dcea 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2868,7 +2868,7 @@ class SafekeeperHttpClient(requests.Session):
         params = params or {}
         res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
         res.raise_for_status()
-        res_json = res.json()
+        res_json = json.loads(res.text)
         assert isinstance(res_json, dict)
         return res_json
 