diff --git a/Cargo.lock b/Cargo.lock index 02b03e02fb..fe5aae6ae8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3307,6 +3307,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", + "chrono", "clap 4.1.4", "const_format", "crc32c", diff --git a/libs/utils/src/http/json.rs b/libs/utils/src/http/json.rs index 8981fdd1dd..40e61e3d0c 100644 --- a/libs/utils/src/http/json.rs +++ b/libs/utils/src/http/json.rs @@ -1,7 +1,9 @@ +use std::fmt::Display; + use anyhow::Context; use bytes::Buf; use hyper::{header, Body, Request, Response, StatusCode}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Serializer}; use super::error::ApiError; @@ -31,3 +33,12 @@ pub fn json_response( .map_err(|e| ApiError::InternalServerError(e.into()))?; Ok(response) } + +/// Serialize through Display trait. +pub fn display_serialize(z: &F, s: S) -> Result +where + S: Serializer, + F: Display, +{ + s.serialize_str(&format!("{}", z)) +} diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 4ee8d82203..2424509477 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -10,6 +10,7 @@ anyhow.workspace = true async-trait.workspace = true byteorder.workspace = true bytes.workspace = true +chrono.workspace = true clap = { workspace = true, features = ["derive"] } const_format.workspace = true crc32c.workspace = true diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs new file mode 100644 index 0000000000..674cf9f6eb --- /dev/null +++ b/safekeeper/src/debug_dump.rs @@ -0,0 +1,264 @@ +//! Utils for dumping full state of the safekeeper. + +use std::fs; +use std::fs::DirEntry; +use std::io::BufReader; +use std::io::Read; +use std::path::PathBuf; + +use anyhow::Result; +use chrono::{DateTime, Utc}; +use postgres_ffi::XLogSegNo; +use serde::Serialize; + +use utils::http::json::display_serialize; +use utils::id::NodeId; +use utils::id::TenantTimelineId; +use utils::id::{TenantId, TimelineId}; +use utils::lsn::Lsn; + +use crate::safekeeper::SafeKeeperState; +use crate::safekeeper::SafekeeperMemState; +use crate::safekeeper::TermHistory; +use crate::SafeKeeperConf; + +use crate::timeline::ReplicaState; +use crate::GlobalTimelines; + +/// Various filters that influence the resulting JSON output. +#[derive(Debug, Serialize)] +pub struct Args { + /// Dump all available safekeeper state. False by default. + pub dump_all: bool, + + /// Dump control_file content. Uses value of `dump_all` by default. + pub dump_control_file: bool, + + /// Dump in-memory state. Uses value of `dump_all` by default. + pub dump_memory: bool, + + /// Dump all disk files in a timeline directory. Uses value of `dump_all` by default. + pub dump_disk_content: bool, + + /// Dump full term history. True by default. + pub dump_term_history: bool, + + /// Filter timelines by tenant_id. + pub tenant_id: Option, + + /// Filter timelines by timeline_id. + pub timeline_id: Option, +} + +/// Response for debug dump request. +#[derive(Debug, Serialize)] +pub struct Response { + pub start_time: DateTime, + pub finish_time: DateTime, + pub timelines: Vec, + pub timelines_count: usize, + pub config: Config, +} + +/// Safekeeper configuration. +#[derive(Debug, Serialize)] +pub struct Config { + pub id: NodeId, + pub workdir: PathBuf, + pub listen_pg_addr: String, + pub listen_http_addr: String, + pub no_sync: bool, + pub max_offloader_lag_bytes: u64, + pub wal_backup_enabled: bool, +} + +#[derive(Debug, Serialize)] +pub struct Timeline { + #[serde(serialize_with = "display_serialize")] + pub tenant_id: TenantId, + #[serde(serialize_with = "display_serialize")] + pub timeline_id: TimelineId, + pub control_file: Option, + pub memory: Option, + pub disk_content: Option, +} + +#[derive(Debug, Serialize)] +pub struct Memory { + pub is_cancelled: bool, + pub peers_info_len: usize, + pub replicas: Vec>, + pub wal_backup_active: bool, + pub active: bool, + pub num_computes: u32, + pub last_removed_segno: XLogSegNo, + pub epoch_start_lsn: Lsn, + pub mem_state: SafekeeperMemState, + + // PhysicalStorage state. + pub write_lsn: Lsn, + pub write_record_lsn: Lsn, + pub flush_lsn: Lsn, + pub file_open: bool, +} + +#[derive(Debug, Serialize)] +pub struct DiskContent { + pub files: Vec, +} + +#[derive(Debug, Serialize)] +pub struct FileInfo { + pub name: String, + pub size: u64, + pub created: DateTime, + pub modified: DateTime, + pub start_zeroes: u64, + pub end_zeroes: u64, + // TODO: add sha256 checksum +} + +/// Build debug dump response, using the provided [`Args`] filters. +pub fn build(args: Args) -> Result { + let start_time = Utc::now(); + let timelines_count = GlobalTimelines::timelines_count(); + + let ptrs_snapshot = if args.tenant_id.is_some() && args.timeline_id.is_some() { + // If both tenant_id and timeline_id are specified, we can just get the + // timeline directly, without taking a snapshot of the whole list. + let ttid = TenantTimelineId::new(args.tenant_id.unwrap(), args.timeline_id.unwrap()); + if let Ok(tli) = GlobalTimelines::get(ttid) { + vec![tli] + } else { + vec![] + } + } else { + // Otherwise, take a snapshot of the whole list. + GlobalTimelines::get_all() + }; + + // TODO: return Stream instead of Vec + let mut timelines = Vec::new(); + for tli in ptrs_snapshot { + let ttid = tli.ttid; + if let Some(tenant_id) = args.tenant_id { + if tenant_id != ttid.tenant_id { + continue; + } + } + if let Some(timeline_id) = args.timeline_id { + if timeline_id != ttid.timeline_id { + continue; + } + } + + let control_file = if args.dump_control_file { + let mut state = tli.get_state().1; + if !args.dump_term_history { + state.acceptor_state.term_history = TermHistory(vec![]); + } + Some(state) + } else { + None + }; + + let memory = if args.dump_memory { + Some(tli.memory_dump()) + } else { + None + }; + + let disk_content = if args.dump_disk_content { + // build_disk_content can fail, but we don't want to fail the whole + // request because of that. + build_disk_content(&tli.timeline_dir).ok() + } else { + None + }; + + let timeline = Timeline { + tenant_id: ttid.tenant_id, + timeline_id: ttid.timeline_id, + control_file, + memory, + disk_content, + }; + timelines.push(timeline); + } + + let config = GlobalTimelines::get_global_config(); + + Ok(Response { + start_time, + finish_time: Utc::now(), + timelines, + timelines_count, + config: build_config(config), + }) +} + +/// Builds DiskContent from a directory path. It can fail if the directory +/// is deleted between the time we get the path and the time we try to open it. +fn build_disk_content(path: &std::path::Path) -> Result { + let mut files = Vec::new(); + for entry in fs::read_dir(path)? { + if entry.is_err() { + continue; + } + let file = build_file_info(entry?); + if file.is_err() { + continue; + } + files.push(file?); + } + + Ok(DiskContent { files }) +} + +/// Builds FileInfo from DirEntry. Sometimes it can return an error +/// if the file is deleted between the time we get the DirEntry +/// and the time we try to open it. +fn build_file_info(entry: DirEntry) -> Result { + let metadata = entry.metadata()?; + let path = entry.path(); + let name = path + .file_name() + .and_then(|x| x.to_str()) + .unwrap_or("") + .to_owned(); + let mut file = fs::File::open(path)?; + let mut reader = BufReader::new(&mut file).bytes().filter_map(|x| x.ok()); + + let start_zeroes = reader.by_ref().take_while(|&x| x == 0).count() as u64; + let mut end_zeroes = 0; + for b in reader { + if b == 0 { + end_zeroes += 1; + } else { + end_zeroes = 0; + } + } + + Ok(FileInfo { + name, + size: metadata.len(), + created: DateTime::from(metadata.created()?), + modified: DateTime::from(metadata.modified()?), + start_zeroes, + end_zeroes, + }) +} + +/// Converts SafeKeeperConf to Config, filtering out the fields that are not +/// supposed to be exposed. +fn build_config(config: SafeKeeperConf) -> Config { + Config { + id: config.my_id, + workdir: config.workdir, + listen_pg_addr: config.listen_pg_addr, + listen_http_addr: config.listen_http_addr, + no_sync: config.no_sync, + max_offloader_lag_bytes: config.max_offloader_lag_bytes, + wal_backup_enabled: config.wal_backup_enabled, + } +} diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index a917d61678..ced9599b36 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -5,14 +5,16 @@ use once_cell::sync::Lazy; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::models::SkTimelineInfo; use serde::Serialize; -use serde::Serializer; use std::collections::{HashMap, HashSet}; -use std::fmt::Display; +use std::fmt; +use std::str::FromStr; use std::sync::Arc; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use tokio::task::JoinError; +use utils::http::json::display_serialize; +use crate::debug_dump; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; @@ -54,15 +56,6 @@ fn get_conf(request: &Request) -> &SafeKeeperConf { .as_ref() } -/// Serialize through Display trait. -fn display_serialize(z: &F, s: S) -> Result -where - S: Serializer, - F: Display, -{ - s.serialize_str(&format!("{}", z)) -} - /// Same as TermSwitchEntry, but serializes LSN using display serializer /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response. #[derive(Debug, Serialize)] @@ -276,6 +269,69 @@ async fn record_safekeeper_info(mut request: Request) -> Result>(k: &str, v: &str) -> Result { + v.parse() + .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}"))) +} + +/// Dump debug info about all available safekeeper state. +async fn dump_debug_handler(mut request: Request) -> Result, ApiError> { + check_permission(&request, None)?; + ensure_no_body(&mut request).await?; + + let mut dump_all: Option = None; + let mut dump_control_file: Option = None; + let mut dump_memory: Option = None; + let mut dump_disk_content: Option = None; + let mut dump_term_history: Option = None; + let mut tenant_id: Option = None; + let mut timeline_id: Option = None; + + let query = request.uri().query().unwrap_or(""); + let mut values = url::form_urlencoded::parse(query.as_bytes()); + + for (k, v) in &mut values { + match k.as_ref() { + "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?), + "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?), + "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?), + "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?), + "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?), + "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?), + "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?), + _ => Err(ApiError::BadRequest(anyhow::anyhow!( + "Unknown query parameter: {}", + k + )))?, + } + } + + let dump_all = dump_all.unwrap_or(false); + let dump_control_file = dump_control_file.unwrap_or(dump_all); + let dump_memory = dump_memory.unwrap_or(dump_all); + let dump_disk_content = dump_disk_content.unwrap_or(dump_all); + let dump_term_history = dump_term_history.unwrap_or(true); + + let args = debug_dump::Args { + dump_all, + dump_control_file, + dump_memory, + dump_disk_content, + dump_term_history, + tenant_id, + timeline_id, + }; + + let resp = tokio::task::spawn_blocking(move || { + debug_dump::build(args).map_err(ApiError::InternalServerError) + }) + .await + .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??; + + // TODO: use streaming response + json_response(StatusCode::OK, resp) +} + /// Safekeeper http router. pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder { let mut router = endpoint::make_router(); @@ -316,6 +372,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder "/v1/record_safekeeper_info/:tenant_id/:timeline_id", record_safekeeper_info, ) + .get("/v1/debug_dump", dump_debug_handler) } #[cfg(test)] diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 891d73533f..6ab108ceb0 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -10,6 +10,7 @@ mod auth; pub mod broker; pub mod control_file; pub mod control_file_upgrade; +pub mod debug_dump; pub mod handler; pub mod http; pub mod json_ctrl; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index fa973a3ede..c37411d667 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -204,7 +204,7 @@ pub struct SafeKeeperState { pub peers: PersistedPeers, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] // In memory safekeeper state. Fields mirror ones in `SafeKeeperState`; values // are not flushed yet. pub struct SafekeeperMemState { @@ -212,6 +212,7 @@ pub struct SafekeeperMemState { pub backup_lsn: Lsn, pub peer_horizon_lsn: Lsn, pub remote_consistent_lsn: Lsn, + #[serde(with = "hex")] pub proposer_uuid: PgUuid, } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 43c395574f..7479741774 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -5,6 +5,7 @@ use anyhow::{bail, Result}; use parking_lot::{Mutex, MutexGuard}; use postgres_ffi::XLogSegNo; use pq_proto::ReplicationFeedback; +use serde::Serialize; use std::cmp::{max, min}; use std::path::PathBuf; use tokio::{ @@ -28,9 +29,9 @@ use crate::send_wal::HotStandbyFeedback; use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION}; use crate::metrics::FullTimelineInfo; -use crate::wal_storage; use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; +use crate::{debug_dump, wal_storage}; /// Things safekeeper should know about timeline state on peers. #[derive(Debug, Clone)] @@ -80,7 +81,7 @@ impl PeersInfo { } /// Replica status update + hot standby feedback -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Serialize)] pub struct ReplicaState { /// last known lsn received by replica pub last_received_lsn: Lsn, // None means we don't know @@ -381,7 +382,7 @@ pub struct Timeline { cancellation_rx: watch::Receiver, /// Directory where timeline state is stored. - timeline_dir: PathBuf, + pub timeline_dir: PathBuf, } impl Timeline { @@ -588,38 +589,6 @@ impl Timeline { self.write_shared_state().wal_backup_attend() } - /// Returns full timeline info, required for the metrics. If the timeline is - /// not active, returns None instead. - pub fn info_for_metrics(&self) -> Option { - if self.is_cancelled() { - return None; - } - - let state = self.write_shared_state(); - if state.active { - Some(FullTimelineInfo { - ttid: self.ttid, - replicas: state - .replicas - .iter() - .filter_map(|r| r.as_ref()) - .copied() - .collect(), - wal_backup_active: state.wal_backup_active, - timeline_is_active: state.active, - num_computes: state.num_computes, - last_removed_segno: state.last_removed_segno, - epoch_start_lsn: state.sk.epoch_start_lsn, - mem_state: state.sk.inmem.clone(), - persisted_state: state.sk.state.clone(), - flush_lsn: state.sk.wal_store.flush_lsn(), - wal_storage: state.sk.wal_store.get_metrics(), - }) - } else { - None - } - } - /// Returns commit_lsn watch channel. pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver { self.commit_lsn_watch_rx.clone() @@ -784,6 +753,62 @@ impl Timeline { shared_state.last_removed_segno = horizon_segno; Ok(()) } + + /// Returns full timeline info, required for the metrics. If the timeline is + /// not active, returns None instead. + pub fn info_for_metrics(&self) -> Option { + if self.is_cancelled() { + return None; + } + + let state = self.write_shared_state(); + if state.active { + Some(FullTimelineInfo { + ttid: self.ttid, + replicas: state + .replicas + .iter() + .filter_map(|r| r.as_ref()) + .copied() + .collect(), + wal_backup_active: state.wal_backup_active, + timeline_is_active: state.active, + num_computes: state.num_computes, + last_removed_segno: state.last_removed_segno, + epoch_start_lsn: state.sk.epoch_start_lsn, + mem_state: state.sk.inmem.clone(), + persisted_state: state.sk.state.clone(), + flush_lsn: state.sk.wal_store.flush_lsn(), + wal_storage: state.sk.wal_store.get_metrics(), + }) + } else { + None + } + } + + /// Returns in-memory timeline state to build a full debug dump. + pub fn memory_dump(&self) -> debug_dump::Memory { + let state = self.write_shared_state(); + + let (write_lsn, write_record_lsn, flush_lsn, file_open) = + state.sk.wal_store.internal_state(); + + debug_dump::Memory { + is_cancelled: self.is_cancelled(), + peers_info_len: state.peers_info.0.len(), + replicas: state.replicas.clone(), + wal_backup_active: state.wal_backup_active, + active: state.active, + num_computes: state.num_computes, + last_removed_segno: state.last_removed_segno, + epoch_start_lsn: state.sk.epoch_start_lsn, + mem_state: state.sk.inmem.clone(), + write_lsn, + write_record_lsn, + flush_lsn, + file_open, + } + } } /// Deletes directory and it's contents. Returns false if directory does not exist. diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 66e0145042..baef17ffa8 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -159,6 +159,16 @@ impl GlobalTimelines { Ok(()) } + /// Get the number of timelines in the map. + pub fn timelines_count() -> usize { + TIMELINES_STATE.lock().unwrap().timelines.len() + } + + /// Get the global safekeeper config. + pub fn get_global_config() -> SafeKeeperConf { + TIMELINES_STATE.lock().unwrap().get_conf().clone() + } + /// Create a new timeline with the given id. If the timeline already exists, returns /// an existing timeline. pub fn create( diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 561104bd27..ae02b3c7bc 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -165,6 +165,16 @@ impl PhysicalStorage { }) } + /// Get all known state of the storage. + pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) { + ( + self.write_lsn, + self.write_record_lsn, + self.flush_record_lsn, + self.file.is_some(), + ) + } + /// Call fdatasync if config requires so. fn fdatasync_file(&mut self, file: &mut File) -> Result<()> { if !self.conf.no_sync { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index c4b3d057f8..56b56b8578 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2988,6 +2988,13 @@ class SafekeeperHttpClient(requests.Session): def check_status(self): self.get(f"http://localhost:{self.port}/v1/status").raise_for_status() + def debug_dump(self, params: Dict[str, str] = {}) -> Dict[str, Any]: + res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params) + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def timeline_create( self, tenant_id: TenantId, timeline_id: TimelineId, pg_version: int, commit_lsn: Lsn ): diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 9e3b0ec02f..0ac9127c6b 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -775,6 +775,9 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): if not auth_enabled: wa_http_cli = wa.http_client() wa_http_cli.check_status() + + wa_http_cli_debug = wa.http_client() + wa_http_cli_debug.check_status() else: wa_http_cli = wa.http_client(auth_token=env.auth_keys.generate_tenant_token(tenant_id)) wa_http_cli.check_status() @@ -785,6 +788,10 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): wa_http_cli_noauth = wa.http_client() wa_http_cli_noauth.check_status() + # debug endpoint requires safekeeper scope + wa_http_cli_debug = wa.http_client(auth_token=env.auth_keys.generate_safekeeper_token()) + wa_http_cli_debug.check_status() + # fetch something sensible from status tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id) epoch = tli_status.acceptor_epoch @@ -795,6 +802,12 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): with pytest.raises(cli.HTTPError, match="Forbidden|Unauthorized"): cli.timeline_status(tenant_id, timeline_id) + # fetch debug_dump endpoint + debug_dump_0 = wa_http_cli_debug.debug_dump({"dump_all": "true"}) + log.info(f"debug_dump before reboot {debug_dump_0}") + assert debug_dump_0["timelines_count"] == 1 + assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id) + pg.safe_psql("create table t(i int)") # ensure epoch goes up after reboot @@ -808,6 +821,25 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): # and timeline_start_lsn stays the same assert tli_status.timeline_start_lsn == timeline_start_lsn + # fetch debug_dump after reboot + debug_dump_1 = wa_http_cli_debug.debug_dump({"dump_all": "true"}) + log.info(f"debug_dump after reboot {debug_dump_1}") + assert debug_dump_1["timelines_count"] == 1 + assert debug_dump_1["timelines"][0]["timeline_id"] == str(timeline_id) + + # check that commit_lsn and flush_lsn not decreased + assert ( + debug_dump_1["timelines"][0]["memory"]["mem_state"]["commit_lsn"] + >= debug_dump_0["timelines"][0]["memory"]["mem_state"]["commit_lsn"] + ) + assert ( + debug_dump_1["timelines"][0]["memory"]["flush_lsn"] + >= debug_dump_0["timelines"][0]["memory"]["flush_lsn"] + ) + + # check .config in response + assert debug_dump_1["config"]["id"] == env.safekeepers[0].id + class SafekeeperEnv: def __init__(