diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index b47dfbd4ab..455b825db7 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -101,10 +101,7 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
     let mut m = access_tenants();
     let tenant = m.get_mut(&tenant_id).unwrap();
     tenant.repo = Some(repo);
-    tenant.state = TenantState::Active;
-
-    // TODO Start these threads only if tenant actively receives some WAL
-    tenant_threads::start_tenant_threads(conf, tenant_id);
+    tenant.state = TenantState::Idle;
 }
 
 pub fn register_relish_download(
@@ -128,12 +125,12 @@ pub fn register_relish_download(
     match &tenant.repo {
         Some(repo) => {
             init_timeline(repo.as_ref(), timeline_id);
-            tenant.state = TenantState::Active;
+            tenant.state = TenantState::Idle;
             return;
         }
         None => log::warn!("Initialize new repo"),
     }
-    tenant.state = TenantState::Active;
+    tenant.state = TenantState::Idle;
 }
 
 // init repo updates Tenant state
@@ -197,7 +194,7 @@ pub fn create_repository_for_tenant(
     let mut m = access_tenants();
     let tenant = m.get_mut(&tenantid).unwrap();
     tenant.repo = Some(repo);
-    tenant.state = TenantState::Active;
+    tenant.state = TenantState::Idle;
     Ok(())
 }
 
@@ -211,18 +208,18 @@
     }
 }
 
