From 86045ac36c1108b15b59a0eb103f2d6cd29a156c Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 12 Jan 2022 23:07:35 +0300 Subject: [PATCH] Prefix per-cluster directory with ztenant_id in safekeeper. Currently ztimelineids are unique, but all APIs accept the pair, so let's keep it everywhere for uniformity. Carry around ZTTId containing both ZTenantId and ZTimelineId for simplicity. (existing clusters on staging ought to be preprocessed for that) --- walkeeper/src/handler.rs | 9 ++- walkeeper/src/http/routes.rs | 20 +++--- walkeeper/src/lib.rs | 8 ++- walkeeper/src/receive_wal.rs | 2 +- walkeeper/src/send_wal.rs | 12 ++-- walkeeper/src/timeline.rs | 135 +++++++++++++++-------------------- zenith_utils/src/zid.rs | 26 +++++++ 7 files changed, 111 insertions(+), 101 deletions(-) diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs index b4fcbdfa28..21890764db 100644 --- a/walkeeper/src/handler.rs +++ b/walkeeper/src/handler.rs @@ -16,7 +16,7 @@ use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID}; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use crate::callmemaybe::CallmeEvent; use crate::timeline::CreateControlFile; @@ -116,8 +116,11 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { | SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True, _ => CreateControlFile::False, }; - self.timeline - .set(&self.conf, tenantid, timelineid, create_control_file)?; + self.timeline.set( + &self.conf, + ZTenantTimelineId::new(tenantid, timelineid), + create_control_file, + )?; } } } diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs index dc1905a5a7..c5c4c1fc4b 100644 --- a/walkeeper/src/http/routes.rs +++ b/walkeeper/src/http/routes.rs @@ -6,6 +6,7 @@ use serde::Serializer; use std::fmt::Display; use std::sync::Arc; use zenith_utils::lsn::Lsn; +use zenith_utils::zid::ZTenantTimelineId; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; @@ -65,16 +66,13 @@ struct TimelineStatus { /// Report info about timeline. async fn timeline_status_handler(request: Request) -> Result, ApiError> { - let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; - let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?; + let zttid = ZTenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); - let tli = GlobalTimelines::get( - get_conf(&request), - tenant_id, - timeline_id, - CreateControlFile::False, - ) - .map_err(ApiError::from_err)?; + let tli = GlobalTimelines::get(get_conf(&request), zttid, CreateControlFile::False) + .map_err(ApiError::from_err)?; let sk_state = tli.get_info(); let flush_lsn = tli.get_end_of_wal(); @@ -85,8 +83,8 @@ async fn timeline_status_handler(request: Request) -> Result PathBuf { - self.workdir.join(timelineid.to_string()) + pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf { + self.workdir + .join(zttid.tenant_id.to_string()) + .join(zttid.timeline_id.to_string()) } } diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs index caf8ed5311..863aaafef1 100644 --- a/walkeeper/src/receive_wal.rs +++ b/walkeeper/src/receive_wal.rs @@ -105,7 +105,7 @@ impl<'pg> ReceiveWalConn<'pg> { // Need to establish replication channel with page server. // Add far as replication in postgres is initiated by receiver // we should use callmemaybe mechanism. - let timeline_id = spg.timeline.get().timeline_id; + let timeline_id = spg.timeline.get().zttid.timeline_id; let subscription_key = SubscriptionStateKey::new( tenant_id, timeline_id, diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index d9d6bdf680..d96cdf5171 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -284,8 +284,8 @@ impl ReplicationConn { None } else { let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); - let timeline_id = spg.timeline.get().timeline_id; let tenant_id = spg.ztenantid.unwrap(); + let timeline_id = spg.timeline.get().zttid.timeline_id; let tx_clone = spg.tx.clone(); let subscription_key = SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone()); @@ -330,10 +330,10 @@ impl ReplicationConn { // and this code is not reachable let pageserver_connstr = pageserver_connstr .expect("there should be a pageserver connection string"); - let timelineid = spg.timeline.get().timeline_id; let tenant_id = spg.ztenantid.unwrap(); + let timeline_id = spg.timeline.get().zttid.timeline_id; let subscription_key = - SubscriptionStateKey::new(tenant_id, timelineid, pageserver_connstr); + SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr); spg.tx .send(CallmeEvent::Unsubscribe(subscription_key)) .unwrap_or_else(|e| { @@ -361,8 +361,10 @@ impl ReplicationConn { // Open a new file. let segno = start_pos.segment_number(wal_seg_size); let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let timeline_id = spg.timeline.get().timeline_id; - let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name); + let wal_file_path = spg + .conf + .timeline_dir(&spg.timeline.get().zttid) + .join(wal_file_name); Self::open_wal_file(&wal_file_path)? } }; diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index 12f6fd853f..14ef7166f3 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -4,7 +4,7 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use lazy_static::lazy_static; -use postgres_ffi::xlog_utils::{find_end_of_wal, PG_TLI}; +use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI}; use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self, File, OpenOptions}; @@ -16,7 +16,7 @@ use tracing::*; use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; use zenith_utils::bin_ser::LeSer; use zenith_utils::lsn::Lsn; -use zenith_utils::zid::{ZTenantId, ZTimelineId}; +use zenith_utils::zid::ZTenantTimelineId; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo, @@ -164,14 +164,14 @@ impl SharedState { /// If create=false and file doesn't exist, bails out. fn create_restore( conf: &SafeKeeperConf, - timeline_id: ZTimelineId, + zttid: &ZTenantTimelineId, create: CreateControlFile, ) -> Result { - let state = FileStorage::load_control_file_conf(conf, timeline_id, create) + let state = FileStorage::load_control_file_conf(conf, zttid, create) .context("failed to load from control file")?; - let file_storage = FileStorage::new(timeline_id, conf); + let file_storage = FileStorage::new(zttid, conf); let flush_lsn = if state.server.wal_seg_size != 0 { - let wal_dir = conf.timeline_dir(&timeline_id); + let wal_dir = conf.timeline_dir(zttid); find_end_of_wal( &wal_dir, state.server.wal_seg_size as usize, @@ -194,16 +194,16 @@ impl SharedState { /// Database instance (tenant) pub struct Timeline { - pub timeline_id: ZTimelineId, + pub zttid: ZTenantTimelineId, mutex: Mutex, /// conditional variable used to notify wal senders cond: Condvar, } impl Timeline { - fn new(timelineid: ZTimelineId, shared_state: SharedState) -> Timeline { + fn new(zttid: ZTenantTimelineId, shared_state: SharedState) -> Timeline { Timeline { - timeline_id: timelineid, + zttid, mutex: Mutex::new(shared_state), cond: Condvar::new(), } @@ -349,8 +349,7 @@ pub trait TimelineTools { fn set( &mut self, conf: &SafeKeeperConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, + zttid: ZTenantTimelineId, create: CreateControlFile, ) -> Result<()>; @@ -361,14 +360,13 @@ impl TimelineTools for Option> { fn set( &mut self, conf: &SafeKeeperConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, + zttid: ZTenantTimelineId, create: CreateControlFile, ) -> Result<()> { // We will only set the timeline once. If it were to ever change, // anyone who cloned the Arc would be out of date. assert!(self.is_none()); - *self = Some(GlobalTimelines::get(conf, tenant_id, timeline_id, create)?); + *self = Some(GlobalTimelines::get(conf, zttid, create)?); Ok(()) } @@ -378,7 +376,7 @@ impl TimelineTools for Option> { } lazy_static! { - pub static ref TIMELINES: Mutex>> = + pub static ref TIMELINES: Mutex>> = Mutex::new(HashMap::new()); } @@ -390,26 +388,29 @@ impl GlobalTimelines { /// If control file doesn't exist and create=false, bails out. pub fn get( conf: &SafeKeeperConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, + zttid: ZTenantTimelineId, create: CreateControlFile, ) -> Result> { let mut timelines = TIMELINES.lock().unwrap(); - match timelines.get(&(tenant_id, timeline_id)) { + match timelines.get(&zttid) { Some(result) => Ok(Arc::clone(result)), None => { - info!( - "creating timeline dir {}, create is {:?}", - timeline_id, create - ); - fs::create_dir_all(timeline_id.to_string())?; + if let CreateControlFile::True = create { + let dir = conf.timeline_dir(&zttid); + info!( + "creating timeline dir {}, create is {:?}", + dir.display(), + create + ); + fs::create_dir_all(dir)?; + } - let shared_state = SharedState::create_restore(conf, timeline_id, create) + let shared_state = SharedState::create_restore(conf, &zttid, create) .context("failed to restore shared state")?; - let new_tli = Arc::new(Timeline::new(timeline_id, shared_state)); - timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli)); + let new_tli = Arc::new(Timeline::new(zttid, shared_state)); + timelines.insert(zttid, Arc::clone(&new_tli)); Ok(new_tli) } } @@ -425,9 +426,9 @@ pub struct FileStorage { } impl FileStorage { - fn new(timeline_id: ZTimelineId, conf: &SafeKeeperConf) -> FileStorage { - let timeline_dir = conf.timeline_dir(&timeline_id); - let timelineid_str = format!("{}", timeline_id); + fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage { + let timeline_dir = conf.timeline_dir(zttid); + let timelineid_str = format!("{}", zttid); FileStorage { timeline_dir, conf: conf.clone(), @@ -456,12 +457,13 @@ impl FileStorage { upgrade_control_file(buf, version) } + // Load control file for given zttid at path specified by conf. fn load_control_file_conf( conf: &SafeKeeperConf, - timeline_id: ZTimelineId, + zttid: &ZTenantTimelineId, create: CreateControlFile, ) -> Result { - let path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME); + let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); Self::load_control_file(path, create) } @@ -526,6 +528,14 @@ impl FileStorage { }; Ok(state) } + + /// Helper returning full path to WAL segment file and its .partial brother. + fn wal_file_paths(&self, segno: XLogSegNo, wal_seg_size: usize) -> (PathBuf, PathBuf) { + let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); + let wal_file_path = self.timeline_dir.join(wal_file_name.clone()); + let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial"); + (wal_file_path, wal_file_partial_path) + } } impl Storage for FileStorage { @@ -594,7 +604,6 @@ impl Storage for FileStorage { let mut start_pos = startpos; const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; let wal_seg_size = server.wal_seg_size as usize; - let ztli = server.timeline_id; /* Extract WAL location for this block */ let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize; @@ -614,14 +623,7 @@ impl Storage for FileStorage { /* Open file */ let segno = start_pos.segment_number(wal_seg_size); - // note: we basically don't support changing pg timeline - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = self.conf.timeline_dir(&ztli).join(wal_file_name.clone()); - let wal_file_partial_path = self - .conf - .timeline_dir(&ztli) - .join(wal_file_name.clone() + ".partial"); - + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); { let mut wal_file: File; /* Try to open already completed segment */ @@ -682,25 +684,13 @@ impl Storage for FileStorage { let partial; const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; let wal_seg_size = server.wal_seg_size as usize; - let ztli = server.timeline_id; /* Extract WAL location for this block */ let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize; /* Open file */ let mut segno = end_pos.segment_number(wal_seg_size); - // note: we basically don't support changing pg timeline - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = self - .conf - .workdir - .join(ztli.to_string()) - .join(wal_file_name.clone()); - let wal_file_partial_path = self - .conf - .workdir - .join(ztli.to_string()) - .join(wal_file_name + ".partial"); + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); { let mut wal_file: File; /* Try to open already completed segment */ @@ -731,17 +721,7 @@ impl Storage for FileStorage { // Remove all subsequent segments loop { segno += 1; - let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size); - let wal_file_path = self - .conf - .workdir - .join(ztli.to_string()) - .join(wal_file_name.clone()); - let wal_file_partial_path = self - .conf - .workdir - .join(ztli.to_string()) - .join(wal_file_name.clone() + ".partial"); + let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size); // TODO: better use fs::try_exists which is currenty avaialble only in nightly build if wal_file_path.exists() { fs::remove_file(&wal_file_path)?; @@ -761,11 +741,11 @@ mod test { use crate::{ safekeeper::{SafeKeeperState, Storage}, timeline::{CreateControlFile, CONTROL_FILE_NAME}, - SafeKeeperConf, + SafeKeeperConf, ZTenantTimelineId, }; use anyhow::Result; use std::fs; - use zenith_utils::{lsn::Lsn, zid::ZTimelineId}; + use zenith_utils::lsn::Lsn; fn stub_conf() -> SafeKeeperConf { let workdir = tempfile::tempdir().unwrap().into_path(); @@ -777,31 +757,30 @@ mod test { fn load_from_control_file( conf: &SafeKeeperConf, - timeline_id: ZTimelineId, + zttid: &ZTenantTimelineId, create: CreateControlFile, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(&conf.timeline_dir(&timeline_id)) - .expect("failed to create timeline dir"); + fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); Ok(( - FileStorage::new(timeline_id, conf), - FileStorage::load_control_file_conf(conf, timeline_id, create)?, + FileStorage::new(zttid, conf), + FileStorage::load_control_file_conf(conf, zttid, create)?, )) } #[test] fn test_read_write_safekeeper_state() { let conf = stub_conf(); - let timeline_id = ZTimelineId::generate(); + let zttid = ZTenantTimelineId::generate(); { let (mut storage, mut state) = - load_from_control_file(&conf, timeline_id, CreateControlFile::True) + load_from_control_file(&conf, &zttid, CreateControlFile::True) .expect("failed to read state"); // change something state.wal_start_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); } - let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False) + let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False) .expect("failed to read state"); assert_eq!(state.wal_start_lsn, Lsn(42)); } @@ -809,21 +788,21 @@ mod test { #[test] fn test_safekeeper_state_checksum_mismatch() { let conf = stub_conf(); - let timeline_id = ZTimelineId::generate(); + let zttid = ZTenantTimelineId::generate(); { let (mut storage, mut state) = - load_from_control_file(&conf, timeline_id, CreateControlFile::True) + load_from_control_file(&conf, &zttid, CreateControlFile::True) .expect("failed to read state"); // change something state.wal_start_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); } - let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME); + let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); let mut data = fs::read(&control_path).unwrap(); data[0] += 1; // change the first byte of the file to fail checksum validation fs::write(&control_path, &data).expect("failed to write control file"); - match load_from_control_file(&conf, timeline_id, CreateControlFile::False) { + match load_from_control_file(&conf, &zttid, CreateControlFile::False) { Err(err) => assert!(err .to_string() .contains("safekeeper control file checksum mismatch")), diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs index c5f63275c4..7ff3c0cffd 100644 --- a/zenith_utils/src/zid.rs +++ b/zenith_utils/src/zid.rs @@ -195,6 +195,32 @@ pub mod opt_display_serde { } } +// A pair uniquely identifying Zenith instance. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ZTenantTimelineId { + pub tenant_id: ZTenantId, + pub timeline_id: ZTimelineId, +} + +impl ZTenantTimelineId { + pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self { + ZTenantTimelineId { + tenant_id, + timeline_id, + } + } + + pub fn generate() -> Self { + Self::new(ZTenantId::generate(), ZTimelineId::generate()) + } +} + +impl fmt::Display for ZTenantTimelineId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}-{}", self.tenant_id, self.timeline_id) + } +} + #[cfg(test)] mod tests { use std::fmt::Display;