Implement graceful shutdown at 'pageserver stop':

- perform checkpoint for each tenant repository.
- wait for the completion of all threads.

Add new option 'immediate' to 'pageserver stop' command to terminate the pageserver immediately.
This commit is contained in:
anastasia
2021-09-27 15:28:03 +03:00
committed by lubennikovaav
parent b9119f11bf
commit d7c9dd06f4
18 changed files with 448 additions and 117 deletions

12
Cargo.lock generated
View File

@@ -1191,6 +1191,7 @@ dependencies = [
"scopeguard",
"serde",
"serde_json",
"signal-hook",
"tar",
"thiserror",
"tokio",
@@ -1842,6 +1843,17 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a568c8f2cd051a4d283bd6eb0343ac214c1b0f1ac19f93e1175b2dee38c73d"
[[package]]
name = "signal-hook"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1"
dependencies = [
"cc",
"libc",
"signal-hook-registry",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"

View File

@@ -199,23 +199,45 @@ impl PageServerNode {
bail!("pageserver failed to start in {} seconds", RETRIES);
}
pub fn stop(&self) -> anyhow::Result<()> {
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
let pid = read_pidfile(&self.pid_file())?;
let pid = Pid::from_raw(pid);
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to kill pageserver with pid {}", pid);
if immediate {
println!("Stop pageserver immediately");
if kill(pid, Signal::SIGQUIT).is_err() {
bail!("Failed to kill pageserver with pid {}", pid);
}
} else {
println!("Stop pageserver gracefully");
if kill(pid, Signal::SIGTERM).is_err() {
bail!("Failed to stop pageserver with pid {}", pid);
}
}
// wait for pageserver stop
let address = connection_address(&self.pg_connection_config);
for _ in 0..5 {
let stream = TcpStream::connect(&address);
thread::sleep(Duration::from_secs(1));
if let Err(_e) = stream {
println!("Pageserver stopped");
return Ok(());
// TODO Remove this "timeout" and handle it on caller side instead.
// Shutting down may take a long time,
// if pageserver checkpoints a lot of data
for _ in 0..100 {
if let Err(_e) = TcpStream::connect(&address) {
println!("Pageserver stopped receiving connections");
//Now check status
match self.check_status() {
Ok(_) => {
println!("Pageserver status is OK. Wait a bit.");
thread::sleep(Duration::from_secs(1));
}
Err(err) => {
println!("Pageserver status is: {}", err);
return Ok(());
}
}
} else {
println!("Pageserver still receives connections");
thread::sleep(Duration::from_secs(1));
}
println!("Stopping pageserver on {}", address);
}
bail!("Failed to stop pageserver with pid {}", pid);
@@ -313,8 +335,9 @@ impl PageServerNode {
impl Drop for PageServerNode {
fn drop(&mut self) {
// TODO Looks like this flag is never set
if self.kill_on_exit {
let _ = self.stop();
let _ = self.stop(true);
}
}
}

View File

@@ -17,7 +17,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
clap = "2.33.0"
daemonize = "0.4.1"
tokio = { version = "1.11", features = ["process", "macros", "fs"] }
tokio = { version = "1.11", features = ["process", "macros", "fs", "rt"] }
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
@@ -36,6 +36,7 @@ rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] }
async-trait = "0.1"
const_format = "0.2.21"
tracing = "0.1.27"
signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
postgres_ffi = { path = "../postgres_ffi" }
zenith_metrics = { path = "../zenith_metrics" }

View File

@@ -15,6 +15,15 @@ use tracing::*;
use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType};
use anyhow::{bail, ensure, Context, Result};
use signal_hook::consts::signal::*;
use signal_hook::consts::TERM_SIGNALS;
use signal_hook::flag;
use signal_hook::iterator::exfiltrator::WithOrigin;
use signal_hook::iterator::SignalsInfo;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use clap::{App, Arg, ArgMatches};
use daemonize::Daemonize;
@@ -28,6 +37,7 @@ use pageserver::{
RelishStorageKind, S3Config, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
use const_format::formatcp;
@@ -449,6 +459,17 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// Initialize logger
let log_file = logging::init(LOG_FILE_NAME, conf.daemonize)?;
let term_now = Arc::new(AtomicBool::new(false));
for sig in TERM_SIGNALS {
// When terminated by a second term signal, exit with exit code 1.
// This will do nothing the first time (because term_now is false).
flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?;
// But this will "arm" the above for the second time, by setting it to true.
// The order of registering these is important, if you put this one first, it will
// first arm and then terminate all in the first round.
flag::register(*sig, Arc::clone(&term_now))?;
}
// TODO: Check that it looks like a valid repository before going further
// bind sockets before daemonizing so we report errors early and do not return until we are listening
@@ -525,13 +546,42 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type)
})?;
join_handles.push(page_service_thread);
for info in SignalsInfo::<WithOrigin>::new(TERM_SIGNALS)?.into_iter() {
match info.signal {
SIGQUIT => {
info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode");
exit(111);
}
SIGTERM => {
info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode");
// Terminate postgres backends
postgres_backend::set_pgbackend_shutdown_requested();
// Stop all tenants and flush their data
tenant_mgr::shutdown_all_tenants()?;
// Wait for pageservice thread to complete the job
page_service_thread
.join()
.expect("thread panicked")
.expect("thread exited with an error");
for handle in join_handles.into_iter() {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error")
// Shut down http router
endpoint::shutdown();
// Wait for all threads
for handle in join_handles.into_iter() {
handle
.join()
.expect("thread panicked")
.expect("thread exited with an error");
}
info!("Pageserver shut down successfully completed");
exit(0);
}
_ => {
debug!("Unknown signal.");
}
}
}
Ok(())
}

View File

@@ -30,12 +30,15 @@ use std::ops::Bound::Included;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
use crate::tenant_mgr;
use crate::walreceiver;
use crate::walreceiver::IS_WAL_RECEIVER;
use crate::walredo::WalRedoManager;
use crate::PageServerConf;
@@ -215,6 +218,23 @@ impl Repository for LayeredRepository {
self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc)
})
}
// Wait for all threads to complete and persist repository data before pageserver shutdown.
fn shutdown(&self) -> Result<()> {
trace!("LayeredRepository shutdown for tenant {}", self.tenantid);
let timelines = self.timelines.lock().unwrap();
for (timelineid, timeline) in timelines.iter() {
walreceiver::stop_wal_receiver(*timelineid);
// Wait for syncing data to disk
trace!("repo shutdown. checkpoint timeline {}", timelineid);
timeline.checkpoint()?;
//TODO Wait for walredo process to shutdown too
}
Ok(())
}
}
/// Private functions
@@ -298,21 +318,24 @@ impl LayeredRepository {
///
/// Launch the checkpointer thread in given repository.
///
pub fn launch_checkpointer_thread(conf: &'static PageServerConf, rc: Arc<LayeredRepository>) {
let _thread = std::thread::Builder::new()
pub fn launch_checkpointer_thread(
conf: &'static PageServerConf,
rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("Checkpointer thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
rc.checkpoint_loop(conf).expect("Checkpointer thread died");
})
.unwrap();
.unwrap()
}
///
/// Checkpointer thread's main loop
///
fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> {
loop {
while !tenant_mgr::shutdown_requested() {
std::thread::sleep(conf.checkpoint_period);
info!("checkpointer thread for tenant {} waking up", self.tenantid);
@@ -328,40 +351,52 @@ impl LayeredRepository {
STORAGE_TIME
.with_label_values(&["checkpoint_timed"])
.observe_closure_duration(|| {
timeline.checkpoint_internal(conf.checkpoint_distance)
timeline.checkpoint_internal(conf.checkpoint_distance, false)
})?
}
// release lock on 'timelines'
}
}
trace!("Checkpointer thread shut down");
Ok(())
}
///
/// Launch the GC thread in given repository.
///
pub fn launch_gc_thread(conf: &'static PageServerConf, rc: Arc<LayeredRepository>) {
let _thread = std::thread::Builder::new()
pub fn launch_gc_thread(
conf: &'static PageServerConf,
rc: Arc<LayeredRepository>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("GC thread".into())
.spawn(move || {
// FIXME: relaunch it? Panic is not good.
rc.gc_loop(conf).expect("GC thread died");
})
.unwrap();
.unwrap()
}
///
/// GC thread's main loop
///
fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> {
loop {
std::thread::sleep(conf.gc_period);
info!("gc thread for tenant {} waking up", self.tenantid);
while !tenant_mgr::shutdown_requested() {
// Garbage collect old files that are not needed for PITR anymore
if conf.gc_horizon > 0 {
self.gc_iteration(None, conf.gc_horizon, false).unwrap();
}
// TODO Write it in more adequate way using
// condvar.wait_timeout() or something
let mut sleep_time = conf.gc_period.as_secs();
while sleep_time > 0 && !tenant_mgr::shutdown_requested() {
sleep_time -= 1;
std::thread::sleep(Duration::from_secs(1));
}
info!("gc thread for tenant {} waking up", self.tenantid);
}
Ok(())
}
/// Save timeline metadata to file
@@ -508,6 +543,10 @@ impl LayeredRepository {
// Ok, we now know all the branch points.
// Perform GC for each timeline.
for timelineid in timelineids {
if tenant_mgr::shutdown_requested() {
return Ok(totals);
}
// We have already loaded all timelines above
// so this operation is just a quick map lookup.
let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?;
@@ -934,7 +973,7 @@ impl Timeline for LayeredTimeline {
STORAGE_TIME
.with_label_values(&["checkpoint_force"])
//pass checkpoint_distance=0 to force checkpoint
.observe_closure_duration(|| self.checkpoint_internal(0))
.observe_closure_duration(|| self.checkpoint_internal(0, true))
}
///
@@ -1309,7 +1348,7 @@ impl LayeredTimeline {
/// Flush to disk all data that was written with the put_* functions
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: u64) -> Result<()> {
fn checkpoint_internal(&self, checkpoint_distance: u64, forced: bool) -> Result<()> {
// Grab lock on the layer map.
//
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
@@ -1345,6 +1384,10 @@ impl LayeredTimeline {
while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
if tenant_mgr::shutdown_requested() && !forced {
return Ok(());
}
// Does this layer need freezing?
//
// Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE.

View File

@@ -25,6 +25,7 @@ use zenith_metrics::{register_histogram_vec, HistogramVec};
use zenith_utils::auth::{self, JwtAuth};
use zenith_utils::auth::{Claims, Scope};
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend::is_socket_read_timed_out;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::postgres_backend::{self, AuthType};
use zenith_utils::pq_proto::{
@@ -187,17 +188,32 @@ pub fn thread_main(
listener: TcpListener,
auth_type: AuthType,
) -> anyhow::Result<()> {
loop {
let mut join_handles = Vec::new();
while !tenant_mgr::shutdown_requested() {
let (socket, peer_addr) = listener.accept()?;
debug!("accepted connection from {}", peer_addr);
socket.set_nodelay(true).unwrap();
let local_auth = auth.clone();
thread::spawn(move || {
if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) {
error!(%err, "page server thread exited with error");
}
});
let handle = thread::Builder::new()
.name("serving Page Service thread".into())
.spawn(move || {
if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) {
error!(%err, "page server thread exited with error");
}
})
.unwrap();
join_handles.push(handle);
}
debug!("page_service loop terminated. wait for connections to cancel");
for handle in join_handles.into_iter() {
handle.join().unwrap();
}
Ok(())
}
fn page_service_conn_main(
@@ -216,7 +232,7 @@ fn page_service_conn_main(
}
let mut conn_handler = PageServerHandler::new(conf, auth);
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
let pgbackend = PostgresBackend::new(socket, auth_type, None, true)?;
pgbackend.run(&mut conn_handler)
}
@@ -268,44 +284,58 @@ impl PageServerHandler {
/* switch client to COPYBOTH */
pgb.write_message(&BeMessage::CopyBothResponse)?;
while let Some(message) = pgb.read_message()? {
trace!("query: {:?}", message);
while !tenant_mgr::shutdown_requested() {
match pgb.read_message() {
Ok(message) => {
if let Some(message) = message {
trace!("query: {:?}", message);
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
_ => continue,
};
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
_ => continue,
};
let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
let response = match zenith_fe_msg {
PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME
.with_label_values(&["get_rel_exists"])
.observe_closure_duration(|| {
self.handle_get_rel_exists_request(&*timeline, &req)
}),
PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME
.with_label_values(&["get_rel_size"])
.observe_closure_duration(|| self.handle_get_nblocks_request(&*timeline, &req)),
PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME
.with_label_values(&["get_page_at_lsn"])
.observe_closure_duration(|| {
self.handle_get_page_at_lsn_request(&*timeline, &req)
}),
};
let response = match zenith_fe_msg {
PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME
.with_label_values(&["get_rel_exists"])
.observe_closure_duration(|| {
self.handle_get_rel_exists_request(&*timeline, &req)
}),
PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME
.with_label_values(&["get_rel_size"])
.observe_closure_duration(|| {
self.handle_get_nblocks_request(&*timeline, &req)
}),
PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME
.with_label_values(&["get_page_at_lsn"])
.observe_closure_duration(|| {
self.handle_get_page_at_lsn_request(&*timeline, &req)
}),
};
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:#}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response = response.unwrap_or_else(|e| {
// print the all details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:#}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
} else {
break;
}
}
Err(e) => {
if !is_socket_read_timed_out(&e) {
return Err(e);
}
}
}
}
Ok(())
}

View File

@@ -1,6 +1,7 @@
use std::time::Duration;
use std::{collections::BinaryHeap, sync::Mutex, thread};
use crate::tenant_mgr;
use crate::{relish_storage::RelishStorage, PageServerConf};
lazy_static::lazy_static! {
@@ -31,22 +32,26 @@ pub fn run_storage_sync_thread<
let handle = thread::Builder::new()
.name("Queue based relish storage sync".to_string())
.spawn(move || loop {
let mut queue_accessor = UPLOAD_QUEUE.lock().unwrap();
log::debug!("Upload queue length: {}", queue_accessor.len());
let next_task = queue_accessor.pop();
drop(queue_accessor);
match next_task {
Some(task) => runtime.block_on(async {
// suppress warnings
let _ = (config, task, &relish_storage, max_concurrent_sync);
todo!("omitted for brevity")
}),
None => {
thread::sleep(Duration::from_secs(1));
continue;
.spawn(move || {
while !tenant_mgr::shutdown_requested() {
let mut queue_accessor = UPLOAD_QUEUE.lock().unwrap();
log::debug!("Upload queue length: {}", queue_accessor.len());
let next_task = queue_accessor.pop();
drop(queue_accessor);
match next_task {
Some(task) => runtime.block_on(async {
// suppress warnings
let _ = (config, task, &relish_storage, max_concurrent_sync);
todo!("omitted for brevity")
}),
None => {
thread::sleep(Duration::from_secs(1));
continue;
}
}
}
log::debug!("Queue based relish storage sync thread shut down");
Ok(())
})?;
Ok(Some(handle))
}

View File

@@ -13,6 +13,8 @@ use zenith_utils::zid::ZTimelineId;
/// A repository corresponds to one .zenith directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
fn shutdown(&self) -> Result<()>;
/// Get Timeline handle for given zenith timeline ID.
fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>>;

View File

@@ -8,12 +8,14 @@ use crate::walredo::PostgresRedoManager;
use crate::PageServerConf;
use anyhow::{anyhow, bail, Context, Result};
use lazy_static::lazy_static;
use log::info;
use log::{debug, info};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fs;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
lazy_static! {
@@ -24,6 +26,19 @@ lazy_static! {
fn access_repository() -> MutexGuard<'static, HashMap<ZTenantId, Arc<dyn Repository>>> {
REPOSITORY.lock().unwrap()
}
struct TenantHandleEntry {
checkpointer_handle: Option<JoinHandle<()>>,
gc_handle: Option<JoinHandle<()>>,
}
// Logically these handles belong to Repository,
// but it's just simpler to store them separately
lazy_static! {
static ref TENANT_HANDLES: Mutex<HashMap<ZTenantId, TenantHandleEntry>> =
Mutex::new(HashMap::new());
}
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
pub fn init(conf: &'static PageServerConf) {
let mut m = access_repository();
@@ -47,8 +62,18 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc<Layered
tenant_id,
true,
));
LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
LayeredRepository::launch_gc_thread(conf, repo.clone());
let checkpointer_handle = LayeredRepository::launch_checkpointer_thread(conf, repo.clone());
let gc_handle = LayeredRepository::launch_gc_thread(conf, repo.clone());
let mut handles = TENANT_HANDLES.lock().unwrap();
let h = TenantHandleEntry {
checkpointer_handle: Some(checkpointer_handle),
gc_handle: Some(gc_handle),
};
handles.insert(tenant_id, h);
repo
}
@@ -82,6 +107,35 @@ fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) {
}
}
// Check this flag in the thread loops to know when to exit
pub fn shutdown_requested() -> bool {
SHUTDOWN_REQUESTED.load(Ordering::Relaxed)
}
pub fn stop_tenant_threads(tenantid: ZTenantId) {
let mut handles = TENANT_HANDLES.lock().unwrap();
if let Some(h) = handles.get_mut(&tenantid) {
h.checkpointer_handle.take().map(JoinHandle::join);
debug!("checkpointer for tenant {} has stopped", tenantid);
h.gc_handle.take().map(JoinHandle::join);
debug!("gc for tenant {} has stopped", tenantid);
}
}
pub fn shutdown_all_tenants() -> Result<()> {
SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
let tenants = list_tenants()?;
for tenantid in tenants {
stop_tenant_threads(tenantid);
let repo = get_repository_for_tenant(tenantid)?;
debug!("shutdown tenant {}", tenantid);
repo.shutdown()?;
}
Ok(())
}
pub fn create_repository_for_tenant(
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -115,3 +169,14 @@ pub fn get_timeline_for_tenant(
.get_timeline(timelineid)
.with_context(|| format!("cannot fetch timeline {}", timelineid))
}
fn list_tenants() -> Result<Vec<ZTenantId>> {
let o = &mut REPOSITORY.lock().unwrap();
o.iter()
.map(|tenant| {
let (tenantid, _) = tenant;
Ok(*tenantid)
})
.collect()
}

View File

@@ -24,6 +24,7 @@ use std::str::FromStr;
use std::sync::Mutex;
use std::thread;
use std::thread::sleep;
use std::thread::JoinHandle;
use std::thread_local;
use std::time::{Duration, SystemTime};
use tracing::*;
@@ -36,6 +37,7 @@ use zenith_utils::zid::ZTimelineId;
//
struct WalReceiverEntry {
wal_producer_connstr: String,
wal_receiver_handle: Option<JoinHandle<()>>,
}
lazy_static! {
@@ -50,6 +52,19 @@ thread_local! {
pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
}
// Wait for walreceiver to stop
// Now it stops when pageserver shutdown is requested.
// In future we can make this more granular and send shutdown signals
// per tenant/timeline to cancel inactive walreceivers.
// TODO deal with blocking pg connections
pub fn stop_wal_receiver(timelineid: ZTimelineId) {
let mut receivers = WAL_RECEIVERS.lock().unwrap();
if let Some(r) = receivers.get_mut(&timelineid) {
r.wal_receiver_handle.take();
// r.wal_receiver_handle.take().map(JoinHandle::join);
}
}
// Launch a new WAL receiver, or tell one that's running about change in connection string
pub fn launch_wal_receiver(
conf: &'static PageServerConf,
@@ -64,19 +79,19 @@ pub fn launch_wal_receiver(
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {
let receiver = WalReceiverEntry {
wal_producer_connstr: wal_producer_connstr.into(),
};
receivers.insert(timelineid, receiver);
// Also launch a new thread to handle this connection
let _walreceiver_thread = thread::Builder::new()
let wal_receiver_handle = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
IS_WAL_RECEIVER.with(|c| c.set(true));
thread_main(conf, timelineid, tenantid);
})
.unwrap();
let receiver = WalReceiverEntry {
wal_producer_connstr: wal_producer_connstr.into(),
wal_receiver_handle: Some(wal_receiver_handle),
};
receivers.insert(timelineid, receiver);
}
};
}
@@ -103,7 +118,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it. If the connection is lost, keep retrying.
//
loop {
while !tenant_mgr::shutdown_requested() {
// Look up the current WAL producer address
let wal_producer_connstr = get_wal_producer_connstr(timelineid);
@@ -117,6 +132,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
sleep(Duration::from_secs(1));
}
}
debug!("WAL streaming shut down");
}
fn walreceiver_main(
@@ -273,6 +289,11 @@ fn walreceiver_main(
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
}
if tenant_mgr::shutdown_requested() {
debug!("stop walreceiver because pageserver shutdown is requested");
break;
}
}
Ok(())
}

View File

@@ -34,7 +34,7 @@ pub fn thread_main(state: &'static ProxyState, listener: TcpListener) -> anyhow:
pub fn mgmt_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow::Result<()> {
let mut conn_handler = MgmtHandler { state };
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None)?;
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, true)?;
pgbackend.run(&mut conn_handler)
}

View File

@@ -64,6 +64,7 @@ pub fn proxy_conn_main(
socket,
postgres_backend::AuthType::MD5,
state.conf.ssl_config.clone(),
false,
)?,
md5_salt: [0u8; 4],
psql_session_id: "".into(),

View File

@@ -375,6 +375,7 @@ class ZenithPageserver(PgProtocol):
Start the page server.
Returns self.
"""
assert self.running == False
self.zenith_cli.run(['start'])
self.running = True
@@ -382,14 +383,18 @@ class ZenithPageserver(PgProtocol):
self.initial_tenant = self.zenith_cli.run(['tenant', 'list']).stdout.strip()
return self
def stop(self) -> 'ZenithPageserver':
def stop(self, immediate=False) -> 'ZenithPageserver':
"""
Stop the page server.
Returns self.
"""
cmd = ['stop']
if immediate:
cmd.append('immediate')
print(cmd)
if self.running:
self.zenith_cli.run(['stop'])
self.zenith_cli.run(cmd)
self.running = False
return self
@@ -398,7 +403,7 @@ class ZenithPageserver(PgProtocol):
return self
def __exit__(self, exc_type, exc, tb):
self.stop()
self.stop(True)
@cached_property
def auth_keys(self) -> AuthKeys:
@@ -444,7 +449,7 @@ def pageserver(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: Pageserver
# After the yield comes any cleanup code we need.
print('Starting pageserver cleanup')
ps.stop()
ps.stop(True)
class PgBin:
""" A helper class for executing postgres binaries """

View File

@@ -41,7 +41,7 @@ fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> {
socket.set_nodelay(true)?;
let mut conn_handler = SendWalHandler::new(conf);
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None)?;
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?;
// libpq replication protocol between wal_acceptor and replicas/pagers
pgbackend.run(&mut conn_handler)?;

View File

@@ -88,7 +88,12 @@ fn main() -> Result<()> {
)
.subcommand(SubCommand::with_name("status"))
.subcommand(SubCommand::with_name("start").about("Start local pageserver"))
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver"))
.subcommand(SubCommand::with_name("stop").about("Stop local pageserver")
.arg(Arg::with_name("immediate")
.help("Don't flush repository data at shutdown")
.required(false)
)
)
.subcommand(SubCommand::with_name("restart").about("Restart local pageserver"))
.subcommand(
SubCommand::with_name("pg")
@@ -196,10 +201,12 @@ fn main() -> Result<()> {
}
}
("stop", Some(_sub_m)) => {
("stop", Some(stop_match)) => {
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.stop() {
let immediate = stop_match.is_present("immediate");
if let Err(e) = pageserver.stop(immediate) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}
@@ -208,7 +215,8 @@ fn main() -> Result<()> {
("restart", Some(_sub_m)) => {
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.stop() {
//TODO what shutdown strategy should we use here?
if let Err(e) = pageserver.stop(false) {
eprintln!("pageserver stop failed: {}", e);
exit(1);
}

View File

@@ -12,8 +12,17 @@ use std::net::TcpListener;
use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter};
use zenith_metrics::{Encoder, TextEncoder};
use std::sync::Mutex;
use tokio::sync::oneshot::Sender;
use super::error::ApiError;
lazy_static! {
/// Channel used to send shutdown signal - wrapped in an Option to allow
/// it to be taken by value (since oneshot channels consume themselves on send)
static ref SHUTDOWN_SENDER: Mutex<Option<Sender<()>>> = Mutex::new(None);
}
lazy_static! {
static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!(
new_common_metric_name("serve_metrics_count"),
@@ -143,11 +152,18 @@ pub fn check_permission(req: &Request<Body>, tenantid: Option<ZTenantId>) -> Res
}
}
// Send shutdown signal
pub fn shutdown() {
if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() {
let _ = tx.send(());
}
}
pub fn serve_thread_main(
router_builder: RouterBuilder<hyper::Body, ApiError>,
listener: TcpListener,
) -> anyhow::Result<()> {
log::info!("Starting a http endoint at {}", listener.local_addr()?);
log::info!("Starting a http endpoint at {}", listener.local_addr()?);
// Create a Service from the router above to handle incoming requests.
let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
@@ -159,7 +175,14 @@ pub fn serve_thread_main(
let _guard = runtime.enter();
let server = Server::from_tcp(listener)?.serve(service);
let (send, recv) = tokio::sync::oneshot::channel::<()>();
*SHUTDOWN_SENDER.lock().unwrap() = Some(send);
let server = Server::from_tcp(listener)?
.serve(service)
.with_graceful_shutdown(async {
recv.await.ok();
});
runtime.block_on(server)?;

View File

@@ -13,7 +13,11 @@ use serde::{Deserialize, Serialize};
use std::io::{self, Write};
use std::net::{Shutdown, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
static PGBACKEND_SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
pub trait Handler {
/// Handle single query.
@@ -135,13 +139,32 @@ pub fn query_from_cstring(query_string: Bytes) -> Vec<u8> {
query_string
}
// Helper function for socket read loops
pub fn is_socket_read_timed_out(error: &anyhow::Error) -> bool {
for cause in error.chain() {
if let Some(io_error) = cause.downcast_ref::<io::Error>() {
if io_error.kind() == std::io::ErrorKind::WouldBlock {
return true;
}
}
}
false
}
impl PostgresBackend {
pub fn new(
socket: TcpStream,
auth_type: AuthType,
tls_config: Option<Arc<rustls::ServerConfig>>,
set_read_timeout: bool,
) -> io::Result<Self> {
let peer_addr = socket.peer_addr()?;
if set_read_timeout {
socket
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
}
Ok(Self {
stream: Some(Stream::Bidirectional(BidiStream::from_tcp(socket))),
buf_out: BytesMut::with_capacity(10 * 1024),
@@ -229,12 +252,26 @@ impl PostgresBackend {
let mut unnamed_query_string = Bytes::new();
while let Some(msg) = self.read_message()? {
trace!("got message {:?}", msg);
while !PGBACKEND_SHUTDOWN_REQUESTED.load(Ordering::Relaxed) {
match self.read_message() {
Ok(message) => {
if let Some(msg) = message {
trace!("got message {:?}", msg);
match self.process_message(handler, msg, &mut unnamed_query_string)? {
ProcessMsgResult::Continue => continue,
ProcessMsgResult::Break => break,
match self.process_message(handler, msg, &mut unnamed_query_string)? {
ProcessMsgResult::Continue => continue,
ProcessMsgResult::Break => break,
}
} else {
break;
}
}
Err(e) => {
// If it is a timeout error, continue the loop
if !is_socket_read_timed_out(&e) {
return Err(e);
}
}
}
}
@@ -427,3 +464,8 @@ impl PostgresBackend {
Ok(ProcessMsgResult::Continue)
}
}
// Set the flag to inform connections to cancel
pub fn set_pgbackend_shutdown_requested() {
PGBACKEND_SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed);
}

View File

@@ -110,7 +110,7 @@ fn ssl() {
.unwrap();
let tls_config = Some(Arc::new(cfg));
let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config).unwrap();
let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config, true).unwrap();
pgb.run(&mut handler).unwrap();
assert!(handler.got_query);
@@ -150,7 +150,7 @@ fn no_ssl() {
let mut handler = TestHandler;
let pgb = PostgresBackend::new(server_sock, AuthType::Trust, None).unwrap();
let pgb = PostgresBackend::new(server_sock, AuthType::Trust, None, true).unwrap();
pgb.run(&mut handler).unwrap();
client_jh.join().unwrap();
@@ -214,7 +214,7 @@ fn server_forces_ssl() {
.unwrap();
let tls_config = Some(Arc::new(cfg));
let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config).unwrap();
let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config, true).unwrap();
let res = pgb.run(&mut handler).unwrap_err();
assert_eq!("client did not connect with TLS", format!("{}", res));