refactor: use streaming in safekeeper /v1/debug_dump http response (#5731)

- Update the handler for the `/v1/debug_dump` HTTP response in safekeeper
- Update `debug_dump::build()` to use streaming in the JSON build process (the pattern is sketched below)
duguorong009
2023-11-05 05:16:54 -05:00
committed by GitHub
parent 306c4f9967
commit 09b5954526
7 changed files with 227 additions and 126 deletions
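
For orientation, a minimal self-contained sketch of the streaming pattern this commit adopts, assuming hyper 0.14 with the `stream` feature enabled; `ChunkWriter` and `stream_json` are hypothetical stand-ins for the buffered `ChannelWriter` machinery introduced below:

use bytes::Bytes;
use std::io::Write;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

// Hypothetical minimal writer: forwards every write to the channel as one
// Bytes chunk. The real ChannelWriter below adds buffering and backpressure.
struct ChunkWriter(mpsc::Sender<std::io::Result<Bytes>>);

impl Write for ChunkWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // blocking_send is fine here: this code runs inside spawn_blocking.
        self.0
            .blocking_send(Ok(Bytes::copy_from_slice(buf)))
            .map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn stream_json<T: serde::Serialize + Send + 'static>(value: T) -> hyper::Response<hyper::Body> {
    let (tx, rx) = mpsc::channel(1);
    // The body is fed from the channel, so chunks reach the client while
    // serialization is still in progress.
    let body = hyper::Body::wrap_stream(ReceiverStream::new(rx));
    tokio::task::spawn_blocking(move || {
        let _ = serde_json::to_writer(ChunkWriter(tx), &value);
    });
    hyper::Response::new(body)
}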

Cargo.lock (generated)

@@ -4478,6 +4478,7 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"toml_edit",
"tracing",
"url",


@@ -14,6 +14,11 @@ use tracing::{self, debug, info, info_span, warn, Instrument};
use std::future::Future;
use std::str::FromStr;
use bytes::{Bytes, BytesMut};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -146,94 +151,89 @@ impl Drop for RequestCancelled {
}
}
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
pub struct ChannelWriter {
buffer: BytesMut,
pub tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
}
impl ChannelWriter {
pub fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
}
}
pub fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
pub fn flushed_bytes(&self) -> usize {
self.written
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
use bytes::{Bytes, BytesMut};
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
SERVE_METRICS_COUNT.inc();
/// An [`std::io::Write`] implementation on top of a channel sending [`bytes::Bytes`] chunks.
struct ChannelWriter {
buffer: BytesMut,
tx: mpsc::Sender<std::io::Result<Bytes>>,
written: usize,
}
impl ChannelWriter {
fn new(buf_len: usize, tx: mpsc::Sender<std::io::Result<Bytes>>) -> Self {
assert_ne!(buf_len, 0);
ChannelWriter {
// split about half off the buffer from the start, because we flush depending on
// capacity. first flush will come sooner than without this, but now resizes will
// have better chance of picking up the "other" half. not guaranteed of course.
buffer: BytesMut::with_capacity(buf_len).split_off(buf_len / 2),
tx,
written: 0,
}
}
fn flush0(&mut self) -> std::io::Result<usize> {
let n = self.buffer.len();
if n == 0 {
return Ok(0);
}
tracing::trace!(n, "flushing");
let ready = self.buffer.split().freeze();
// not ideal to call from blocking code to block_on, but we are sure that this
// operation does not spawn_blocking other tasks
let res: Result<(), ()> = tokio::runtime::Handle::current().block_on(async {
self.tx.send(Ok(ready)).await.map_err(|_| ())?;
// throttle sending to allow reuse of our buffer in `write`.
self.tx.reserve().await.map_err(|_| ())?;
// now the response task has picked up the buffer and hopefully started
// sending it to the client.
Ok(())
});
if res.is_err() {
return Err(std::io::ErrorKind::BrokenPipe.into());
}
self.written += n;
Ok(n)
}
fn flushed_bytes(&self) -> usize {
self.written
}
}
impl std::io::Write for ChannelWriter {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
let remaining = self.buffer.capacity() - self.buffer.len();
let out_of_space = remaining < buf.len();
let original_len = buf.len();
if out_of_space {
let can_still_fit = buf.len() - remaining;
self.buffer.extend_from_slice(&buf[..can_still_fit]);
buf = &buf[can_still_fit..];
self.flush0()?;
}
// assume that this will often under normal operation just move the pointer back to the
// beginning of allocation, because previous split off parts are already sent and
// dropped.
self.buffer.extend_from_slice(buf);
Ok(original_len)
}
fn flush(&mut self) -> std::io::Result<()> {
self.flush0().map(|_| ())
}
}
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
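
For reference, a usage sketch of the now-public ChannelWriter from the in-tree `utils` crate (`demo` is hypothetical). `flush0` calls `Handle::current().block_on`, so the writer must run on a blocking-pool thread, never on an async worker:

use bytes::Bytes;
use std::io::Write as _;
use tokio::sync::mpsc;
use utils::http::endpoint::ChannelWriter;

async fn demo() -> std::io::Result<()> {
    let (tx, mut rx) = mpsc::channel::<std::io::Result<Bytes>>(1);
    let producer = tokio::task::spawn_blocking(move || {
        let mut w = ChannelWriter::new(16 * 1024, tx);
        w.write_all(b"hello")?; // buffered, nothing sent yet
        w.flush()?; // splits the buffer off and sends it as one chunk
        Ok::<usize, std::io::Error>(w.flushed_bytes())
    });
    // Drain chunks as they arrive; the loop ends once the writer is dropped.
    while let Some(chunk) = rx.recv().await {
        println!("received {} bytes", chunk?.len());
    }
    assert_eq!(producer.await.unwrap()?, 5);
    Ok(())
}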


@@ -47,6 +47,7 @@ pq_proto.workspace = true
remote_storage.workspace = true
safekeeper_api.workspace = true
storage_broker.workspace = true
tokio-stream.workspace = true
utils.workspace = true
workspace_hack.workspace = true


@@ -5,6 +5,7 @@ use std::fs::DirEntry;
use std::io::BufReader;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Result;
use camino::Utf8Path;
@@ -28,7 +29,7 @@ use crate::send_wal::WalSenderState;
use crate::GlobalTimelines;
/// Various filters that influence the resulting JSON output.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Args {
/// Dump all available safekeeper state. False by default.
pub dump_all: bool,
@@ -53,15 +54,76 @@ pub struct Args {
}
/// Response for debug dump request.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
pub struct Response {
pub start_time: DateTime<Utc>,
pub finish_time: DateTime<Utc>,
pub timelines: Vec<Timeline>,
pub timelines: Vec<TimelineDumpSer>,
pub timelines_count: usize,
pub config: Config,
}
pub struct TimelineDumpSer {
pub tli: Arc<crate::timeline::Timeline>,
pub args: Args,
pub runtime: Arc<tokio::runtime::Runtime>,
}
impl std::fmt::Debug for TimelineDumpSer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimelineDumpSer")
.field("tli", &self.tli.ttid)
.field("args", &self.args)
.finish()
}
}
impl Serialize for TimelineDumpSer {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let dump = self
.runtime
.block_on(build_from_tli_dump(self.tli.clone(), self.args.clone()));
dump.serialize(serializer)
}
}
async fn build_from_tli_dump(timeline: Arc<crate::timeline::Timeline>, args: Args) -> Timeline {
let control_file = if args.dump_control_file {
let mut state = timeline.get_state().await.1;
if !args.dump_term_history {
state.acceptor_state.term_history = TermHistory(vec![]);
}
Some(state)
} else {
None
};
let memory = if args.dump_memory {
Some(timeline.memory_dump().await)
} else {
None
};
let disk_content = if args.dump_disk_content {
// build_disk_content can fail, but we don't want to fail the whole
// request because of that.
build_disk_content(&timeline.timeline_dir).ok()
} else {
None
};
Timeline {
tenant_id: timeline.ttid.tenant_id,
timeline_id: timeline.ttid.timeline_id,
control_file,
memory,
disk_content,
}
}
/// Safekeeper configuration.
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
@@ -140,8 +202,12 @@ pub async fn build(args: Args) -> Result<Response> {
GlobalTimelines::get_all()
};
// TODO: return Stream instead of Vec
let mut timelines = Vec::new();
let runtime = Arc::new(
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap(),
);
for tli in ptrs_snapshot {
let ttid = tli.ttid;
if let Some(tenant_id) = args.tenant_id {
@@ -155,38 +221,11 @@ pub async fn build(args: Args) -> Result<Response> {
}
}
let control_file = if args.dump_control_file {
let mut state = tli.get_state().await.1;
if !args.dump_term_history {
state.acceptor_state.term_history = TermHistory(vec![]);
}
Some(state)
} else {
None
};
let memory = if args.dump_memory {
Some(tli.memory_dump().await)
} else {
None
};
let disk_content = if args.dump_disk_content {
// build_disk_content can fail, but we don't want to fail the whole
// request because of that.
build_disk_content(&tli.timeline_dir).ok()
} else {
None
};
let timeline = Timeline {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
control_file,
memory,
disk_content,
};
timelines.push(timeline);
timelines.push(TimelineDumpSer {
tli,
args: args.clone(),
runtime: runtime.clone(),
});
}
let config = GlobalTimelines::get_global_config();
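
The notable trick in this file is implementing Serialize for a type whose data must be fetched asynchronously: the dump is built lazily, only when serde visits the value, by driving the future to completion with `Runtime::block_on` on a dedicated current-thread runtime. That is sound here because serialization itself happens on a blocking-pool thread, not on an async worker. A standalone sketch of the same pattern, with hypothetical `SlowDump` and `load`:

use std::sync::Arc;
use serde::{Serialize, Serializer};

struct SlowDump {
    runtime: Arc<tokio::runtime::Runtime>,
}

// Stand-in for async work such as timeline.get_state().await.
async fn load() -> u64 {
    42
}

impl Serialize for SlowDump {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // Drive the async fetch to completion on the side runtime; this must
        // not be called from an async worker thread of the main runtime.
        let value = self.runtime.block_on(load());
        value.serialize(serializer)
    }
}

fn main() {
    let runtime = Arc::new(
        tokio::runtime::Builder::new_current_thread().build().unwrap(),
    );
    println!("{}", serde_json::to_string(&SlowDump { runtime }).unwrap());
}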


@@ -13,7 +13,12 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use utils::http::endpoint::request_span;
use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::info_span;
use utils::http::endpoint::{request_span, ChannelWriter};
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
@@ -373,8 +378,52 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError>
.await
.map_err(ApiError::InternalServerError)?;
// TODO: use streaming response
json_response(StatusCode::OK, resp)
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
let body = Body::wrap_stream(ReceiverStream::new(rx));
let mut writer = ChannelWriter::new(128 * 1024, tx);
let response = Response::builder()
.status(200)
.header(hyper::header::CONTENT_TYPE, "application/octet-stream")
.body(body)
.unwrap();
let span = info_span!("blocking");
tokio::task::spawn_blocking(move || {
let _span = span.entered();
let res = serde_json::to_writer(&mut writer, &resp)
.map_err(std::io::Error::from)
.and_then(|_| writer.flush());
match res {
Ok(()) => {
tracing::info!(
bytes = writer.flushed_bytes(),
elapsed_ms = started_at.elapsed().as_millis(),
"responded /v1/debug_dump"
);
}
Err(e) => {
tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
// semantics of this error are quite... unclear. we want to error the stream out to
// abort the response to somehow notify the client that we failed.
//
// though, most likely the reason for failure is that the receiver is already gone.
drop(
writer
.tx
.blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
);
}
}
});
Ok(response)
}
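
Because the 200 status line and headers are sent before serialization begins, a failure mid-stream can only surface to the client as a body error, not as an HTTP status. A sketch of what a hyper 0.14 client observes (`read_dump` and the address are hypothetical):

use hyper::{body::HttpBody as _, Client};

async fn read_dump() -> Result<Vec<u8>, hyper::Error> {
    let client = Client::new();
    let uri = "http://127.0.0.1:7676/v1/debug_dump".parse().unwrap();
    let mut resp = client.get(uri).await?; // already 200 OK at this point
    let mut out = Vec::new();
    while let Some(chunk) = resp.body_mut().data().await {
        // An aborted stream (the BrokenPipe error sent above) shows up here.
        out.extend_from_slice(&chunk?);
    }
    Ok(out)
}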
/// Safekeeper http router.


@@ -1,3 +1,4 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use anyhow::{bail, Context, Result};
@@ -32,6 +33,16 @@ pub struct Response {
// TODO: add more fields?
}
/// Response for debug dump request.
#[derive(Debug, Serialize, Deserialize)]
pub struct DebugDumpResponse {
pub start_time: DateTime<Utc>,
pub finish_time: DateTime<Utc>,
pub timelines: Vec<debug_dump::Timeline>,
pub timelines_count: usize,
pub config: debug_dump::Config,
}
/// Find the most advanced safekeeper and pull timeline from it.
pub async fn handle_request(request: Request) -> Result<Response> {
let existing_tli = GlobalTimelines::get(TenantTimelineId::new(
@@ -103,7 +114,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
// Implementing our own scp over HTTP.
// At first, we need to fetch list of files from safekeeper.
let dump: debug_dump::Response = client
let dump: DebugDumpResponse = client
.get(format!(
"{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}",
host, status.tenant_id, status.timeline_id


@@ -2868,7 +2868,7 @@ class SafekeeperHttpClient(requests.Session):
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
res.raise_for_status()
res_json = res.json()
res_json = json.loads(res.text)
assert isinstance(res_json, dict)
return res_json