From f86cf93435133ee11f8c4bc53b1470e2dada3ce0 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Tue, 15 Feb 2022 20:10:10 +0300
Subject: [PATCH] Refactor timeline creation on safekeepers, allowing peer ids to be stored.

Add a separate routine and HTTP endpoint to create a timeline on safekeepers.
It is not used yet, i.e. the timeline is still created implicitly, but we'll
change that once the infrastructure for learning which timelines are assigned
to which safekeepers is ready, preventing accidental creation by compute.

Change the safekeeper control file format, allowing it to store the set of
peers. Knowing the peers lays part of the foundation for peer recovery
(calculating minimum horizons like truncate_lsn for WAL truncation and
commit_lsn for the sync-safekeepers replacement) and for proper membership
change; it is likewise not used yet.

Since the control file version is bumped anyway, extract tenant_id and
timeline_id to the top level, where they are more suitable. Also add a bunch
of LSNs there and rename truncate_lsn to the more specific peer_horizon_lsn.
---
 Cargo.lock                            |   1 +
 control_plane/Cargo.toml              |   1 +
 control_plane/src/safekeeper.rs       |  24 ++++-
 walkeeper/src/bin/safekeeper.rs       |   7 +-
 walkeeper/src/control_file.rs         | 104 ++++++++----------
 walkeeper/src/control_file_upgrade.rs |  82 +++++++++++++--
 walkeeper/src/handler.rs              |  38 +++----
 walkeeper/src/http/mod.rs             |   1 +
 walkeeper/src/http/models.rs          |   9 ++
 walkeeper/src/http/routes.rs          |  32 +++++-
 walkeeper/src/safekeeper.rs           | 145 ++++++++++++++++++--------
 walkeeper/src/timeline.rs             | 128 ++++++++++++++++-------
 walkeeper/src/wal_storage.rs          |  23 ++--
 zenith_utils/src/zid.rs               |   4 +
 14 files changed, 404 insertions(+), 195 deletions(-)
 create mode 100644 walkeeper/src/http/models.rs

diff --git a/Cargo.lock b/Cargo.lock
index ba3c6729d6..ad38a41d91 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -424,6 +424,7 @@ dependencies = [
  "thiserror",
  "toml",
  "url",
+ "walkeeper",
  "workspace_hack",
  "zenith_utils",
 ]
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index 5e972200c2..eff6b3ef2d 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -17,5 +17,6 @@ url = "2.2.2"
 reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "rustls-tls"] }
 
 pageserver = { path = "../pageserver" }
+walkeeper = { path = "../walkeeper" }
 zenith_utils = { path = "../zenith_utils" }
 workspace_hack = { path = "../workspace_hack" }
diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs
index 351d1efbbc..969e2cd531 100644
--- a/control_plane/src/safekeeper.rs
+++ b/control_plane/src/safekeeper.rs
@@ -14,8 +14,9 @@ use postgres::Config;
 use reqwest::blocking::{Client, RequestBuilder, Response};
 use reqwest::{IntoUrl, Method};
 use thiserror::Error;
+use walkeeper::http::models::TimelineCreateRequest;
 use zenith_utils::http::error::HttpErrorBody;
-use zenith_utils::zid::ZNodeId;
+use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};
 
 use crate::local_env::{LocalEnv, SafekeeperConf};
 use crate::storage::PageServerNode;
@@ -261,4 +262,25 @@ impl SafekeeperNode {
             .error_from_body()?;
         Ok(())
     }
+
+    pub fn timeline_create(
+        &self,
+        tenant_id: ZTenantId,
+        timeline_id: ZTimelineId,
+        peer_ids: Vec<ZNodeId>,
+    ) -> Result<()> {
+        Ok(self
+            .http_request(
+                Method::POST,
+                format!("{}/{}", self.http_base_url, "timeline"),
+            )
+            .json(&TimelineCreateRequest {
+                tenant_id,
+                timeline_id,
+                peer_ids,
+            })
+            .send()?
+            .error_from_body()?
+            .json()?)
+ } } diff --git a/walkeeper/src/bin/safekeeper.rs b/walkeeper/src/bin/safekeeper.rs index 48de1481d4..6c45115e5f 100644 --- a/walkeeper/src/bin/safekeeper.rs +++ b/walkeeper/src/bin/safekeeper.rs @@ -11,7 +11,7 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::thread; use tracing::*; -use walkeeper::control_file::{self, CreateControlFile}; +use walkeeper::control_file::{self}; use zenith_utils::http::endpoint; use zenith_utils::zid::ZNodeId; use zenith_utils::{logging, tcp_listener, GIT_VERSION}; @@ -108,10 +108,7 @@ fn main() -> Result<()> { .get_matches(); if let Some(addr) = arg_matches.value_of("dump-control-file") { - let state = control_file::FileStorage::load_control_file( - Path::new(addr), - CreateControlFile::False, - )?; + let state = control_file::FileStorage::load_control_file(Path::new(addr))?; let json = serde_json::to_string(&state)?; print!("{}", json); return Ok(()); diff --git a/walkeeper/src/control_file.rs b/walkeeper/src/control_file.rs index 6016e00d1d..8b4e618661 100644 --- a/walkeeper/src/control_file.rs +++ b/walkeeper/src/control_file.rs @@ -27,13 +27,6 @@ const CONTROL_FILE_NAME: &str = "safekeeper.control"; const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); -// A named boolean. -#[derive(Debug)] -pub enum CreateControlFile { - True, - False, -} - lazy_static! { static ref PERSIST_CONTROL_FILE_SECONDS: HistogramVec = register_histogram_vec!( "safekeeper_persist_control_file_seconds", @@ -94,28 +87,22 @@ impl FileStorage { pub fn load_control_file_conf( conf: &SafeKeeperConf, zttid: &ZTenantTimelineId, - create: CreateControlFile, ) -> Result { let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); - Self::load_control_file(path, create) + Self::load_control_file(path) } /// Read in the control file. /// If create=false and file doesn't exist, bails out. - pub fn load_control_file>( - control_file_path: P, - create: CreateControlFile, - ) -> Result { + pub fn load_control_file>(control_file_path: P) -> Result { info!( - "loading control file {}, create={:?}", + "loading control file {}", control_file_path.as_ref().display(), - create, ); let mut control_file = OpenOptions::new() .read(true) .write(true) - .create(matches!(create, CreateControlFile::True)) .open(&control_file_path) .with_context(|| { format!( @@ -124,41 +111,32 @@ impl FileStorage { ) })?; - // Empty file is legit on 'create', don't try to deser from it. 
- let state = if control_file.metadata().unwrap().len() == 0 { - if let CreateControlFile::False = create { - bail!("control file is empty"); - } - SafeKeeperState::new() - } else { - let mut buf = Vec::new(); - control_file - .read_to_end(&mut buf) - .context("failed to read control file")?; + let mut buf = Vec::new(); + control_file + .read_to_end(&mut buf) + .context("failed to read control file")?; - let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); + let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]); - let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = - buf[buf.len() - CHECKSUM_SIZE..].try_into()?; - let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes); + let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] = + buf[buf.len() - CHECKSUM_SIZE..].try_into()?; + let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes); - ensure!( - calculated_checksum == expected_checksum, + ensure!( + calculated_checksum == expected_checksum, + format!( + "safekeeper control file checksum mismatch: expected {} got {}", + expected_checksum, calculated_checksum + ) + ); + + let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]) + .with_context(|| { format!( - "safekeeper control file checksum mismatch: expected {} got {}", - expected_checksum, calculated_checksum + "while reading control file {}", + control_file_path.as_ref().display(), ) - ); - - FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE]).with_context( - || { - format!( - "while reading control file {}", - control_file_path.as_ref().display(), - ) - }, - )? - }; + })?; Ok(state) } } @@ -247,31 +225,38 @@ mod test { fn load_from_control_file( conf: &SafeKeeperConf, zttid: &ZTenantTimelineId, - create: CreateControlFile, ) -> Result<(FileStorage, SafeKeeperState)> { fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); Ok(( FileStorage::new(zttid, conf), - FileStorage::load_control_file_conf(conf, zttid, create)?, + FileStorage::load_control_file_conf(conf, zttid)?, )) } + fn create( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + ) -> Result<(FileStorage, SafeKeeperState)> { + fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); + let state = SafeKeeperState::empty(); + let mut storage = FileStorage::new(zttid, conf); + storage.persist(&state)?; + Ok((storage, state)) + } + #[test] fn test_read_write_safekeeper_state() { let conf = stub_conf(); let zttid = ZTenantTimelineId::generate(); { - let (mut storage, mut state) = - load_from_control_file(&conf, &zttid, CreateControlFile::True) - .expect("failed to read state"); + let (mut storage, mut state) = create(&conf, &zttid).expect("failed to create state"); // change something - state.wal_start_lsn = Lsn(42); + state.commit_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); } - let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False) - .expect("failed to read state"); - assert_eq!(state.wal_start_lsn, Lsn(42)); + let (_, state) = load_from_control_file(&conf, &zttid).expect("failed to read state"); + assert_eq!(state.commit_lsn, Lsn(42)); } #[test] @@ -279,11 +264,10 @@ mod test { let conf = stub_conf(); let zttid = ZTenantTimelineId::generate(); { - let (mut storage, mut state) = - load_from_control_file(&conf, &zttid, CreateControlFile::True) - .expect("failed to read state"); + let (mut storage, mut state) = create(&conf, &zttid).expect("failed to read 
state"); + // change something - state.wal_start_lsn = Lsn(42); + state.commit_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); } let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); @@ -291,7 +275,7 @@ mod test { data[0] += 1; // change the first byte of the file to fail checksum validation fs::write(&control_path, &data).expect("failed to write control file"); - match load_from_control_file(&conf, &zttid, CreateControlFile::False) { + match load_from_control_file(&conf, &zttid) { Err(err) => assert!(err .to_string() .contains("safekeeper control file checksum mismatch")), diff --git a/walkeeper/src/control_file_upgrade.rs b/walkeeper/src/control_file_upgrade.rs index 913bd02c1e..9effe42f8d 100644 --- a/walkeeper/src/control_file_upgrade.rs +++ b/walkeeper/src/control_file_upgrade.rs @@ -1,6 +1,6 @@ //! Code to deal with safekeeper control file upgrades use crate::safekeeper::{ - AcceptorState, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, + AcceptorState, Peers, PgUuid, SafeKeeperState, ServerInfo, Term, TermHistory, TermSwitchEntry, }; use anyhow::{bail, Result}; use serde::{Deserialize, Serialize}; @@ -26,7 +26,7 @@ struct SafeKeeperStateV1 { /// persistent acceptor state acceptor_state: AcceptorStateV1, /// information about server - server: ServerInfo, + server: ServerInfoV2, /// Unique id of the last *elected* proposer we dealed with. Not needed /// for correctness, exists for monitoring purposes. proposer_uuid: PgUuid, @@ -70,6 +70,39 @@ pub struct SafeKeeperStateV2 { pub wal_start_lsn: Lsn, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct ServerInfoV3 { + /// Postgres server version + pub pg_version: u32, + pub system_id: SystemId, + #[serde(with = "hex")] + pub tenant_id: ZTenantId, + /// Zenith timelineid + #[serde(with = "hex")] + pub timeline_id: ZTimelineId, + pub wal_seg_size: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SafeKeeperStateV3 { + /// persistent acceptor state + pub acceptor_state: AcceptorState, + /// information about server + pub server: ServerInfoV3, + /// Unique id of the last *elected* proposer we dealed with. Not needed + /// for correctness, exists for monitoring purposes. + #[serde(with = "hex")] + pub proposer_uuid: PgUuid, + /// part of WAL acknowledged by quorum and available locally + pub commit_lsn: Lsn, + /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn + /// of last record streamed to everyone) + pub truncate_lsn: Lsn, + // Safekeeper starts receiving WAL from this LSN, zeros before it ought to + // be skipped during decoding. 
+ pub wal_start_lsn: Lsn, +} + pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result { // migrate to storing full term history if version == 1 { @@ -83,12 +116,20 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result }]), }; return Ok(SafeKeeperState { + tenant_id: oldstate.server.tenant_id, + timeline_id: oldstate.server.ztli, acceptor_state: ac, - server: oldstate.server.clone(), + server: ServerInfo { + pg_version: oldstate.server.pg_version, + system_id: oldstate.server.system_id, + wal_seg_size: oldstate.server.wal_seg_size, + }, proposer_uuid: oldstate.proposer_uuid, commit_lsn: oldstate.commit_lsn, - truncate_lsn: oldstate.truncate_lsn, - wal_start_lsn: oldstate.wal_start_lsn, + s3_wal_lsn: Lsn(0), + peer_horizon_lsn: oldstate.truncate_lsn, + remote_consistent_lsn: Lsn(0), + peers: Peers(vec![]), }); // migrate to hexing some zids } else if version == 2 { @@ -97,17 +138,40 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result let server = ServerInfo { pg_version: oldstate.server.pg_version, system_id: oldstate.server.system_id, - tenant_id: oldstate.server.tenant_id, - timeline_id: oldstate.server.ztli, wal_seg_size: oldstate.server.wal_seg_size, }; return Ok(SafeKeeperState { + tenant_id: oldstate.server.tenant_id, + timeline_id: oldstate.server.ztli, acceptor_state: oldstate.acceptor_state, server, proposer_uuid: oldstate.proposer_uuid, commit_lsn: oldstate.commit_lsn, - truncate_lsn: oldstate.truncate_lsn, - wal_start_lsn: oldstate.wal_start_lsn, + s3_wal_lsn: Lsn(0), + peer_horizon_lsn: oldstate.truncate_lsn, + remote_consistent_lsn: Lsn(0), + peers: Peers(vec![]), + }); + // migrate to moving ztenantid/ztli to the top and adding some lsns + } else if version == 3 { + info!("reading safekeeper control file version {}", version); + let oldstate = SafeKeeperStateV3::des(&buf[..buf.len()])?; + let server = ServerInfo { + pg_version: oldstate.server.pg_version, + system_id: oldstate.server.system_id, + wal_seg_size: oldstate.server.wal_seg_size, + }; + return Ok(SafeKeeperState { + tenant_id: oldstate.server.tenant_id, + timeline_id: oldstate.server.timeline_id, + acceptor_state: oldstate.acceptor_state, + server, + proposer_uuid: oldstate.proposer_uuid, + commit_lsn: oldstate.commit_lsn, + s3_wal_lsn: Lsn(0), + peer_horizon_lsn: oldstate.truncate_lsn, + remote_consistent_lsn: Lsn(0), + peers: Peers(vec![]), }); } bail!("unsupported safekeeper control file version {}", version) diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index d1ead5cb37..ead6fab9fb 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -13,6 +13,7 @@ use postgres_ffi::xlog_utils::PG_TLI; use regex::Regex; use std::str::FromStr; use std::sync::Arc; +use tracing::info; use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend; use zenith_utils::postgres_backend::PostgresBackend; @@ -20,7 +21,6 @@ use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use crate::callmemaybe::CallmeEvent; -use crate::control_file::CreateControlFile; use tokio::sync::mpsc::UnboundedSender; /// Safekeeper handler of postgres commands @@ -101,29 +101,19 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> { let cmd = parse_cmd(query_string)?; - // Is this command is ztimeline scoped? - match cmd { - SafekeeperPostgresCommand::StartWalPush { .. 
} - | SafekeeperPostgresCommand::StartReplication { .. } - | SafekeeperPostgresCommand::IdentifySystem - | SafekeeperPostgresCommand::JSONCtrl { .. } => { - let tenantid = self.ztenantid.context("tenantid is required")?; - let timelineid = self.ztimelineid.context("timelineid is required")?; - if self.timeline.is_none() { - // START_WAL_PUSH is the only command that initializes the timeline in production. - // There is also JSON_CTRL command, which should initialize the timeline for testing. - let create_control_file = match cmd { - SafekeeperPostgresCommand::StartWalPush { .. } - | SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True, - _ => CreateControlFile::False, - }; - self.timeline.set( - &self.conf, - ZTenantTimelineId::new(tenantid, timelineid), - create_control_file, - )?; - } - } + info!("got query {:?}", query_string); + + let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. }) + || matches!(cmd, SafekeeperPostgresCommand::IdentifySystem)); + + let tenantid = self.ztenantid.context("tenantid is required")?; + let timelineid = self.ztimelineid.context("timelineid is required")?; + if self.timeline.is_none() { + self.timeline.set( + &self.conf, + ZTenantTimelineId::new(tenantid, timelineid), + create, + )?; } match cmd { diff --git a/walkeeper/src/http/mod.rs b/walkeeper/src/http/mod.rs index c82d1c0362..4c0be17ecd 100644 --- a/walkeeper/src/http/mod.rs +++ b/walkeeper/src/http/mod.rs @@ -1,2 +1,3 @@ +pub mod models; pub mod routes; pub use routes::make_router; diff --git a/walkeeper/src/http/models.rs b/walkeeper/src/http/models.rs new file mode 100644 index 0000000000..8a6ed7a812 --- /dev/null +++ b/walkeeper/src/http/models.rs @@ -0,0 +1,9 @@ +use serde::{Deserialize, Serialize}; +use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId}; + +#[derive(Serialize, Deserialize)] +pub struct TimelineCreateRequest { + pub tenant_id: ZTenantId, + pub timeline_id: ZTimelineId, + pub peer_ids: Vec, +} diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index bc992c6a6f..74f7f4a735 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -1,14 +1,15 @@ use hyper::{Body, Request, Response, StatusCode}; + use serde::Serialize; use serde::Serializer; use std::fmt::Display; use std::sync::Arc; +use zenith_utils::http::json::json_request; use zenith_utils::http::{RequestExt, RouterBuilder}; use zenith_utils::lsn::Lsn; use zenith_utils::zid::ZNodeId; use zenith_utils::zid::ZTenantTimelineId; -use crate::control_file::CreateControlFile; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; use crate::timeline::GlobalTimelines; @@ -19,6 +20,8 @@ use zenith_utils::http::json::json_response; use zenith_utils::http::request::parse_request_param; use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use super::models::TimelineCreateRequest; + #[derive(Debug, Serialize)] struct SafekeeperStatus { id: ZNodeId, @@ -66,7 +69,11 @@ struct TimelineStatus { #[serde(serialize_with = "display_serialize")] commit_lsn: Lsn, #[serde(serialize_with = "display_serialize")] - truncate_lsn: Lsn, + s3_wal_lsn: Lsn, + #[serde(serialize_with = "display_serialize")] + peer_horizon_lsn: Lsn, + #[serde(serialize_with = "display_serialize")] + remote_consistent_lsn: Lsn, #[serde(serialize_with = "display_serialize")] flush_lsn: Lsn, } @@ -78,8 +85,7 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result) -> Result, ApiError> { + let request_data: TimelineCreateRequest = json_request(&mut request).await?; + + 
let zttid = ZTenantTimelineId { + tenant_id: request_data.tenant_id, + timeline_id: request_data.timeline_id, + }; + GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids) + .map_err(ApiError::from_err)?; + + Ok(json_response(StatusCode::CREATED, ())?) +} + /// Safekeeper http router. pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder { let router = endpoint::make_router(); @@ -110,4 +131,5 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder "/v1/timeline/:tenant_id/:timeline_id", timeline_status_handler, ) + .post("/v1/timeline", timeline_create_handler) } diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index fa624bb18f..f8b12530d8 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -10,6 +10,8 @@ use std::cmp::min; use std::fmt; use std::io::Read; use tracing::*; +use zenith_utils::zid::ZNodeId; +use zenith_utils::zid::ZTenantTimelineId; use lazy_static::lazy_static; @@ -25,12 +27,13 @@ use zenith_utils::pq_proto::ZenithFeedback; use zenith_utils::zid::{ZTenantId, ZTimelineId}; pub const SK_MAGIC: u32 = 0xcafeceefu32; -pub const SK_FORMAT_VERSION: u32 = 3; +pub const SK_FORMAT_VERSION: u32 = 4; const SK_PROTOCOL_VERSION: u32 = 1; const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. pub type Term = u64; +const INVALID_TERM: Term = 0; #[derive(Debug, Clone, Copy, Serialize, Deserialize)] pub struct TermSwitchEntry { @@ -128,18 +131,47 @@ pub struct ServerInfo { /// Postgres server version pub pg_version: u32, pub system_id: SystemId, - #[serde(with = "hex")] - pub tenant_id: ZTenantId, - /// Zenith timelineid - #[serde(with = "hex")] - pub timeline_id: ZTimelineId, pub wal_seg_size: u32, } +/// Data published by safekeeper to the peers +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PeerInfo { + /// LSN up to which safekeeper offloaded WAL to s3. + s3_wal_lsn: Lsn, + /// Term of the last entry. + term: Term, + /// LSN of the last record. + flush_lsn: Lsn, + /// Up to which LSN safekeeper regards its WAL as committed. + commit_lsn: Lsn, +} + +impl PeerInfo { + fn new() -> Self { + Self { + s3_wal_lsn: Lsn(0), + term: INVALID_TERM, + flush_lsn: Lsn(0), + commit_lsn: Lsn(0), + } + } +} + +// vector-based node id -> peer state map with very limited functionality we +// need/ +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Peers(pub Vec<(ZNodeId, PeerInfo)>); + /// Persistent information stored on safekeeper node /// On disk data is prefixed by magic and format version and followed by checksum. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SafeKeeperState { + #[serde(with = "hex")] + pub tenant_id: ZTenantId, + /// Zenith timelineid + #[serde(with = "hex")] + pub timeline_id: ZTimelineId, /// persistent acceptor state pub acceptor_state: AcceptorState, /// information about server @@ -148,19 +180,33 @@ pub struct SafeKeeperState { /// for correctness, exists for monitoring purposes. #[serde(with = "hex")] pub proposer_uuid: PgUuid, - /// part of WAL acknowledged by quorum and available locally + /// Part of WAL acknowledged by quorum and available locally. Always points + /// to record boundary. pub commit_lsn: Lsn, - /// minimal LSN which may be needed for recovery of some safekeeper (end_lsn - /// of last record streamed to everyone) - pub truncate_lsn: Lsn, - // Safekeeper starts receiving WAL from this LSN, zeros before it ought to - // be skipped during decoding. - pub wal_start_lsn: Lsn, + /// First LSN not yet offloaded to s3. 
Useful to persist to avoid finding + /// out offloading progress on boot. + pub s3_wal_lsn: Lsn, + /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn + /// of last record streamed to everyone). Persisting it helps skipping + /// recovery in walproposer, generally we compute it from peers. In + /// walproposer proto called 'truncate_lsn'. + pub peer_horizon_lsn: Lsn, + /// LSN of the oldest known checkpoint made by pageserver and successfully + /// pushed to s3. We don't remove WAL beyond it. Persisted only for + /// informational purposes, we receive it from pageserver. + pub remote_consistent_lsn: Lsn, + // Peers and their state as we remember it. Knowing peers themselves is + // fundamental; but state is saved here only for informational purposes and + // obviously can be stale. (Currently not saved at all, but let's provision + // place to have less file version upgrades). + pub peers: Peers, } impl SafeKeeperState { - pub fn new() -> SafeKeeperState { + pub fn new(zttid: &ZTenantTimelineId, peers: Vec) -> SafeKeeperState { SafeKeeperState { + tenant_id: zttid.tenant_id, + timeline_id: zttid.timeline_id, acceptor_state: AcceptorState { term: 0, term_history: TermHistory::empty(), @@ -168,21 +214,20 @@ impl SafeKeeperState { server: ServerInfo { pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ system_id: 0, /* Postgres system identifier */ - tenant_id: ZTenantId::from([0u8; 16]), - timeline_id: ZTimelineId::from([0u8; 16]), wal_seg_size: 0, }, proposer_uuid: [0; 16], - commit_lsn: Lsn(0), /* part of WAL acknowledged by quorum */ - truncate_lsn: Lsn(0), /* minimal LSN which may be needed for recovery of some safekeeper */ - wal_start_lsn: Lsn(0), + commit_lsn: Lsn(0), + s3_wal_lsn: Lsn(0), + peer_horizon_lsn: Lsn(0), + remote_consistent_lsn: Lsn(0), + peers: Peers(peers.iter().map(|p| (*p, PeerInfo::new())).collect()), } } -} -impl Default for SafeKeeperState { - fn default() -> Self { - Self::new() + #[cfg(test)] + pub fn empty() -> Self { + SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![]) } } @@ -421,6 +466,7 @@ lazy_static! 
{ struct SafeKeeperMetrics { commit_lsn: Gauge, + // WAL-related metrics are in WalStorageMetrics } impl SafeKeeperMetrics { @@ -443,7 +489,7 @@ pub struct SafeKeeper { /// not-yet-flushed pairs of same named fields in s.* pub commit_lsn: Lsn, - pub truncate_lsn: Lsn, + pub peer_horizon_lsn: Lsn, pub s: SafeKeeperState, // persistent part pub control_store: CTRL, @@ -462,16 +508,14 @@ where wal_store: WAL, state: SafeKeeperState, ) -> SafeKeeper { - if state.server.timeline_id != ZTimelineId::from([0u8; 16]) - && ztli != state.server.timeline_id - { - panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.server.timeline_id); + if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id { + panic!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id); } SafeKeeper { - metrics: SafeKeeperMetrics::new(state.server.tenant_id, ztli, state.commit_lsn), + metrics: SafeKeeperMetrics::new(state.tenant_id, ztli, state.commit_lsn), commit_lsn: state.commit_lsn, - truncate_lsn: state.truncate_lsn, + peer_horizon_lsn: state.peer_horizon_lsn, s: state, control_store, wal_store, @@ -532,12 +576,24 @@ where msg.pg_version, self.s.server.pg_version ); } + if msg.tenant_id != self.s.tenant_id { + bail!( + "invalid tenant ID, got {}, expected {}", + msg.tenant_id, + self.s.tenant_id + ); + } + if msg.ztli != self.s.timeline_id { + bail!( + "invalid timeline ID, got {}, expected {}", + msg.ztli, + self.s.timeline_id + ); + } // set basic info about server, if not yet // TODO: verify that is doesn't change after self.s.server.system_id = msg.system_id; - self.s.server.tenant_id = msg.tenant_id; - self.s.server.timeline_id = msg.ztli; self.s.server.wal_seg_size = msg.wal_seg_size; self.control_store .persist(&self.s) @@ -568,7 +624,7 @@ where term: self.s.acceptor_state.term, vote_given: false as u64, flush_lsn: self.wal_store.flush_lsn(), - truncate_lsn: self.s.truncate_lsn, + truncate_lsn: self.s.peer_horizon_lsn, term_history: self.get_term_history(), }; if self.s.acceptor_state.term < msg.term { @@ -655,10 +711,11 @@ where if !msg.wal_data.is_empty() { self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; - // If this was the first record we ever receieved, remember LSN to help - // find_end_of_wal skip the hole in the beginning. - if self.s.wal_start_lsn == Lsn(0) { - self.s.wal_start_lsn = msg.h.begin_lsn; + // If this was the first record we ever receieved, initialize + // commit_lsn to help find_end_of_wal skip the hole in the + // beginning. + if self.s.commit_lsn == Lsn(0) { + self.s.commit_lsn = msg.h.begin_lsn; sync_control_file = true; require_flush = true; } @@ -685,15 +742,15 @@ where .set(u64::from(self.commit_lsn) as f64); } - self.truncate_lsn = msg.h.truncate_lsn; + self.peer_horizon_lsn = msg.h.truncate_lsn; // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only // when truncate_lsn delta exceeds WAL segment size. 
sync_control_file |= - self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn; + self.s.peer_horizon_lsn + (self.s.server.wal_seg_size as u64) < self.peer_horizon_lsn; if sync_control_file { self.s.commit_lsn = self.commit_lsn; - self.s.truncate_lsn = self.truncate_lsn; + self.s.peer_horizon_lsn = self.peer_horizon_lsn; } if sync_control_file { @@ -774,11 +831,11 @@ mod tests { #[test] fn test_voting() { let storage = InMemoryState { - persisted_state: SafeKeeperState::new(), + persisted_state: SafeKeeperState::empty(), }; let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::new()); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::empty()); // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); @@ -806,11 +863,11 @@ mod tests { #[test] fn test_epoch_switch() { let storage = InMemoryState { - persisted_state: SafeKeeperState::new(), + persisted_state: SafeKeeperState::empty(), }; let wal_store = DummyWalStore { lsn: Lsn(0) }; let ztli = ZTimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::new()); + let mut sk = SafeKeeper::new(ztli, storage, wal_store, SafeKeeperState::empty()); let mut ar_hdr = AppendRequestHeader { term: 1, diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index c639e81b79..ea8308b95e 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -1,7 +1,7 @@ //! This module contains timeline id -> safekeeper state map with file-backed //! persistence and support for interaction between sending and receiving wal. -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use lazy_static::lazy_static; @@ -9,22 +9,24 @@ use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self}; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tracing::*; use zenith_utils::lsn::Lsn; -use zenith_utils::zid::ZTenantTimelineId; +use zenith_utils::zid::{ZNodeId, ZTenantTimelineId}; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; -use crate::control_file::{self, CreateControlFile}; +use crate::control_file; +use crate::control_file::Storage as cf_storage; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, }; use crate::send_wal::HotStandbyFeedback; -use crate::wal_storage::{self, Storage}; +use crate::wal_storage; +use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; use zenith_utils::pq_proto::ZenithFeedback; @@ -87,21 +89,39 @@ struct SharedState { } impl SharedState { - /// Restore SharedState from control file. - /// If create=false and file doesn't exist, bails out. 
- fn create_restore( + /// Initialize timeline state, creating control file + fn create( conf: &SafeKeeperConf, zttid: &ZTenantTimelineId, - create: CreateControlFile, + peer_ids: Vec, ) -> Result { - let state = control_file::FileStorage::load_control_file_conf(conf, zttid, create) + let state = SafeKeeperState::new(zttid, peer_ids); + let control_store = control_file::FileStorage::new(zttid, conf); + let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); + let mut sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, state); + sk.control_store.persist(&sk.s)?; + + Ok(Self { + notified_commit_lsn: Lsn(0), + sk, + replicas: Vec::new(), + active: false, + num_computes: 0, + pageserver_connstr: None, + }) + } + + /// Restore SharedState from control file. + /// If file doesn't exist, bails out. + fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result { + let state = control_file::FileStorage::load_control_file_conf(conf, zttid) .context("failed to load from control file")?; let control_store = control_file::FileStorage::new(zttid, conf); let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); - info!("timeline {} created or restored", zttid.timeline_id); + info!("timeline {} restored", zttid.timeline_id); Ok(Self { notified_commit_lsn: Lsn(0), @@ -418,26 +438,13 @@ impl Timeline { // Utilities needed by various Connection-like objects pub trait TimelineTools { - fn set( - &mut self, - conf: &SafeKeeperConf, - zttid: ZTenantTimelineId, - create: CreateControlFile, - ) -> Result<()>; + fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>; fn get(&self) -> &Arc; } impl TimelineTools for Option> { - fn set( - &mut self, - conf: &SafeKeeperConf, - zttid: ZTenantTimelineId, - create: CreateControlFile, - ) -> Result<()> { - // We will only set the timeline once. If it were to ever change, - // anyone who cloned the Arc would be out of date. - assert!(self.is_none()); + fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()> { *self = Some(GlobalTimelines::get(conf, zttid, create)?); Ok(()) } @@ -456,30 +463,73 @@ lazy_static! { pub struct GlobalTimelines; impl GlobalTimelines { + fn create_internal( + mut timelines: MutexGuard>>, + conf: &SafeKeeperConf, + zttid: ZTenantTimelineId, + peer_ids: Vec, + ) -> Result> { + match timelines.get(&zttid) { + Some(_) => bail!("timeline {} already exists", zttid), + None => { + // TODO: check directory existence + let dir = conf.timeline_dir(&zttid); + fs::create_dir_all(dir)?; + let shared_state = SharedState::create(conf, &zttid, peer_ids) + .context("failed to create shared state")?; + + let new_tli = Arc::new(Timeline::new(zttid, shared_state)); + timelines.insert(zttid, Arc::clone(&new_tli)); + Ok(new_tli) + } + } + } + + pub fn create( + conf: &SafeKeeperConf, + zttid: ZTenantTimelineId, + peer_ids: Vec, + ) -> Result> { + let timelines = TIMELINES.lock().unwrap(); + GlobalTimelines::create_internal(timelines, conf, zttid, peer_ids) + } + /// Get a timeline with control file loaded from the global TIMELINES map. - /// If control file doesn't exist and create=false, bails out. + /// If control file doesn't exist, bails out. 
pub fn get( conf: &SafeKeeperConf, zttid: ZTenantTimelineId, - create: CreateControlFile, + create: bool, ) -> Result> { let mut timelines = TIMELINES.lock().unwrap(); match timelines.get(&zttid) { Some(result) => Ok(Arc::clone(result)), None => { - if let CreateControlFile::True = create { - let dir = conf.timeline_dir(&zttid); - info!( - "creating timeline dir {}, create is {:?}", - dir.display(), - create - ); - fs::create_dir_all(dir)?; - } + let shared_state = + SharedState::restore(conf, &zttid).context("failed to restore shared state"); - let shared_state = SharedState::create_restore(conf, &zttid, create) - .context("failed to restore shared state")?; + let shared_state = match shared_state { + Ok(shared_state) => shared_state, + Err(error) => { + // TODO: always create timeline explicitly + if error + .root_cause() + .to_string() + .contains("No such file or directory") + && create + { + return GlobalTimelines::create_internal( + timelines, + conf, + zttid, + vec![], + ); + } else { + return Err(error); + } + } + }; let new_tli = Arc::new(Timeline::new(zttid, shared_state)); timelines.insert(zttid, Arc::clone(&new_tli)); diff --git a/walkeeper/src/wal_storage.rs b/walkeeper/src/wal_storage.rs index 73eccd0ae8..7cef525bee 100644 --- a/walkeeper/src/wal_storage.rs +++ b/walkeeper/src/wal_storage.rs @@ -301,7 +301,8 @@ impl Storage for PhysicalStorage { /// allows to postpone its initialization. fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> { if state.server.wal_seg_size == 0 { - // wal_seg_size is still unknown + // wal_seg_size is still unknown. This is dead path normally, should + // be used only in tests. return Ok(()); } @@ -315,9 +316,13 @@ impl Storage for PhysicalStorage { let wal_seg_size = state.server.wal_seg_size as usize; self.wal_seg_size = Some(wal_seg_size); - // we need to read WAL from disk to know which LSNs are stored on disk - self.write_lsn = - Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.wal_start_lsn)?.0); + // Find out where stored WAL ends, starting at commit_lsn which is a + // known recent record boundary (unless we don't have WAL at all). 
+ self.write_lsn = if state.commit_lsn == Lsn(0) { + Lsn(0) + } else { + Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.commit_lsn)?.0) + }; self.write_record_lsn = self.write_lsn; @@ -326,11 +331,13 @@ impl Storage for PhysicalStorage { self.update_flush_lsn(); info!( - "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, truncate_lsn={}", - self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.truncate_lsn, + "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}", + self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn, ); - if self.flush_record_lsn < state.commit_lsn || self.flush_record_lsn < state.truncate_lsn { - warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or truncate_lsn from control file", self.zttid.timeline_id); + if self.flush_record_lsn < state.commit_lsn + || self.flush_record_lsn < state.peer_horizon_lsn + { + warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id); } Ok(()) diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs index 813eb3f8f4..a740d4fb48 100644 --- a/zenith_utils/src/zid.rs +++ b/zenith_utils/src/zid.rs @@ -334,6 +334,10 @@ impl ZTenantTimelineId { pub fn generate() -> Self { Self::new(ZTenantId::generate(), ZTimelineId::generate()) } + + pub fn empty() -> Self { + Self::new(ZTenantId::from([0u8; 16]), ZTimelineId::from([0u8; 16])) + } } impl fmt::Display for ZTenantTimelineId {
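A minimal sketch of how the new creation path is meant to be driven once the control plane wires it up. Only SafekeeperNode::timeline_create() and the POST /v1/timeline route come from this patch; the `sk_node` handle, the concrete peer node ids, and constructing ZNodeId from a plain node number are illustrative assumptions.

    // Illustrative only: `sk_node` is an assumed control_plane SafekeeperNode handle.
    use zenith_utils::zid::{ZNodeId, ZTenantId, ZTimelineId};

    let tenant_id = ZTenantId::generate();
    let timeline_id = ZTimelineId::generate();
    // Register the timeline on this safekeeper and tell it who its peers are;
    // under the hood this issues POST /v1/timeline with a TimelineCreateRequest body.
    sk_node.timeline_create(tenant_id, timeline_id, vec![ZNodeId(1), ZNodeId(2), ZNodeId(3)])?;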