Timeline IDs are not globally unique, fix some code that assumed that.

A timeline ID is only guaranteed to be unique for a particular tenant,
so you need to use tenant ID + timeline ID as the key, rather than just
timeline ID.

The safekeeper currently makes the same assumption, and we should fix that
too, but this commit just addresses this one case in the page server.

In the passing, reorder some function arguments to be more consistent.
This commit is contained in:
Heikki Linnakangas
2022-01-13 18:45:30 +02:00
parent 404aab9373
commit 19aaa91f6d
3 changed files with 16 additions and 16 deletions

View File

@@ -368,7 +368,7 @@ fn shutdown_timeline(
timeline
.upload_relishes
.store(false, atomic::Ordering::Relaxed);
walreceiver::stop_wal_receiver(timeline_id);
walreceiver::stop_wal_receiver(tenant_id, timeline_id);
trace!("repo shutdown. checkpoint timeline {}", timeline_id);
// Do not reconstruct pages to reduce shutdown time
timeline.checkpoint(CheckpointConfig::Flush)?;

View File

@@ -594,7 +594,7 @@ impl postgres_backend::Handler for PageServerHandler {
tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
.context("Failed to fetch local timeline for callmemaybe requests")?;
walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned());
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr);
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("branch_create ") {

View File

@@ -44,7 +44,7 @@ struct WalReceiverEntry {
}
lazy_static! {
static ref WAL_RECEIVERS: Mutex<HashMap<ZTimelineId, WalReceiverEntry>> =
static ref WAL_RECEIVERS: Mutex<HashMap<(ZTenantId, ZTimelineId), WalReceiverEntry>> =
Mutex::new(HashMap::new());
}
@@ -60,10 +60,10 @@ thread_local! {
// In future we can make this more granular and send shutdown signals
// per tenant/timeline to cancel inactive walreceivers.
// TODO deal with blocking pg connections
pub fn stop_wal_receiver(timelineid: ZTimelineId) {
pub fn stop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
let mut receivers = WAL_RECEIVERS.lock();
if let Some(r) = receivers.get_mut(&timelineid) {
if let Some(r) = receivers.get_mut(&(tenantid, timelineid)) {
match r.wal_receiver_interrupt_sender.take() {
Some(s) => {
if s.send(()).is_err() {
@@ -84,9 +84,9 @@ pub fn stop_wal_receiver(timelineid: ZTimelineId) {
}
}
pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) {
let mut receivers = WAL_RECEIVERS.lock();
receivers.remove(&timelineid);
receivers.remove(&(tenantid, timelineid));
// Check if it was the last walreceiver of the tenant.
// TODO now we store one WalReceiverEntry per timeline,
@@ -104,13 +104,13 @@ pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
// Launch a new WAL receiver, or tell one that's running about change in connection string
pub fn launch_wal_receiver(
conf: &'static PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
) {
let mut receivers = WAL_RECEIVERS.lock();
match receivers.get_mut(&timelineid) {
match receivers.get_mut(&(tenantid, timelineid)) {
Some(receiver) => {
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
@@ -121,7 +121,7 @@ pub fn launch_wal_receiver(
.name("WAL receiver thread".into())
.spawn(move || {
IS_WAL_RECEIVER.with(|c| c.set(true));
thread_main(conf, timelineid, tenantid, rx);
thread_main(conf, tenantid, timelineid, rx);
})
.unwrap();
@@ -131,7 +131,7 @@ pub fn launch_wal_receiver(
wal_receiver_interrupt_sender: Some(tx),
tenantid,
};
receivers.insert(timelineid, receiver);
receivers.insert((tenantid, timelineid), receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap();
@@ -141,11 +141,11 @@ pub fn launch_wal_receiver(
}
// Look up current WAL producer connection string in the hash table
fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String {
let receivers = WAL_RECEIVERS.lock();
receivers
.get(&timelineid)
.get(&(tenantid, timelineid))
.unwrap()
.wal_producer_connstr
.clone()
@@ -156,15 +156,15 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
//
fn thread_main(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
timelineid: ZTimelineId,
interrupt_receiver: oneshot::Receiver<()>,
) {
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
info!("WAL receiver thread started");
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid);
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it.
@@ -188,7 +188,7 @@ fn thread_main(
// Drop it from list of active WAL_RECEIVERS
// so that next callmemaybe request launched a new thread
drop_wal_receiver(timelineid, tenantid);
drop_wal_receiver(tenantid, timelineid);
}
fn walreceiver_main(