Refactoring of checkpointer and GC.

Move them to a separate tenant_threads module to detangle thread management from LayeredRepository implementation.
This commit is contained in:
anastasia
2021-10-22 20:01:57 +03:00
committed by lubennikovaav
parent 28ab40c8b7
commit ea5900f155
6 changed files with 222 additions and 143 deletions

View File

@@ -23,6 +23,7 @@ use zenith_utils::zid::{ZTenantId, ZTimelineId};
use crate::tenant_mgr;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::{repository::Repository, PageServerConf};
use crate::{restore_local_repo, LOG_FILE_NAME};
@@ -238,7 +239,7 @@ fn bootstrap_timeline(
timeline.writer().as_ref(),
lsn,
)?;
timeline.checkpoint()?;
timeline.checkpoint(CheckpointConfig::Forced)?;
println!(
"created initial timeline {} timeline.lsn {}",

View File

@@ -30,7 +30,6 @@ use std::ops::{Bound::Included, Deref};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::relish::*;
@@ -40,6 +39,7 @@ use crate::tenant_mgr;
use crate::walreceiver;
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::PageServerConf;
use crate::{ZTenantId, ZTimelineId};
@@ -219,6 +219,22 @@ impl Repository for LayeredRepository {
})
}
/// Run one checkpoint pass over every timeline of this repository.
///
/// Each timeline is checkpointed with the given `cconf` inside its own
/// tracing span. The first failing timeline aborts the pass and its error
/// is returned to the caller.
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()> {
    let timelines = self.timelines.lock().unwrap();
    let outcome = timelines.iter().try_for_each(|(timelineid, timeline)| {
        let _entered =
            info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid)
                .entered();
        timeline.checkpoint(cconf)
    });
    // Release the lock on 'timelines' before handing the result back.
    drop(timelines);
    outcome
}
// Wait for all threads to complete and persist repository data before pageserver shutdown.
fn shutdown(&self) -> Result<()> {
trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
@@ -228,7 +244,7 @@ impl Repository for LayeredRepository {
walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint()?;
timeline.checkpoint(CheckpointConfig::Forced)?;
//TODO Wait for walredo process to shutdown too
}
@@ -309,90 +325,6 @@ impl LayeredRepository {
}
}
///
/// Launch the checkpointer thread in given repository.
///
pub fn launch_checkpointer_thread(
conf: &'static PageServerConf,
rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("Checkpointer thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
rc.checkpoint_loop(conf).expect("Checkpointer thread died");
})
.unwrap()
}
///
/// Checkpointer thread's main loop
///
fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
info!("checkpointer thread for tenant {} waking up", self.tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
{
let timelines = self.timelines.lock().unwrap();
for (timelineid, timeline) in timelines.iter() {
let _entered =
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid)
.entered();
STORAGE_TIME
.with_label_values(&["checkpoint_timed"])
.observe_closure_duration(|| {
timeline.checkpoint_internal(conf.checkpoint_distance, false)
})?
}
// release lock on 'timelines'
}
}
trace!("Checkpointer thread shut down");
Ok(())
}
///
/// Launch the GC thread in given repository.
///
pub fn launch_gc_thread(
conf: &'static PageServerConf,
rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
rc.gc_loop(conf).expect("GC thread died");
})
.unwrap()
}
///
/// GC thread's main loop
///
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
while !tenant_mgr::shutdown_requested() {
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
self.gc_iteration(None, conf.gc_horizon, false).unwrap();
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
while sleep_time > 0 && !tenant_mgr::shutdown_requested() {
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
info!("gc thread for tenant {} waking up", self.tenantid);
}
Ok(())
}
/// Save timeline metadata to file
fn save_metadata(
conf: &'static PageServerConf,
@@ -546,7 +478,7 @@ impl LayeredRepository {
// so that they too can be garbage collected. That's
// used in tests, so we want as deterministic results as possible.
if checkpoint_before_gc {
timeline.checkpoint()?;
timeline.checkpoint(CheckpointConfig::Forced)?;
info!("timeline {} checkpoint_before_gc done", timelineid);
}
@@ -867,11 +799,15 @@ impl Timeline for LayeredTimeline {
/// Public entry point for checkpoint(). All the logic is in the private
/// checkpoint_internal function, this public facade just wraps it for
/// metrics collection.
fn checkpoint(&self) -> Result<()> {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
//pass checkpoint_distance=0 to force checkpoint
.observe_closure_duration(|| self.checkpoint_internal(0, true))
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()> {
    // Translate the config into the metric label to record under and the
    // distance handed to checkpoint_internal(); a forced checkpoint uses
    // a distance of 0.
    let (label, distance) = match cconf {
        CheckpointConfig::Forced => ("forced checkpoint", 0),
        CheckpointConfig::Distance(distance) => ("checkpoint", distance),
    };
    // Time the actual work for metrics collection.
    STORAGE_TIME
        .with_label_values(&[label])
        .observe_closure_duration(|| self.checkpoint_internal(distance))
}
fn get_last_record_lsn(&self) -> Lsn {
@@ -1250,7 +1186,7 @@ impl LayeredTimeline {
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: u64, forced: bool) -> Result<()> {
fn checkpoint_internal(&self, checkpoint_distance: u64) -> Result<()> {
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
@@ -1281,10 +1217,6 @@ impl LayeredTimeline {
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
if tenant_mgr::shutdown_requested() && !forced {
return Ok(());
}
// Does this layer need freezing?
//
// Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE.

View File

@@ -18,6 +18,7 @@ pub mod relish_storage;
pub mod repository;
pub mod restore_local_repo;
pub mod tenant_mgr;
pub mod tenant_threads;
pub mod waldecoder;
pub mod walreceiver;
pub mod walredo;
@@ -164,6 +165,15 @@ impl PageServerConf {
}
}
/// Config for the Repository checkpointer.
///
/// Tells a checkpoint call how much in-memory data it should flush to disk.
#[derive(Debug, Clone, Copy)]
pub enum CheckpointConfig {
/// Flush in-memory data that is older than this distance.
/// NOTE(review): presumably measured in bytes of WAL - confirm against
/// `checkpoint_internal`.
Distance(u64),
/// Flush all in-memory data.
Forced,
}
/// External relish storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone)]
pub struct RelishStorageConfig {

View File

@@ -1,4 +1,5 @@
use crate::relish::*;
use crate::CheckpointConfig;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
@@ -24,9 +25,9 @@ pub trait Repository: Send + Sync {
/// Branch a timeline
fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>;
/// perform one garbage collection iteration.
/// garbage collection is periodically performed by gc thread,
/// but it can be explicitly requested through page server api.
/// perform one garbage collection iteration, removing old data files from disk.
/// this function is periodically called by gc thread.
/// also it can be explicitly requested through page server api 'do_gc' command.
///
/// 'timelineid' specifies the timeline to GC, or None for all.
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
@@ -39,6 +40,10 @@ pub trait Repository: Send + Sync {
horizon: u64,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
/// perform one checkpoint iteration, flushing in-memory data to disk.
/// this function is periodically called by checkpointer thread.
fn checkpoint_iteration(&self, cconf: CheckpointConfig) -> Result<()>;
}
///
@@ -144,7 +149,7 @@ pub trait Timeline: Send + Sync {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint(&self) -> Result<()>;
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>;
/// Retrieve current logical size of the timeline
///
@@ -714,7 +719,7 @@ mod tests {
.contains(&TESTREL_A));
// Run checkpoint and garbage collection and check that it's still not visible
newtline.checkpoint()?;
newtline.checkpoint(CheckpointConfig::Forced)?;
repo.gc_iteration(Some(NEW_TIMELINE_ID), 0, true)?;
assert!(!newtline

View File

@@ -4,6 +4,7 @@
use crate::branches;
use crate::layered_repository::LayeredRepository;
use crate::repository::{Repository, Timeline};
use crate::tenant_threads;
use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use anyhow::{anyhow, bail, Context, Result};
@@ -16,7 +17,6 @@ use std::fs;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
@@ -28,7 +28,7 @@ struct Tenant {
repo: Option<Arc<dyn Repository>>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
pub enum TenantState {
// This tenant only exists in cloud storage. It cannot be accessed.
CloudOnly,
@@ -41,10 +41,12 @@ pub enum TenantState {
// This tenant exists on local disk, and the layer map has been loaded into memory.
// The local disk might have some newer files that don't exist in cloud storage yet.
Active,
// Tenant is active, but there is no walreceiver connection.
Idle,
// This tenant exists on local disk, and the layer map has been loaded into memory.
// The local disk might have some newer files that don't exist in cloud storage yet.
// The tenant cannot be accessed anymore for any reason, but graceful shutdown.
//Stopping,
Stopping,
}
impl fmt::Display for TenantState {
@@ -53,6 +55,8 @@ impl fmt::Display for TenantState {
TenantState::CloudOnly => f.write_str("CloudOnly"),
TenantState::Downloading => f.write_str("Downloading"),
TenantState::Active => f.write_str("Active"),
TenantState::Idle => f.write_str("Idle"),
TenantState::Stopping => f.write_str("Stopping"),
}
}
}
@@ -61,18 +65,6 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
TENANTS.lock().unwrap()
}
struct TenantHandleEntry {
checkpointer_handle: Option<JoinHandle<()>>,
gc_handle: Option<JoinHandle<()>>,
}
// Logically these handles belong to Repository,
// but it's just simpler to store them separately
lazy_static! {
static ref TENANT_HANDLES: Mutex<HashMap<ZTenantId, TenantHandleEntry>> =
Mutex::new(HashMap::new());
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
pub fn init(conf: &'static PageServerConf) {
@@ -106,21 +98,13 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
true,
));
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
};
handles.insert(tenant_id, h);
let mut m = access_tenants();
let tenant = m.get_mut(&tenant_id).unwrap();
tenant.repo = Some(repo);
tenant.state = TenantState::Active;
// TODO Start these threads only if tenant actively receives some WAL
tenant_threads::start_tenant_threads(conf, tenant_id);
}
pub fn register_relish_download(
@@ -142,8 +126,7 @@ pub fn register_relish_download(
});
tenant.state = TenantState::Downloading;
match &tenant.repo {
Some(repo) =>
{
Some(repo) => {
init_timeline(repo.as_ref(), timeline_id);
tenant.state = TenantState::Active;
return;
@@ -171,27 +154,23 @@ pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::Relaxed)
}
pub fn stop_tenant_threads(tenantid: ZTenantId) {
let mut handles = TENANT_HANDLES.lock().unwrap();
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
debug!("checkpointer for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
debug!("gc for tenant {} has stopped", tenantid);
}
}
pub fn shutdown_all_tenants() -> Result<()> {
SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
let tenantids = list_tenantids()?;
for tenantid in &tenantids {
set_tenant_state(*tenantid, TenantState::Stopping)?;
}
for tenantid in tenantids {
stop_tenant_threads(tenantid);
// Wait for checkpointer and GC to finish their job
tenant_threads::wait_for_tenant_threads_to_stop(tenantid);
let repo = get_repository_for_tenant(tenantid)?;
debug!("shutdown tenant {}", tenantid);
repo.shutdown()?;
}
Ok(())
}
@@ -223,6 +202,33 @@ pub fn create_repository_for_tenant(
Ok(())
}
/// Look up the current state of a tenant.
///
/// A tenant that is not present in the tenants map is reported as
/// `TenantState::CloudOnly`.
pub fn get_tenant_state(tenantid: ZTenantId) -> TenantState {
    access_tenants()
        .get(&tenantid)
        .map_or(TenantState::CloudOnly, |tenant| tenant.state)
}
/// Request a state change for a tenant and return the state it ends up in.
///
/// A transition to `Idle` is only applied when the tenant is currently
/// `Active`; otherwise the request is ignored and the unchanged state is
/// returned. Every other requested state is applied unconditionally.
///
/// Returns an error when the tenant is not present in the tenants map.
pub fn set_tenant_state(tenantid: ZTenantId, state: TenantState) -> Result<TenantState> {
    let mut tenants = access_tenants();
    let tenant = match tenants.get_mut(&tenantid) {
        Some(tenant) => tenant,
        None => bail!("Tenant not found for tenant {}", tenantid),
    };

    // Only Active tenant can become Idle
    let transition_allowed = state != TenantState::Idle || tenant.state == TenantState::Active;
    if transition_allowed {
        info!("set_tenant_state: {} -> {}", tenant.state, state);
        tenant.state = state;
    }
    Ok(tenant.state)
}
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<dyn Repository>> {
let m = access_tenants();
let tenant = m

View File

@@ -0,0 +1,125 @@
//! This module contains functions to serve per-tenant background processes,
//! such as checkpointer and GC
use crate::tenant_mgr;
use crate::tenant_mgr::TenantState;
use crate::CheckpointConfig;
use crate::PageServerConf;
use anyhow::Result;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread::JoinHandle;
use std::time::Duration;
use tracing::*;
use zenith_utils::zid::ZTenantId;
/// Join handles of the background threads launched for a single tenant.
struct TenantHandleEntry {
// Handle of the checkpointer thread; `None` if the thread could not be
// spawned or once the handle has been taken for joining.
checkpointer_handle: Option<JoinHandle<()>>,
// Handle of the GC thread; `None` under the same conditions.
gc_handle: Option<JoinHandle<()>>,
}
// Per-tenant thread handles, keyed by tenant id.
// Preserve handles to wait for thread completion
// at shutdown (see wait_for_tenant_threads_to_stop()).
lazy_static! {
static ref TENANT_HANDLES: Mutex<HashMap<ZTenantId, TenantHandleEntry>> =
Mutex::new(HashMap::new());
}
pub fn start_tenant_threads(conf: &'static PageServerConf, tenantid: ZTenantId) {
//ensure that old threads are stopeed
wait_for_tenant_threads_to_stop(tenantid);
let checkpointer_handle = std::thread::Builder::new()
.name("Checkpointer thread".into())
.spawn(move || {
checkpoint_loop(tenantid, conf).expect("Checkpointer thread died");
})
.ok();
let gc_handle = std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
gc_loop(tenantid, conf).expect("GC thread died");
})
.ok();
// TODO handle thread errors if any
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle,
gc_handle,
};
handles.insert(tenantid, h);
}
pub fn wait_for_tenant_threads_to_stop(tenantid: ZTenantId) {
let mut handles = TENANT_HANDLES.lock().unwrap();
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
trace!("checkpointer for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
trace!("gc for tenant {} has stopped", tenantid);
}
handles.remove(&tenantid);
}
///
/// Checkpointer thread's main loop
///
fn checkpoint_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
loop {
if tenant_mgr::get_tenant_state(tenantid) != TenantState::Active {
break;
}
std::thread::sleep(conf.checkpoint_period);
trace!("checkpointer thread for tenant {} waking up", tenantid);
// checkpoint timelines that have accumulated more than CHECKPOINT_DISTANCE
// bytes of WAL since last checkpoint.
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
repo.checkpoint_iteration(CheckpointConfig::Distance(conf.checkpoint_distance))?;
}
trace!(
"checkpointer thread stopped for tenant {} state is {}",
tenantid,
tenant_mgr::get_tenant_state(tenantid)
);
Ok(())
}
///
/// GC thread's main loop
///
/// Runs for as long as the tenant is `Active`. Each iteration performs one
/// garbage collection pass over all timelines of the tenant (skipped when
/// `conf.gc_horizon` is 0) and then waits for `conf.gc_period`.
///
/// The wait is taken in slices of at most one second with the tenant state
/// re-checked after each slice, so the thread notices a state change
/// promptly. The full `Duration` is used, so a period below one second still
/// sleeps instead of spinning.
///
/// Returns an error if the repository cannot be looked up or a GC iteration
/// fails.
///
fn gc_loop(tenantid: ZTenantId, conf: &'static PageServerConf) -> Result<()> {
    let is_active = || tenant_mgr::get_tenant_state(tenantid) == TenantState::Active;

    while is_active() {
        trace!("gc thread for tenant {} waking up", tenantid);

        // Garbage collect old files that are not needed for PITR anymore
        if conf.gc_horizon > 0 {
            let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
            // Propagate the failure to the caller rather than panicking here.
            repo.gc_iteration(None, conf.gc_horizon, false)?;
        }

        // TODO Write it in more adequate way using
        // condvar.wait_timeout() or something
        let mut remaining = conf.gc_period;
        while remaining > Duration::from_secs(0) && is_active() {
            let nap = remaining.min(Duration::from_secs(1));
            std::thread::sleep(nap);
            remaining -= nap;
        }
    }

    trace!(
        "GC thread stopped for tenant {} state is {}",
        tenantid,
        tenant_mgr::get_tenant_state(tenantid)
    );
    Ok(())
}