diff --git a/walkeeper/src/handler.rs b/walkeeper/src/handler.rs
index b4fcbdfa28..21890764db 100644
--- a/walkeeper/src/handler.rs
+++ b/walkeeper/src/handler.rs
@@ -16,7 +16,7 @@ use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
use crate::callmemaybe::CallmeEvent;
use crate::timeline::CreateControlFile;
@@ -116,8 +116,11 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
| SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True,
_ => CreateControlFile::False,
};
- self.timeline
- .set(&self.conf, tenantid, timelineid, create_control_file)?;
+ self.timeline.set(
+ &self.conf,
+ ZTenantTimelineId::new(tenantid, timelineid),
+ create_control_file,
+ )?;
}
}
}
diff --git a/walkeeper/src/http/routes.rs b/walkeeper/src/http/routes.rs
index dc1905a5a7..c5c4c1fc4b 100644
--- a/walkeeper/src/http/routes.rs
+++ b/walkeeper/src/http/routes.rs
@@ -6,6 +6,7 @@ use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
+use zenith_utils::zid::ZTenantTimelineId;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
@@ -65,16 +66,13 @@ struct TimelineStatus {
/// Report info about timeline.
async fn timeline_status_handler(request: Request
) -> Result, ApiError> {
- let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
- let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
+ let zttid = ZTenantTimelineId::new(
+ parse_request_param(&request, "tenant_id")?,
+ parse_request_param(&request, "timeline_id")?,
+ );
- let tli = GlobalTimelines::get(
- get_conf(&request),
- tenant_id,
- timeline_id,
- CreateControlFile::False,
- )
- .map_err(ApiError::from_err)?;
+ let tli = GlobalTimelines::get(get_conf(&request), zttid, CreateControlFile::False)
+ .map_err(ApiError::from_err)?;
let sk_state = tli.get_info();
let flush_lsn = tli.get_end_of_wal();
@@ -85,8 +83,8 @@ async fn timeline_status_handler(request: Request) -> Result PathBuf {
- self.workdir.join(timelineid.to_string())
+ pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf {
+ self.workdir
+ .join(zttid.tenant_id.to_string())
+ .join(zttid.timeline_id.to_string())
}
}
diff --git a/walkeeper/src/receive_wal.rs b/walkeeper/src/receive_wal.rs
index caf8ed5311..863aaafef1 100644
--- a/walkeeper/src/receive_wal.rs
+++ b/walkeeper/src/receive_wal.rs
@@ -105,7 +105,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver
// we should use callmemaybe mechanism.
- let timeline_id = spg.timeline.get().timeline_id;
+ let timeline_id = spg.timeline.get().zttid.timeline_id;
let subscription_key = SubscriptionStateKey::new(
tenant_id,
timeline_id,
diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs
index d9d6bdf680..d96cdf5171 100644
--- a/walkeeper/src/send_wal.rs
+++ b/walkeeper/src/send_wal.rs
@@ -284,8 +284,8 @@ impl ReplicationConn {
None
} else {
let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
- let timeline_id = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
+ let timeline_id = spg.timeline.get().zttid.timeline_id;
let tx_clone = spg.tx.clone();
let subscription_key =
SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone());
@@ -330,10 +330,10 @@ impl ReplicationConn {
// and this code is not reachable
let pageserver_connstr = pageserver_connstr
.expect("there should be a pageserver connection string");
- let timelineid = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
+ let timeline_id = spg.timeline.get().zttid.timeline_id;
let subscription_key =
- SubscriptionStateKey::new(tenant_id, timelineid, pageserver_connstr);
+ SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr);
spg.tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
@@ -361,8 +361,10 @@ impl ReplicationConn {
// Open a new file.
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
- let timeline_id = spg.timeline.get().timeline_id;
- let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name);
+ let wal_file_path = spg
+ .conf
+ .timeline_dir(&spg.timeline.get().zttid)
+ .join(wal_file_name);
Self::open_wal_file(&wal_file_path)?
}
};
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index 12f6fd853f..14ef7166f3 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -4,7 +4,7 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use lazy_static::lazy_static;
-use postgres_ffi::xlog_utils::{find_end_of_wal, PG_TLI};
+use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
@@ -16,7 +16,7 @@ use tracing::*;
use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::ZTenantTimelineId;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
@@ -164,14 +164,14 @@ impl SharedState {
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &SafeKeeperConf,
- timeline_id: ZTimelineId,
+ zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result {
- let state = FileStorage::load_control_file_conf(conf, timeline_id, create)
+ let state = FileStorage::load_control_file_conf(conf, zttid, create)
.context("failed to load from control file")?;
- let file_storage = FileStorage::new(timeline_id, conf);
+ let file_storage = FileStorage::new(zttid, conf);
let flush_lsn = if state.server.wal_seg_size != 0 {
- let wal_dir = conf.timeline_dir(&timeline_id);
+ let wal_dir = conf.timeline_dir(zttid);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
@@ -194,16 +194,16 @@ impl SharedState {
/// Database instance (tenant)
pub struct Timeline {
- pub timeline_id: ZTimelineId,
+ pub zttid: ZTenantTimelineId,
mutex: Mutex,
/// conditional variable used to notify wal senders
cond: Condvar,
}
impl Timeline {
- fn new(timelineid: ZTimelineId, shared_state: SharedState) -> Timeline {
+ fn new(zttid: ZTenantTimelineId, shared_state: SharedState) -> Timeline {
Timeline {
- timeline_id: timelineid,
+ zttid,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
@@ -349,8 +349,7 @@ pub trait TimelineTools {
fn set(
&mut self,
conf: &SafeKeeperConf,
- tenant_id: ZTenantId,
- timeline_id: ZTimelineId,
+ zttid: ZTenantTimelineId,
create: CreateControlFile,
) -> Result<()>;
@@ -361,14 +360,13 @@ impl TimelineTools for Option> {
fn set(
&mut self,
conf: &SafeKeeperConf,
- tenant_id: ZTenantId,
- timeline_id: ZTimelineId,
+ zttid: ZTenantTimelineId,
create: CreateControlFile,
) -> Result<()> {
// We will only set the timeline once. If it were to ever change,
// anyone who cloned the Arc would be out of date.
assert!(self.is_none());
- *self = Some(GlobalTimelines::get(conf, tenant_id, timeline_id, create)?);
+ *self = Some(GlobalTimelines::get(conf, zttid, create)?);
Ok(())
}
@@ -378,7 +376,7 @@ impl TimelineTools for Option> {
}
lazy_static! {
- pub static ref TIMELINES: Mutex>> =
+ pub static ref TIMELINES: Mutex>> =
Mutex::new(HashMap::new());
}
@@ -390,26 +388,29 @@ impl GlobalTimelines {
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &SafeKeeperConf,
- tenant_id: ZTenantId,
- timeline_id: ZTimelineId,
+ zttid: ZTenantTimelineId,
create: CreateControlFile,
) -> Result> {
let mut timelines = TIMELINES.lock().unwrap();
- match timelines.get(&(tenant_id, timeline_id)) {
+ match timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
- info!(
- "creating timeline dir {}, create is {:?}",
- timeline_id, create
- );
- fs::create_dir_all(timeline_id.to_string())?;
+ if let CreateControlFile::True = create {
+ let dir = conf.timeline_dir(&zttid);
+ info!(
+ "creating timeline dir {}, create is {:?}",
+ dir.display(),
+ create
+ );
+ fs::create_dir_all(dir)?;
+ }
- let shared_state = SharedState::create_restore(conf, timeline_id, create)
+ let shared_state = SharedState::create_restore(conf, &zttid, create)
.context("failed to restore shared state")?;
- let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
- timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli));
+ let new_tli = Arc::new(Timeline::new(zttid, shared_state));
+ timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
@@ -425,9 +426,9 @@ pub struct FileStorage {
}
impl FileStorage {
- fn new(timeline_id: ZTimelineId, conf: &SafeKeeperConf) -> FileStorage {
- let timeline_dir = conf.timeline_dir(&timeline_id);
- let timelineid_str = format!("{}", timeline_id);
+ fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage {
+ let timeline_dir = conf.timeline_dir(zttid);
+ let timelineid_str = format!("{}", zttid);
FileStorage {
timeline_dir,
conf: conf.clone(),
@@ -456,12 +457,13 @@ impl FileStorage {
upgrade_control_file(buf, version)
}
+ // Load control file for given zttid at path specified by conf.
fn load_control_file_conf(
conf: &SafeKeeperConf,
- timeline_id: ZTimelineId,
+ zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result {
- let path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
+ let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME);
Self::load_control_file(path, create)
}
@@ -526,6 +528,14 @@ impl FileStorage {
};
Ok(state)
}
+
+ /// Helper returning full path to WAL segment file and its .partial brother.
+ fn wal_file_paths(&self, segno: XLogSegNo, wal_seg_size: usize) -> (PathBuf, PathBuf) {
+ let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
+ let wal_file_path = self.timeline_dir.join(wal_file_name.clone());
+ let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial");
+ (wal_file_path, wal_file_partial_path)
+ }
}
impl Storage for FileStorage {
@@ -594,7 +604,6 @@ impl Storage for FileStorage {
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
- let ztli = server.timeline_id;
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
@@ -614,14 +623,7 @@ impl Storage for FileStorage {
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
- // note: we basically don't support changing pg timeline
- let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
- let wal_file_path = self.conf.timeline_dir(&ztli).join(wal_file_name.clone());
- let wal_file_partial_path = self
- .conf
- .timeline_dir(&ztli)
- .join(wal_file_name.clone() + ".partial");
-
+ let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size);
{
let mut wal_file: File;
/* Try to open already completed segment */
@@ -682,25 +684,13 @@ impl Storage for FileStorage {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
- let ztli = server.timeline_id;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
- // note: we basically don't support changing pg timeline
- let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
- let wal_file_path = self
- .conf
- .workdir
- .join(ztli.to_string())
- .join(wal_file_name.clone());
- let wal_file_partial_path = self
- .conf
- .workdir
- .join(ztli.to_string())
- .join(wal_file_name + ".partial");
+ let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size);
{
let mut wal_file: File;
/* Try to open already completed segment */
@@ -731,17 +721,7 @@ impl Storage for FileStorage {
// Remove all subsequent segments
loop {
segno += 1;
- let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
- let wal_file_path = self
- .conf
- .workdir
- .join(ztli.to_string())
- .join(wal_file_name.clone());
- let wal_file_partial_path = self
- .conf
- .workdir
- .join(ztli.to_string())
- .join(wal_file_name.clone() + ".partial");
+ let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size);
// TODO: better use fs::try_exists which is currenty avaialble only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
@@ -761,11 +741,11 @@ mod test {
use crate::{
safekeeper::{SafeKeeperState, Storage},
timeline::{CreateControlFile, CONTROL_FILE_NAME},
- SafeKeeperConf,
+ SafeKeeperConf, ZTenantTimelineId,
};
use anyhow::Result;
use std::fs;
- use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
+ use zenith_utils::lsn::Lsn;
fn stub_conf() -> SafeKeeperConf {
let workdir = tempfile::tempdir().unwrap().into_path();
@@ -777,31 +757,30 @@ mod test {
fn load_from_control_file(
conf: &SafeKeeperConf,
- timeline_id: ZTimelineId,
+ zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
- fs::create_dir_all(&conf.timeline_dir(&timeline_id))
- .expect("failed to create timeline dir");
+ fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir");
Ok((
- FileStorage::new(timeline_id, conf),
- FileStorage::load_control_file_conf(conf, timeline_id, create)?,
+ FileStorage::new(zttid, conf),
+ FileStorage::load_control_file_conf(conf, zttid, create)?,
))
}
#[test]
fn test_read_write_safekeeper_state() {
let conf = stub_conf();
- let timeline_id = ZTimelineId::generate();
+ let zttid = ZTenantTimelineId::generate();
{
let (mut storage, mut state) =
- load_from_control_file(&conf, timeline_id, CreateControlFile::True)
+ load_from_control_file(&conf, &zttid, CreateControlFile::True)
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage.persist(&state).expect("failed to persist state");
}
- let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False)
+ let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False)
.expect("failed to read state");
assert_eq!(state.wal_start_lsn, Lsn(42));
}
@@ -809,21 +788,21 @@ mod test {
#[test]
fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf();
- let timeline_id = ZTimelineId::generate();
+ let zttid = ZTenantTimelineId::generate();
{
let (mut storage, mut state) =
- load_from_control_file(&conf, timeline_id, CreateControlFile::True)
+ load_from_control_file(&conf, &zttid, CreateControlFile::True)
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage.persist(&state).expect("failed to persist state");
}
- let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
+ let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data).expect("failed to write control file");
- match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
+ match load_from_control_file(&conf, &zttid, CreateControlFile::False) {
Err(err) => assert!(err
.to_string()
.contains("safekeeper control file checksum mismatch")),
diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs
index c5f63275c4..7ff3c0cffd 100644
--- a/zenith_utils/src/zid.rs
+++ b/zenith_utils/src/zid.rs
@@ -195,6 +195,32 @@ pub mod opt_display_serde {
}
}
+// A pair uniquely identifying Zenith instance.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct ZTenantTimelineId {
+ pub tenant_id: ZTenantId,
+ pub timeline_id: ZTimelineId,
+}
+
+impl ZTenantTimelineId {
+ pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
+ ZTenantTimelineId {
+ tenant_id,
+ timeline_id,
+ }
+ }
+
+ pub fn generate() -> Self {
+ Self::new(ZTenantId::generate(), ZTimelineId::generate())
+ }
+}
+
+impl fmt::Display for ZTenantTimelineId {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{}-{}", self.tenant_id, self.timeline_id)
+ }
+}
+
#[cfg(test)]
mod tests {
use std::fmt::Display;