WIP. Launch and shutdown tenant threads together with walreceiver.

TODO: now walreceiver only disconnects if safekeeper was shut down. Implemnt proper walreceiver disconnection.
This commit is contained in:
anastasia
2021-10-28 16:46:26 +03:00
committed by lubennikovaav
parent 071e30cc53
commit 83ed930bc2
3 changed files with 76 additions and 37 deletions

View File

@@ -101,10 +101,7 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
let mut m = access_tenants();
let tenant = m.get_mut(&tenant_id).unwrap();
tenant.repo = Some(repo);
tenant.state = TenantState::Active;
// TODO Start these threads only if tenant actively receives some WAL
tenant_threads::start_tenant_threads(conf, tenant_id);
tenant.state = TenantState::Idle;
}
pub fn register_relish_download(
@@ -128,12 +125,12 @@ pub fn register_relish_download(
match &tenant.repo {
Some(repo) => {
init_timeline(repo.as_ref(), timeline_id);
tenant.state = TenantState::Active;
tenant.state = TenantState::Idle;
return;
}
None => log::warn!("Initialize new repo"),
}
tenant.state = TenantState::Active;
tenant.state = TenantState::Idle;
}
// init repo updates Tenant state
@@ -197,7 +194,7 @@ pub fn create_repository_for_tenant(
let mut m = access_tenants();
let tenant = m.get_mut(&tenantid).unwrap();
tenant.repo = Some(repo);
tenant.state = TenantState::Active;
tenant.state = TenantState::Idle;
Ok(())
}
@@ -211,18 +208,18 @@ pub fn get_tenant_state(tenantid: ZTenantId) -> TenantState {
}
}
pub fn set_tenant_state(tenantid: ZTenantId, state: TenantState) -> Result<TenantState> {
pub fn set_tenant_state(tenantid: ZTenantId, newstate: TenantState) -> Result<TenantState> {
let mut m = access_tenants();
let tenant = m.get_mut(&tenantid);
match tenant {
Some(tenant) => {
if state == TenantState::Idle && tenant.state != TenantState::Active {
if newstate == TenantState::Idle && tenant.state != TenantState::Active {
// Only Active tenant can become Idle
return Ok(tenant.state);
}
info!("set_tenant_state: {} -> {}", tenant.state, state);
tenant.state = state;
info!("set_tenant_state: {} -> {}", tenant.state, newstate);
tenant.state = newstate;
Ok(tenant.state)
}
None => bail!("Tenant not found for tenant {}", tenantid),

View File

@@ -35,33 +35,35 @@ lazy_static! {
.expect("failed to define a metric");
}
// Launch checkpointer and GC for the tenant.
// It's possible that the threads are running already,
// if so, just don't spawn new ones.
pub fn start_tenant_threads(conf: &'static PageServerConf, tenantid: ZTenantId) {
//ensure that old threads are stopeed
wait_for_tenant_threads_to_stop(tenantid);
let checkpointer_handle = std::thread::Builder::new()
.name("Checkpointer thread".into())
.spawn(move || {
checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
})
.ok();
let gc_handle = std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
gc_loop(tenantid, conf).expect("GC thread died");
})
.ok();
// TODO handle thread errors if any
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle,
gc_handle,
};
let h = handles
.entry(tenantid)
.or_insert_with(|| TenantHandleEntry {
checkpointer_handle: None,
gc_handle: None,
});
handles.insert(tenantid, h);
if h.checkpointer_handle.is_none() {
h.checkpointer_handle = std::thread::Builder::new()
.name("Checkpointer thread".into())
.spawn(move || {
checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
})
.ok();
}
if h.gc_handle.is_none() {
h.gc_handle = std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
gc_loop(tenantid, conf).expect("GC thread died");
})
.ok();
}
}
pub fn wait_for_tenant_threads_to_stop(tenantid: ZTenantId) {

View File

@@ -8,6 +8,8 @@
use crate::relish::*;
use crate::restore_local_repo;
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::tenant_threads;
use crate::waldecoder::*;
use crate::PageServerConf;
use anyhow::{bail, Error, Result};
@@ -38,6 +40,7 @@ use zenith_utils::zid::ZTimelineId;
struct WalReceiverEntry {
wal_producer_connstr: String,
wal_receiver_handle: Option<JoinHandle<()>>,
tenantid: ZTenantId,
}
lazy_static! {
@@ -65,6 +68,23 @@ pub fn stop_wal_receiver(timelineid: ZTimelineId) {
}
}
pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) {
let mut receivers = WAL_RECEIVERS.lock().unwrap();
receivers.remove(&timelineid);
// Check if it was the last walreceiver of the tenant.
// TODO now we store one WalReceiverEntry per timeline,
// so this iterator looks a bit strange.
for (_timelineid, entry) in receivers.iter() {
if entry.tenantid == tenantid {
return;
}
}
// When last walreceiver of the tenant is gone, change state to Idle
tenant_mgr::set_tenant_state(tenantid, TenantState::Idle).unwrap();
}
// Launch a new WAL receiver, or tell one that's running about change in connection string
pub fn launch_wal_receiver(
conf: &'static PageServerConf,
@@ -90,8 +110,13 @@ pub fn launch_wal_receiver(
let receiver = WalReceiverEntry {
wal_producer_connstr: wal_producer_connstr.into(),
wal_receiver_handle: Some(wal_receiver_handle),
tenantid,
};
receivers.insert(timelineid, receiver);
// Update tenant state and start tenant threads, if they are not running yet.
tenant_mgr::set_tenant_state(tenantid, TenantState::Active).unwrap();
tenant_threads::start_tenant_threads(conf, tenantid);
}
};
}
@@ -114,11 +139,15 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
info!("WAL receiver thread started");
let mut retry_count = 10;
//
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it. If the connection is lost, keep retrying.
// TODO How long should we retry in case of losing connection?
// Should we retry at all or we can wait for the next callmemaybe request?
//
while !tenant_mgr::shutdown_requested() {
while !tenant_mgr::shutdown_requested() && retry_count > 0 {
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
@@ -129,10 +158,20 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
"WAL streaming connection failed ({}), retrying in 1 second",
e
);
retry_count -= 1;
sleep(Duration::from_secs(1));
} else {
info!(
"walreceiver disconnected tenant {}, timelineid {}",
tenantid, timelineid
);
break;
}
}
debug!("WAL streaming shut down");
info!("WAL streaming shut down");
// Drop it from list of active WAL_RECEIVERS
// so that next callmemaybe request launched a new thread
drop_wal_receiver(timelineid, tenantid);
}
fn walreceiver_main(
@@ -300,6 +339,7 @@ fn walreceiver_main(
break;
}
}
Ok(())
}