Create /v1/debug_dump safekeepers endpoint (#3710)

Add HTTP endpoint to get full safekeeper state of all existing timelines
(all in-memory values and info about all files stored on disk).

Example:
https://gist.github.com/petuhovskiy/3cbb8f870401e9f486731d145161c286
This commit is contained in:
Arthur Petukhovsky
2023-03-03 14:01:05 +03:00
committed by GitHub
parent 5e514b8465
commit b23742e09c
12 changed files with 468 additions and 48 deletions

1
Cargo.lock generated
View File

@@ -3307,6 +3307,7 @@ dependencies = [
"async-trait",
"byteorder",
"bytes",
"chrono",
"clap 4.1.4",
"const_format",
"crc32c",

View File

@@ -1,7 +1,9 @@
use std::fmt::Display;
use anyhow::Context;
use bytes::Buf;
use hyper::{header, Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, Serializer};
use super::error::ApiError;
@@ -31,3 +33,12 @@ pub fn json_response<T: Serialize>(
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Ok(response)
}
/// Serialize through Display trait.
pub fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
F: Display,
{
s.serialize_str(&format!("{}", z))
}

View File

@@ -10,6 +10,7 @@ anyhow.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
crc32c.workspace = true

View File

@@ -0,0 +1,264 @@
//! Utils for dumping full state of the safekeeper.
use std::fs;
use std::fs::DirEntry;
use std::io::BufReader;
use std::io::Read;
use std::path::PathBuf;
use anyhow::Result;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use serde::Serialize;
use utils::http::json::display_serialize;
use utils::id::NodeId;
use utils::id::TenantTimelineId;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::safekeeper::SafeKeeperState;
use crate::safekeeper::SafekeeperMemState;
use crate::safekeeper::TermHistory;
use crate::SafeKeeperConf;
use crate::timeline::ReplicaState;
use crate::GlobalTimelines;
/// Various filters that influence the resulting JSON output.
#[derive(Debug, Serialize)]
pub struct Args {
/// Dump all available safekeeper state. False by default.
pub dump_all: bool,
/// Dump control_file content. Uses value of `dump_all` by default.
pub dump_control_file: bool,
/// Dump in-memory state. Uses value of `dump_all` by default.
pub dump_memory: bool,
/// Dump all disk files in a timeline directory. Uses value of `dump_all` by default.
pub dump_disk_content: bool,
/// Dump full term history. True by default.
pub dump_term_history: bool,
/// Filter timelines by tenant_id.
pub tenant_id: Option<TenantId>,
/// Filter timelines by timeline_id.
pub timeline_id: Option<TimelineId>,
}
/// Response for debug dump request.
#[derive(Debug, Serialize)]
pub struct Response {
pub start_time: DateTime<Utc>,
pub finish_time: DateTime<Utc>,
pub timelines: Vec<Timeline>,
pub timelines_count: usize,
pub config: Config,
}
/// Safekeeper configuration.
#[derive(Debug, Serialize)]
pub struct Config {
pub id: NodeId,
pub workdir: PathBuf,
pub listen_pg_addr: String,
pub listen_http_addr: String,
pub no_sync: bool,
pub max_offloader_lag_bytes: u64,
pub wal_backup_enabled: bool,
}
#[derive(Debug, Serialize)]
pub struct Timeline {
#[serde(serialize_with = "display_serialize")]
pub tenant_id: TenantId,
#[serde(serialize_with = "display_serialize")]
pub timeline_id: TimelineId,
pub control_file: Option<SafeKeeperState>,
pub memory: Option<Memory>,
pub disk_content: Option<DiskContent>,
}
#[derive(Debug, Serialize)]
pub struct Memory {
pub is_cancelled: bool,
pub peers_info_len: usize,
pub replicas: Vec<Option<ReplicaState>>,
pub wal_backup_active: bool,
pub active: bool,
pub num_computes: u32,
pub last_removed_segno: XLogSegNo,
pub epoch_start_lsn: Lsn,
pub mem_state: SafekeeperMemState,
// PhysicalStorage state.
pub write_lsn: Lsn,
pub write_record_lsn: Lsn,
pub flush_lsn: Lsn,
pub file_open: bool,
}
#[derive(Debug, Serialize)]
pub struct DiskContent {
pub files: Vec<FileInfo>,
}
#[derive(Debug, Serialize)]
pub struct FileInfo {
pub name: String,
pub size: u64,
pub created: DateTime<Utc>,
pub modified: DateTime<Utc>,
pub start_zeroes: u64,
pub end_zeroes: u64,
// TODO: add sha256 checksum
}
/// Build debug dump response, using the provided [`Args`] filters.
pub fn build(args: Args) -> Result<Response> {
let start_time = Utc::now();
let timelines_count = GlobalTimelines::timelines_count();
let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() {
// If both tenant_id and timeline_id are specified, we can just get the
// timeline directly, without taking a snapshot of the whole list.
let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap());
if let Ok(tli) = GlobalTimelines::get(ttid) {
vec![tli]
} else {
vec![]
}
} else {
// Otherwise, take a snapshot of the whole list.
GlobalTimelines::get_all()
};
// TODO: return Stream instead of Vec
let mut timelines = Vec::new();
for tli in ptrs_snapshot {
let ttid = tli.ttid;
if let Some(tenant_id) = args.tenant_id {
if tenant_id != ttid.tenant_id {
continue;
}
}
if let Some(timeline_id) = args.timeline_id {
if timeline_id != ttid.timeline_id {
continue;
}
}
let control_file = if args.dump_control_file {
let mut state = tli.get_state().1;
if !args.dump_term_history {
state.acceptor_state.term_history = TermHistory(vec![]);
}
Some(state)
} else {
None
};
let memory = if args.dump_memory {
Some(tli.memory_dump())
} else {
None
};
let disk_content = if args.dump_disk_content {
// build_disk_content can fail, but we don't want to fail the whole
// request because of that.
build_disk_content(&tli.timeline_dir).ok()
} else {
None
};
let timeline = Timeline {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
control_file,
memory,
disk_content,
};
timelines.push(timeline);
}
let config = GlobalTimelines::get_global_config();
Ok(Response {
start_time,
finish_time: Utc::now(),
timelines,
timelines_count,
config: build_config(config),
})
}
/// Builds DiskContent from a directory path. It can fail if the directory
/// is deleted between the time we get the path and the time we try to open it.
fn build_disk_content(path: &std::path::Path) -> Result<DiskContent> {
let mut files = Vec::new();
for entry in fs::read_dir(path)? {
if entry.is_err() {
continue;
}
let file = build_file_info(entry?);
if file.is_err() {
continue;
}
files.push(file?);
}
Ok(DiskContent { files })
}
/// Builds FileInfo from DirEntry. Sometimes it can return an error
/// if the file is deleted between the time we get the DirEntry
/// and the time we try to open it.
fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
let metadata = entry.metadata()?;
let path = entry.path();
let name = path
.file_name()
.and_then(|x| x.to_str())
.unwrap_or("")
.to_owned();
let mut file = fs::File::open(path)?;
let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok());
let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64;
let mut end_zeroes = 0;
for b in reader {
if b == 0 {
end_zeroes += 1;
} else {
end_zeroes = 0;
}
}
Ok(FileInfo {
name,
size: metadata.len(),
created: DateTime::from(metadata.created()?),
modified: DateTime::from(metadata.modified()?),
start_zeroes,
end_zeroes,
})
}
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {
Config {
id: config.my_id,
workdir: config.workdir,
listen_pg_addr: config.listen_pg_addr,
listen_http_addr: config.listen_http_addr,
no_sync: config.no_sync,
max_offloader_lag_bytes: config.max_offloader_lag_bytes,
wal_backup_enabled: config.wal_backup_enabled,
}
}