-pub fn set_tenant_state(tenantid: ZTenantId, state: TenantState) -> Result<TenantState> {
+pub fn set_tenant_state(tenantid: ZTenantId, newstate: TenantState) -> Result<TenantState> {
     let mut m = access_tenants();
     let tenant = m.get_mut(&tenantid);
     match tenant {
         Some(tenant) => {
-            if state == TenantState::Idle && tenant.state != TenantState::Active {
+            if newstate == TenantState::Idle && tenant.state != TenantState::Active {
                 // Only Active tenant can become Idle
                 return Ok(tenant.state);
             }
 
-            info!("set_tenant_state: {} -> {}", tenant.state, state);
-            tenant.state = state;
+            info!("set_tenant_state: {} -> {}", tenant.state, newstate);
+            tenant.state = newstate;
             Ok(tenant.state)
         }
         None => bail!("Tenant not found for tenant {}", tenantid),
diff --git a/pageserver/src/tenant_threads.rs b/pageserver/src/tenant_threads.rs
index d7d06fdef8..80a756de59 100644
--- a/pageserver/src/tenant_threads.rs
+++ b/pageserver/src/tenant_threads.rs
@@ -35,33 +35,35 @@ lazy_static! {
         .expect("failed to define a metric");
 }
 
+// Launch checkpointer and GC for the tenant.
+// It's possible that the threads are running already,
+// if so, just don't spawn new ones.
 pub fn start_tenant_threads(conf: &'static PageServerConf, tenantid: ZTenantId) {
-    //ensure that old threads are stopeed
-    wait_for_tenant_threads_to_stop(tenantid);
-
-    let checkpointer_handle = std::thread::Builder::new()
-        .name("Checkpointer thread".into())
-        .spawn(move || {
-            checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
-        })
-        .ok();
-
-    let gc_handle = std::thread::Builder::new()
-        .name("GC thread".into())
-        .spawn(move || {
-            gc_loop(tenantid, conf).expect("GC thread died");
-        })
-        .ok();
-
-    // TODO handle thread errors if any
-
     let mut handles = TENANT_HANDLES.lock().unwrap();
-    let h = TenantHandleEntry {
-        checkpointer_handle,
-        gc_handle,
-    };
+    let h = handles
+        .entry(tenantid)
+        .or_insert_with(|| TenantHandleEntry {
+            checkpointer_handle: None,
+            gc_handle: None,
+        });
 
-    handles.insert(tenantid, h);
+    if h.checkpointer_handle.is_none() {
+        h.checkpointer_handle = std::thread::Builder::new()
+            .name("Checkpointer thread".into())
+            .spawn(move || {
+                checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
+            })
+            .ok();
+    }
+
+    if h.gc_handle.is_none() {
+        h.gc_handle = std::thread::Builder::new()
+            .name("GC thread".into())
+            .spawn(move || {
+                gc_loop(tenantid, conf).expect("GC thread died");
+            })
+            .ok();
+    }
 }
 
 pub fn wait_for_tenant_threads_to_stop(tenantid: ZTenantId) {
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 65b3fa5cf6..1f84ed8507 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -8,6 +8,8 @@
 use crate::relish::*;
 use crate::restore_local_repo;
 use crate::tenant_mgr;
+use crate::tenant_mgr::TenantState;
+use crate::tenant_threads;
 use crate::waldecoder::*;
 use crate::PageServerConf;
 use anyhow::{bail, Error, Result};
@@ -38,6 +40,7 @@ use zenith_utils::zid::ZTimelineId;
 struct WalReceiverEntry {
     wal_producer_connstr: String,
     wal_receiver_handle: Option<thread::JoinHandle<()>>,
+    tenantid: ZTenantId,
 }
 
 lazy_static! {
@@ -65,6 +68,23 @@ pub fn stop_wal_receiver(timelineid: ZTimelineId) {
     }
 }
 
+pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
+    let mut receivers = WAL_RECEIVERS.lock().unwrap();
+    receivers.remove(&timelineid);
+
+    // Check if it was the last walreceiver of the tenant.
+    // TODO now we store one WalReceiverEntry per timeline,
+    // so this iterator looks a bit strange.
+    for (_timelineid, entry) in receivers.iter() {
+        if entry.tenantid == tenantid {
+            return;
+        }
+    }
+
+    // When last walreceiver of the tenant is gone, change state to Idle
+    tenant_mgr::set_tenant_state(tenantid, TenantState::Idle).unwrap();
+}
+
 // Launch a new WAL receiver, or tell one that's running about change in connection string
 pub fn launch_wal_receiver(
     conf: &'static PageServerConf,
@@ -90,8 +110,13 @@ pub fn launch_wal_receiver(
             let receiver = WalReceiverEntry {
                 wal_producer_connstr: wal_producer_connstr.into(),
                 wal_receiver_handle: Some(wal_receiver_handle),
+                tenantid,
             };
             receivers.insert(timelineid, receiver);
+
+            // Update tenant state and start tenant threads, if they are not running yet.
+            tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap();
+            tenant_threads::start_tenant_threads(conf, tenantid);
         }
     };
 }
@@ -114,11 +139,15 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
     let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
     info!("WAL receiver thread started");
 
+    let mut retry_count = 10;
+
     //
     // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
     // and start streaming WAL from it. If the connection is lost, keep retrying.
+    // TODO How long should we retry in case of losing connection?
+    // Should we retry at all or we can wait for the next callmemaybe request?
     //
-    while !tenant_mgr::shutdown_requested() {
+    while !tenant_mgr::shutdown_requested() && retry_count > 0 {
         // Look up the current WAL producer address
         let wal_producer_connstr = get_wal_producer_connstr(timelineid);
 
@@ -129,10 +158,20 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
                 "WAL streaming connection failed ({}), retrying in 1 second",
                 e
             );
+            retry_count -= 1;
             sleep(Duration::from_secs(1));
+        } else {
+            info!(
+                "walreceiver disconnected tenant {}, timelineid {}",
+                tenantid, timelineid
+            );
+            break;
         }
     }
-    debug!("WAL streaming shut down");
+    info!("WAL streaming shut down");
+    // Drop it from list of active WAL_RECEIVERS
+    // so that next callmemaybe request launched a new thread
+    drop_wal_receiver(timelineid, tenantid);
 }
 
 fn walreceiver_main(
@@ -300,6 +339,7 @@ fn walreceiver_main(
             break;
         }
     }
 
+    Ok(())
 }
 