diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 060210bf7f..71a34d0157 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -368,7 +368,7 @@ fn shutdown_timeline( timeline .upload_relishes .store(false, atomic::Ordering::Relaxed); - walreceiver::stop_wal_receiver(timeline_id); + walreceiver::stop_wal_receiver(tenant_id, timeline_id); trace!("repo shutdown. checkpoint timeline {}", timeline_id); // Do not reconstruct pages to reduce shutdown time timeline.checkpoint(CheckpointConfig::Flush)?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 70fd8802dc..386e14b4bb 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -594,7 +594,7 @@ impl postgres_backend::Handler for PageServerHandler { tenant_mgr::get_timeline_for_tenant(tenantid, timelineid) .context("Failed to fetch local timeline for callmemaybe requests")?; - walreceiver::launch_wal_receiver(self.conf, timelineid, &connstr, tenantid.to_owned()); + walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr); pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("branch_create ") { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index ef63d357f8..9dd1d14fe0 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -44,7 +44,7 @@ struct WalReceiverEntry { } lazy_static! { - static ref WAL_RECEIVERS: Mutex> = + static ref WAL_RECEIVERS: Mutex> = Mutex::new(HashMap::new()); } @@ -60,10 +60,10 @@ thread_local! { // In future we can make this more granular and send shutdown signals // per tenant/timeline to cancel inactive walreceivers. // TODO deal with blocking pg connections -pub fn stop_wal_receiver(timelineid: ZTimelineId) { +pub fn stop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) { let mut receivers = WAL_RECEIVERS.lock(); - if let Some(r) = receivers.get_mut(&timelineid) { + if let Some(r) = receivers.get_mut(&(tenantid, timelineid)) { match r.wal_receiver_interrupt_sender.take() { Some(s) => { if s.send(()).is_err() { @@ -84,9 +84,9 @@ pub fn stop_wal_receiver(timelineid: ZTimelineId) { } } -pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) { +fn drop_wal_receiver(tenantid: ZTenantId, timelineid: ZTimelineId) { let mut receivers = WAL_RECEIVERS.lock(); - receivers.remove(&timelineid); + receivers.remove(&(tenantid, timelineid)); // Check if it was the last walreceiver of the tenant. // TODO now we store one WalReceiverEntry per timeline, @@ -104,13 +104,13 @@ pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) { // Launch a new WAL receiver, or tell one that's running about change in connection string pub fn launch_wal_receiver( conf: &'static PageServerConf, + tenantid: ZTenantId, timelineid: ZTimelineId, wal_producer_connstr: &str, - tenantid: ZTenantId, ) { let mut receivers = WAL_RECEIVERS.lock(); - match receivers.get_mut(&timelineid) { + match receivers.get_mut(&(tenantid, timelineid)) { Some(receiver) => { receiver.wal_producer_connstr = wal_producer_connstr.into(); } @@ -121,7 +121,7 @@ pub fn launch_wal_receiver( .name("WAL receiver thread".into()) .spawn(move || { IS_WAL_RECEIVER.with(|c| c.set(true)); - thread_main(conf, timelineid, tenantid, rx); + thread_main(conf, tenantid, timelineid, rx); }) .unwrap(); @@ -131,7 +131,7 @@ pub fn launch_wal_receiver( wal_receiver_interrupt_sender: Some(tx), tenantid, }; - receivers.insert(timelineid, receiver); + receivers.insert((tenantid, timelineid), receiver); // Update tenant state and start tenant threads, if they are not running yet. tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap(); @@ -141,11 +141,11 @@ pub fn launch_wal_receiver( } // Look up current WAL producer connection string in the hash table -fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { +fn get_wal_producer_connstr(tenantid: ZTenantId, timelineid: ZTimelineId) -> String { let receivers = WAL_RECEIVERS.lock(); receivers - .get(&timelineid) + .get(&(tenantid, timelineid)) .unwrap() .wal_producer_connstr .clone() @@ -156,15 +156,15 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { // fn thread_main( conf: &'static PageServerConf, - timelineid: ZTimelineId, tenantid: ZTenantId, + timelineid: ZTimelineId, interrupt_receiver: oneshot::Receiver<()>, ) { let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered(); info!("WAL receiver thread started"); // Look up the current WAL producer address - let wal_producer_connstr = get_wal_producer_connstr(timelineid); + let wal_producer_connstr = get_wal_producer_connstr(tenantid, timelineid); // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, // and start streaming WAL from it. @@ -188,7 +188,7 @@ fn thread_main( // Drop it from list of active WAL_RECEIVERS // so that next callmemaybe request launched a new thread - drop_wal_receiver(timelineid, tenantid); + drop_wal_receiver(tenantid, timelineid); } fn walreceiver_main(