View File

@@ -5,14 +5,16 @@ use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::SkTimelineInfo;
use serde::Serialize;
use serde::Serializer;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::task::JoinError;
use utils::http::json::display_serialize;
use crate::debug_dump;
use crate::safekeeper::ServerInfo;
use crate::safekeeper::Term;
@@ -54,15 +56,6 @@ fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
.as_ref()
}
/// Serialize through Display trait.
fn display_serialize<S, F>(z: &F, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
F: Display,
{
s.serialize_str(&format!("{}", z))
}
/// Same as TermSwitchEntry, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Serialize)]
@@ -276,6 +269,69 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
json_response(StatusCode::OK, ())
}
fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
v.parse()
.map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
}
/// Dump debug info about all available safekeeper state.
async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
ensure_no_body(&mut request).await?;
let mut dump_all: Option<bool> = None;
let mut dump_control_file: Option<bool> = None;
let mut dump_memory: Option<bool> = None;
let mut dump_disk_content: Option<bool> = None;
let mut dump_term_history: Option<bool> = None;
let mut tenant_id: Option<TenantId> = None;
let mut timeline_id: Option<TimelineId> = None;
let query = request.uri().query().unwrap_or("");
let mut values = url::form_urlencoded::parse(query.as_bytes());
for (k, v) in &mut values {
match k.as_ref() {
"dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
"dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
"dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
"dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
"dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
"tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
"timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
_ => Err(ApiError::BadRequest(anyhow::anyhow!(
"Unknown query parameter: {}",
k
)))?,
}
}
let dump_all = dump_all.unwrap_or(false);
let dump_control_file = dump_control_file.unwrap_or(dump_all);
let dump_memory = dump_memory.unwrap_or(dump_all);
let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
let dump_term_history = dump_term_history.unwrap_or(true);
let args = debug_dump::Args {
dump_all,
dump_control_file,
dump_memory,
dump_disk_content,
dump_term_history,
tenant_id,
timeline_id,
};
let resp = tokio::task::spawn_blocking(move || {
debug_dump::build(args).map_err(ApiError::InternalServerError)
})
.await
.map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??;
// TODO: use streaming response
json_response(StatusCode::OK, resp)
}
/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router();
@@ -316,6 +372,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/record_safekeeper_info/:tenant_id/:timeline_id",
record_safekeeper_info,
)
.get("/v1/debug_dump", dump_debug_handler)
}
#[cfg(test)]

