Prefix per-cluster directory with ztenant_id in safekeeper.

Currently ztimelineids are unique, but all APIs accept the pair, so let's keep
it everywhere for uniformity.

Carry around ZTTId containing both ZTenantId and ZTimelineId for simplicity.

(existing clusters on staging ought to be preprocessed for that)
This commit is contained in:
Arseny Sher
2022-01-12 23:07:35 +03:00
parent 79f0e44a20
commit 86045ac36c
7 changed files with 111 additions and 101 deletions

View File

@@ -16,7 +16,7 @@ use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
use crate::callmemaybe::CallmeEvent;
use crate::timeline::CreateControlFile;
@@ -116,8 +116,11 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
| SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True,
_ => CreateControlFile::False,
};
self.timeline
.set(&self.conf, tenantid, timelineid, create_control_file)?;
self.timeline.set(
&self.conf,
ZTenantTimelineId::new(tenantid, timelineid),
create_control_file,
)?;
}
}
}

View File

@@ -6,6 +6,7 @@ use serde::Serializer;
use std::fmt::Display;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantTimelineId;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
@@ -65,16 +66,13 @@ struct TimelineStatus {
/// Report info about timeline.
async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let zttid = ZTenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(
get_conf(&request),
tenant_id,
timeline_id,
CreateControlFile::False,
)
.map_err(ApiError::from_err)?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, CreateControlFile::False)
.map_err(ApiError::from_err)?;
let sk_state = tli.get_info();
let flush_lsn = tli.get_end_of_wal();
@@ -85,8 +83,8 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
};
let status = TimelineStatus {
tenant_id,
timeline_id,
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
acceptor_state: acc_state,
commit_lsn: sk_state.commit_lsn,
truncate_lsn: sk_state.truncate_lsn,

View File

@@ -2,7 +2,7 @@
use std::path::PathBuf;
use std::time::Duration;
use zenith_utils::zid::ZTimelineId;
use zenith_utils::zid::ZTenantTimelineId;
pub mod callmemaybe;
pub mod handler;
@@ -47,8 +47,10 @@ pub struct SafeKeeperConf {
}
impl SafeKeeperConf {
pub fn timeline_dir(&self, timelineid: &ZTimelineId) -> PathBuf {
self.workdir.join(timelineid.to_string())
pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf {
self.workdir
.join(zttid.tenant_id.to_string())
.join(zttid.timeline_id.to_string())
}
}

View File

@@ -105,7 +105,7 @@ impl<'pg> ReceiveWalConn<'pg> {
// Need to establish replication channel with page server.
// Add far as replication in postgres is initiated by receiver
// we should use callmemaybe mechanism.
let timeline_id = spg.timeline.get().timeline_id;
let timeline_id = spg.timeline.get().zttid.timeline_id;
let subscription_key = SubscriptionStateKey::new(
tenant_id,
timeline_id,

View File

@@ -284,8 +284,8 @@ impl ReplicationConn {
None
} else {
let pageserver_connstr = pageserver_connstr.clone().expect("there should be a pageserver connection string since this is not a wal_proposer_recovery");
let timeline_id = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
let timeline_id = spg.timeline.get().zttid.timeline_id;
let tx_clone = spg.tx.clone();
let subscription_key =
SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr.clone());
@@ -330,10 +330,10 @@ impl ReplicationConn {
// and this code is not reachable
let pageserver_connstr = pageserver_connstr
.expect("there should be a pageserver connection string");
let timelineid = spg.timeline.get().timeline_id;
let tenant_id = spg.ztenantid.unwrap();
let timeline_id = spg.timeline.get().zttid.timeline_id;
let subscription_key =
SubscriptionStateKey::new(tenant_id, timelineid, pageserver_connstr);
SubscriptionStateKey::new(tenant_id, timeline_id, pageserver_connstr);
spg.tx
.send(CallmeEvent::Unsubscribe(subscription_key))
.unwrap_or_else(|e| {
@@ -361,8 +361,10 @@ impl ReplicationConn {
// Open a new file.
let segno = start_pos.segment_number(wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let timeline_id = spg.timeline.get().timeline_id;
let wal_file_path = spg.conf.timeline_dir(&timeline_id).join(wal_file_name);
let wal_file_path = spg
.conf
.timeline_dir(&spg.timeline.get().zttid)
.join(wal_file_name);
Self::open_wal_file(&wal_file_path)?
}
};

View File

@@ -4,7 +4,7 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::{find_end_of_wal, PG_TLI};
use postgres_ffi::xlog_utils::{find_end_of_wal, XLogSegNo, PG_TLI};
use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
@@ -16,7 +16,7 @@ use tracing::*;
use zenith_metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
use zenith_utils::bin_ser::LeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::zid::ZTenantTimelineId;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, ServerInfo,
@@ -164,14 +164,14 @@ impl SharedState {
/// If create=false and file doesn't exist, bails out.
fn create_restore(
conf: &SafeKeeperConf,
timeline_id: ZTimelineId,
zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result<Self> {
let state = FileStorage::load_control_file_conf(conf, timeline_id, create)
let state = FileStorage::load_control_file_conf(conf, zttid, create)
.context("failed to load from control file")?;
let file_storage = FileStorage::new(timeline_id, conf);
let file_storage = FileStorage::new(zttid, conf);
let flush_lsn = if state.server.wal_seg_size != 0 {
let wal_dir = conf.timeline_dir(&timeline_id);
let wal_dir = conf.timeline_dir(zttid);
find_end_of_wal(
&wal_dir,
state.server.wal_seg_size as usize,
@@ -194,16 +194,16 @@ impl SharedState {
/// Database instance (tenant)
pub struct Timeline {
pub timeline_id: ZTimelineId,
pub zttid: ZTenantTimelineId,
mutex: Mutex<SharedState>,
/// conditional variable used to notify wal senders
cond: Condvar,
}
impl Timeline {
fn new(timelineid: ZTimelineId, shared_state: SharedState) -> Timeline {
fn new(zttid: ZTenantTimelineId, shared_state: SharedState) -> Timeline {
Timeline {
timeline_id: timelineid,
zttid,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
@@ -349,8 +349,7 @@ pub trait TimelineTools {
fn set(
&mut self,
conf: &SafeKeeperConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
zttid: ZTenantTimelineId,
create: CreateControlFile,
) -> Result<()>;
@@ -361,14 +360,13 @@ impl TimelineTools for Option<Arc<Timeline>> {
fn set(
&mut self,
conf: &SafeKeeperConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
zttid: ZTenantTimelineId,
create: CreateControlFile,
) -> Result<()> {
// We will only set the timeline once. If it were to ever change,
// anyone who cloned the Arc would be out of date.
assert!(self.is_none());
*self = Some(GlobalTimelines::get(conf, tenant_id, timeline_id, create)?);
*self = Some(GlobalTimelines::get(conf, zttid, create)?);
Ok(())
}
@@ -378,7 +376,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
}
lazy_static! {
pub static ref TIMELINES: Mutex<HashMap<(ZTenantId, ZTimelineId), Arc<Timeline>>> =
pub static ref TIMELINES: Mutex<HashMap<ZTenantTimelineId, Arc<Timeline>>> =
Mutex::new(HashMap::new());
}
@@ -390,26 +388,29 @@ impl GlobalTimelines {
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &SafeKeeperConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
zttid: ZTenantTimelineId,
create: CreateControlFile,
) -> Result<Arc<Timeline>> {
let mut timelines = TIMELINES.lock().unwrap();
match timelines.get(&(tenant_id, timeline_id)) {
match timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
info!(
"creating timeline dir {}, create is {:?}",
timeline_id, create
);
fs::create_dir_all(timeline_id.to_string())?;
if let CreateControlFile::True = create {
let dir = conf.timeline_dir(&zttid);
info!(
"creating timeline dir {}, create is {:?}",
dir.display(),
create
);
fs::create_dir_all(dir)?;
}
let shared_state = SharedState::create_restore(conf, timeline_id, create)
let shared_state = SharedState::create_restore(conf, &zttid, create)
.context("failed to restore shared state")?;
let new_tli = Arc::new(Timeline::new(timeline_id, shared_state));
timelines.insert((tenant_id, timeline_id), Arc::clone(&new_tli));
let new_tli = Arc::new(Timeline::new(zttid, shared_state));
timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
@@ -425,9 +426,9 @@ pub struct FileStorage {
}
impl FileStorage {
fn new(timeline_id: ZTimelineId, conf: &SafeKeeperConf) -> FileStorage {
let timeline_dir = conf.timeline_dir(&timeline_id);
let timelineid_str = format!("{}", timeline_id);
fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> FileStorage {
let timeline_dir = conf.timeline_dir(zttid);
let timelineid_str = format!("{}", zttid);
FileStorage {
timeline_dir,
conf: conf.clone(),
@@ -456,12 +457,13 @@ impl FileStorage {
upgrade_control_file(buf, version)
}
// Load control file for given zttid at path specified by conf.
fn load_control_file_conf(
conf: &SafeKeeperConf,
timeline_id: ZTimelineId,
zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result<SafeKeeperState> {
let path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME);
Self::load_control_file(path, create)
}
@@ -526,6 +528,14 @@ impl FileStorage {
};
Ok(state)
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(&self, segno: XLogSegNo, wal_seg_size: usize) -> (PathBuf, PathBuf) {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial");
(wal_file_path, wal_file_partial_path)
}
}
impl Storage for FileStorage {
@@ -594,7 +604,6 @@ impl Storage for FileStorage {
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.timeline_id;
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
@@ -614,14 +623,7 @@ impl Storage for FileStorage {
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
// note: we basically don't support changing pg timeline
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self.conf.timeline_dir(&ztli).join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.timeline_dir(&ztli)
.join(wal_file_name.clone() + ".partial");
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size);
{
let mut wal_file: File;
/* Try to open already completed segment */
@@ -682,25 +684,13 @@ impl Storage for FileStorage {
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
let wal_seg_size = server.wal_seg_size as usize;
let ztli = server.timeline_id;
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
// note: we basically don't support changing pg timeline
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name + ".partial");
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size);
{
let mut wal_file: File;
/* Try to open already completed segment */
@@ -731,17 +721,7 @@ impl Storage for FileStorage {
// Remove all subsequent segments
loop {
segno += 1;
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone());
let wal_file_partial_path = self
.conf
.workdir
.join(ztli.to_string())
.join(wal_file_name.clone() + ".partial");
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno, wal_seg_size);
// TODO: better use fs::try_exists which is currenty avaialble only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
@@ -761,11 +741,11 @@ mod test {
use crate::{
safekeeper::{SafeKeeperState, Storage},
timeline::{CreateControlFile, CONTROL_FILE_NAME},
SafeKeeperConf,
SafeKeeperConf, ZTenantTimelineId,
};
use anyhow::Result;
use std::fs;
use zenith_utils::{lsn::Lsn, zid::ZTimelineId};
use zenith_utils::lsn::Lsn;
fn stub_conf() -> SafeKeeperConf {
let workdir = tempfile::tempdir().unwrap().into_path();
@@ -777,31 +757,30 @@ mod test {
fn load_from_control_file(
conf: &SafeKeeperConf,
timeline_id: ZTimelineId,
zttid: &ZTenantTimelineId,
create: CreateControlFile,
) -> Result<(FileStorage, SafeKeeperState)> {
fs::create_dir_all(&conf.timeline_dir(&timeline_id))
.expect("failed to create timeline dir");
fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir");
Ok((
FileStorage::new(timeline_id, conf),
FileStorage::load_control_file_conf(conf, timeline_id, create)?,
FileStorage::new(zttid, conf),
FileStorage::load_control_file_conf(conf, zttid, create)?,
))
}
#[test]
fn test_read_write_safekeeper_state() {
let conf = stub_conf();
let timeline_id = ZTimelineId::generate();
let zttid = ZTenantTimelineId::generate();
{
let (mut storage, mut state) =
load_from_control_file(&conf, timeline_id, CreateControlFile::True)
load_from_control_file(&conf, &zttid, CreateControlFile::True)
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage.persist(&state).expect("failed to persist state");
}
let (_, state) = load_from_control_file(&conf, timeline_id, CreateControlFile::False)
let (_, state) = load_from_control_file(&conf, &zttid, CreateControlFile::False)
.expect("failed to read state");
assert_eq!(state.wal_start_lsn, Lsn(42));
}
@@ -809,21 +788,21 @@ mod test {
#[test]
fn test_safekeeper_state_checksum_mismatch() {
let conf = stub_conf();
let timeline_id = ZTimelineId::generate();
let zttid = ZTenantTimelineId::generate();
{
let (mut storage, mut state) =
load_from_control_file(&conf, timeline_id, CreateControlFile::True)
load_from_control_file(&conf, &zttid, CreateControlFile::True)
.expect("failed to read state");
// change something
state.wal_start_lsn = Lsn(42);
storage.persist(&state).expect("failed to persist state");
}
let control_path = conf.timeline_dir(&timeline_id).join(CONTROL_FILE_NAME);
let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME);
let mut data = fs::read(&control_path).unwrap();
data[0] += 1; // change the first byte of the file to fail checksum validation
fs::write(&control_path, &data).expect("failed to write control file");
match load_from_control_file(&conf, timeline_id, CreateControlFile::False) {
match load_from_control_file(&conf, &zttid, CreateControlFile::False) {
Err(err) => assert!(err
.to_string()
.contains("safekeeper control file checksum mismatch")),

View File

@@ -195,6 +195,32 @@ pub mod opt_display_serde {
}
}
// A pair uniquely identifying Zenith instance.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ZTenantTimelineId {
pub tenant_id: ZTenantId,
pub timeline_id: ZTimelineId,
}
impl ZTenantTimelineId {
pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
ZTenantTimelineId {
tenant_id,
timeline_id,
}
}
pub fn generate() -> Self {
Self::new(ZTenantId::generate(), ZTimelineId::generate())
}
}
impl fmt::Display for ZTenantTimelineId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}-{}", self.tenant_id, self.timeline_id)
}
}
#[cfg(test)]
mod tests {
use std::fmt::Display;