View File

@@ -10,6 +10,7 @@ mod auth;
pub mod broker;
pub mod control_file;
pub mod control_file_upgrade;
pub mod debug_dump;
pub mod handler;
pub mod http;
pub mod json_ctrl;

View File

@@ -204,7 +204,7 @@ pub struct SafeKeeperState {
pub peers: PersistedPeers,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
// In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values
// are not flushed yet.
pub struct SafekeeperMemState {
@@ -212,6 +212,7 @@ pub struct SafekeeperMemState {
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
}

View File

@@ -5,6 +5,7 @@ use anyhow::{bail, Result};
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo;
use pq_proto::ReplicationFeedback;
use serde::Serialize;
use std::cmp::{max, min};
use std::path::PathBuf;
use tokio::{
@@ -28,9 +29,9 @@ use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;
use crate::{debug_dump, wal_storage};
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone)]
@@ -80,7 +81,7 @@ impl PeersInfo {
}
/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ReplicaState {
/// last known lsn received by replica
pub last_received_lsn: Lsn, // None means we don't know
@@ -381,7 +382,7 @@ pub struct Timeline {
cancellation_rx: watch::Receiver<bool>,
/// Directory where timeline state is stored.
timeline_dir: PathBuf,
pub timeline_dir: PathBuf,
}
impl Timeline {
@@ -588,38 +589,6 @@ impl Timeline {
self.write_shared_state().wal_backup_attend()
}
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() {
return None;
}
let state = self.write_shared_state();
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
replicas: state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: state.num_computes,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
} else {
None
}
}
/// Returns commit_lsn watch channel.
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
@@ -784,6 +753,62 @@ impl Timeline {
shared_state.last_removed_segno = horizon_segno;
Ok(())
}
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
if self.is_cancelled() {
return None;
}
let state = self.write_shared_state();
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
replicas: state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: state.num_computes,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
wal_storage: state.sk.wal_store.get_metrics(),
})
} else {
None
}
}
/// Returns in-memory timeline state to build a full debug dump.
pub fn memory_dump(&self) -> debug_dump::Memory {
let state = self.write_shared_state();
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
state.sk.wal_store.internal_state();
debug_dump::Memory {
is_cancelled: self.is_cancelled(),
peers_info_len: state.peers_info.0.len(),
replicas: state.replicas.clone(),
wal_backup_active: state.wal_backup_active,
active: state.active,
num_computes: state.num_computes,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
write_lsn,
write_record_lsn,
flush_lsn,
file_open,
}
}
}
/// Deletes directory and it's contents. Returns false if directory does not exist.

View File

@@ -159,6 +159,16 @@ impl GlobalTimelines {
Ok(())
}
/// Get the number of timelines in the map.
pub fn timelines_count() -> usize {
TIMELINES_STATE.lock().unwrap().timelines.len()
}
/// Get the global safekeeper config.
pub fn get_global_config() -> SafeKeeperConf {
TIMELINES_STATE.lock().unwrap().get_conf().clone()
}
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub fn create(

View File

@@ -165,6 +165,16 @@ impl PhysicalStorage {
})
}
/// Get all known state of the storage.
pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
(
self.write_lsn,
self.write_record_lsn,
self.flush_record_lsn,
self.file.is_some(),
)
}
/// Call fdatasync if config requires so.
fn fdatasync_file(&mut self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {

View File

@@ -2988,6 +2988,13 @@ class SafekeeperHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def debug_dump(self, params: Dict[str, str] = {}) -> Dict[str, Any]:
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
res.raise_for_status()
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_create(
self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn
):

View File

@@ -775,6 +775,9 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
if not auth_enabled:
wa_http_cli = wa.http_client()
wa_http_cli.check_status()
wa_http_cli_debug = wa.http_client()
wa_http_cli_debug.check_status()
else:
wa_http_cli = wa.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id))
wa_http_cli.check_status()
@@ -785,6 +788,10 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
wa_http_cli_noauth = wa.http_client()
wa_http_cli_noauth.check_status()
# debug endpoint requires safekeeper scope
wa_http_cli_debug = wa.http_client(auth_token=env.auth_keys.generate_safekeeper_token())
wa_http_cli_debug.check_status()
# fetch something sensible from status
tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
epoch = tli_status.acceptor_epoch
@@ -795,6 +802,12 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
with pytest.raises(cli.HTTPError, match="Forbidden|Unauthorized"):
cli.timeline_status(tenant_id, timeline_id)
# fetch debug_dump endpoint
debug_dump_0 = wa_http_cli_debug.debug_dump({"dump_all": "true"})
log.info(f"debug_dump before reboot {debug_dump_0}")
assert debug_dump_0["timelines_count"] == 1
assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id)
pg.safe_psql("create table t(i int)")
# ensure epoch goes up after reboot
@@ -808,6 +821,25 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
# and timeline_start_lsn stays the same
assert tli_status.timeline_start_lsn == timeline_start_lsn
# fetch debug_dump after reboot
debug_dump_1 = wa_http_cli_debug.debug_dump({"dump_all": "true"})
log.info(f"debug_dump after reboot {debug_dump_1}")
assert debug_dump_1["timelines_count"] == 1
assert debug_dump_1["timelines"][0]["timeline_id"] == str(timeline_id)
# check that commit_lsn and flush_lsn not decreased
assert (
debug_dump_1["timelines"][0]["memory"]["mem_state"]["commit_lsn"]
>= debug_dump_0["timelines"][0]["memory"]["mem_state"]["commit_lsn"]
)
assert (
debug_dump_1["timelines"][0]["memory"]["flush_lsn"]
>= debug_dump_0["timelines"][0]["memory"]["flush_lsn"]
)
# check .config in response
assert debug_dump_1["config"]["id"] == env.safekeepers[0].id
class SafekeeperEnv:
def __init__(