From d7c9dd06f42c17a5c0d1ae2a075032d3e63671e8 Mon Sep 17 00:00:00 2001 From: anastasia Date: Mon, 27 Sep 2021 15:28:03 +0300 Subject: [PATCH] Implement graceful shutdown at 'pageserver stop': - perform checkpoint for each tenant repository. - wait for the completion of all threads. Add new option 'immediate' to 'pageserver stop' command to terminate the pageserver immediately. --- Cargo.lock | 12 ++ control_plane/src/storage.rs | 47 ++++++-- pageserver/Cargo.toml | 3 +- pageserver/src/bin/pageserver.rs | 62 +++++++++- pageserver/src/layered_repository.rs | 71 +++++++++--- pageserver/src/page_service.rs | 108 +++++++++++------- .../src/relish_storage/synced_storage.rs | 33 +++--- pageserver/src/repository.rs | 2 + pageserver/src/tenant_mgr.rs | 71 +++++++++++- pageserver/src/walreceiver.rs | 37 ++++-- proxy/src/mgmt.rs | 2 +- proxy/src/proxy.rs | 1 + test_runner/fixtures/zenith_fixtures.py | 13 ++- walkeeper/src/wal_service.rs | 2 +- zenith/src/main.rs | 16 ++- zenith_utils/src/http/endpoint.rs | 27 ++++- zenith_utils/src/postgres_backend.rs | 52 ++++++++- zenith_utils/tests/ssl_test.rs | 6 +- 18 files changed, 448 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f7ce2bf543..6ea1fd5593 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1191,6 +1191,7 @@ dependencies = [ "scopeguard", "serde", "serde_json", + "signal-hook", "tar", "thiserror", "tokio", @@ -1842,6 +1843,17 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42a568c8f2cd051a4d283bd6eb0343ac214c1b0f1ac19f93e1175b2dee38c73d" +[[package]] +name = "signal-hook" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c98891d737e271a2954825ef19e46bd16bdb98e2746f2eec4f7a4ef7946efd1" +dependencies = [ + "cc", + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 9d762c360f..3d331ca2a7 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -199,23 +199,45 @@ impl PageServerNode { bail!("pageserver failed to start in {} seconds", RETRIES); } - pub fn stop(&self) -> anyhow::Result<()> { + pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { let pid = read_pidfile(&self.pid_file())?; let pid = Pid::from_raw(pid); - if kill(pid, Signal::SIGTERM).is_err() { - bail!("Failed to kill pageserver with pid {}", pid); + if immediate { + println!("Stop pageserver immediately"); + if kill(pid, Signal::SIGQUIT).is_err() { + bail!("Failed to kill pageserver with pid {}", pid); + } + } else { + println!("Stop pageserver gracefully"); + if kill(pid, Signal::SIGTERM).is_err() { + bail!("Failed to stop pageserver with pid {}", pid); + } } - // wait for pageserver stop let address = connection_address(&self.pg_connection_config); - for _ in 0..5 { - let stream = TcpStream::connect(&address); - thread::sleep(Duration::from_secs(1)); - if let Err(_e) = stream { - println!("Pageserver stopped"); - return Ok(()); + + // TODO Remove this "timeout" and handle it on caller side instead. + // Shutting down may take a long time, + // if pageserver checkpoints a lot of data + for _ in 0..100 { + if let Err(_e) = TcpStream::connect(&address) { + println!("Pageserver stopped receiving connections"); + + //Now check status + match self.check_status() { + Ok(_) => { + println!("Pageserver status is OK. Wait a bit."); + thread::sleep(Duration::from_secs(1)); + } + Err(err) => { + println!("Pageserver status is: {}", err); + return Ok(()); + } + } + } else { + println!("Pageserver still receives connections"); + thread::sleep(Duration::from_secs(1)); } - println!("Stopping pageserver on {}", address); } bail!("Failed to stop pageserver with pid {}", pid); @@ -313,8 +335,9 @@ impl PageServerNode { impl Drop for PageServerNode { fn drop(&mut self) { + // TODO Looks like this flag is never set if self.kill_on_exit { - let _ = self.stop(); + let _ = self.stop(true); } } } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index abbade4355..33c911c840 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -17,7 +17,7 @@ lazy_static = "1.4.0" log = "0.4.14" clap = "2.33.0" daemonize = "0.4.1" -tokio = { version = "1.11", features = ["process", "macros", "fs"] } +tokio = { version = "1.11", features = ["process", "macros", "fs", "rt"] } postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } @@ -36,6 +36,7 @@ rust-s3 = { version = "0.27.0-rc4", features = ["no-verify-ssl"] } async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" +signal-hook = {version = "0.3.10", features = ["extended-siginfo"] } postgres_ffi = { path = "../postgres_ffi" } zenith_metrics = { path = "../zenith_metrics" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d6cf86ba24..4e2c02a83a 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -15,6 +15,15 @@ use tracing::*; use zenith_utils::{auth::JwtAuth, logging, postgres_backend::AuthType}; use anyhow::{bail, ensure, Context, Result}; +use signal_hook::consts::signal::*; +use signal_hook::consts::TERM_SIGNALS; +use signal_hook::flag; +use signal_hook::iterator::exfiltrator::WithOrigin; +use signal_hook::iterator::SignalsInfo; +use std::process::exit; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; @@ -28,6 +37,7 @@ use pageserver::{ RelishStorageKind, S3Config, LOG_FILE_NAME, }; use zenith_utils::http::endpoint; +use zenith_utils::postgres_backend; use const_format::formatcp; @@ -449,6 +459,17 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { // Initialize logger let log_file = logging::init(LOG_FILE_NAME, conf.daemonize)?; + let term_now = Arc::new(AtomicBool::new(false)); + for sig in TERM_SIGNALS { + // When terminated by a second term signal, exit with exit code 1. + // This will do nothing the first time (because term_now is false). + flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?; + // But this will "arm" the above for the second time, by setting it to true. + // The order of registering these is important, if you put this one first, it will + // first arm and then terminate ‒ all in the first round. + flag::register(*sig, Arc::clone(&term_now))?; + } + // TODO: Check that it looks like a valid repository before going further // bind sockets before daemonizing so we report errors early and do not return until we are listening @@ -525,13 +546,42 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { page_service::thread_main(conf, auth, pageserver_listener, conf.auth_type) })?; - join_handles.push(page_service_thread); + for info in SignalsInfo::::new(TERM_SIGNALS)?.into_iter() { + match info.signal { + SIGQUIT => { + info!("Got SIGQUIT. Terminate pageserver in immediate shutdown mode"); + exit(111); + } + SIGTERM => { + info!("Got SIGINT/SIGTERM. Terminate gracefully in fast shutdown mode"); + // Terminate postgres backends + postgres_backend::set_pgbackend_shutdown_requested(); + // Stop all tenants and flush their data + tenant_mgr::shutdown_all_tenants()?; + // Wait for pageservice thread to complete the job + page_service_thread + .join() + .expect("thread panicked") + .expect("thread exited with an error"); - for handle in join_handles.into_iter() { - handle - .join() - .expect("thread panicked") - .expect("thread exited with an error") + // Shut down http router + endpoint::shutdown(); + + // Wait for all threads + for handle in join_handles.into_iter() { + handle + .join() + .expect("thread panicked") + .expect("thread exited with an error"); + } + info!("Pageserver shut down successfully completed"); + exit(0); + } + _ => { + debug!("Unknown signal."); + } + } } + Ok(()) } diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 4c9e03999e..b3164f6cc1 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -30,12 +30,15 @@ use std::ops::Bound::Included; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; +use std::thread::JoinHandle; use std::time::{Duration, Instant}; use crate::layered_repository::inmemory_layer::FreezeLayers; use crate::relish::*; use crate::relish_storage::schedule_timeline_upload; use crate::repository::{GcResult, Repository, Timeline, WALRecord}; +use crate::tenant_mgr; +use crate::walreceiver; use crate::walreceiver::IS_WAL_RECEIVER; use crate::walredo::WalRedoManager; use crate::PageServerConf; @@ -215,6 +218,23 @@ impl Repository for LayeredRepository { self.gc_iteration_internal(target_timelineid, horizon, checkpoint_before_gc) }) } + + // Wait for all threads to complete and persist repository data before pageserver shutdown. + fn shutdown(&self) -> Result<()> { + trace!("LayeredRepository shutdown for tenant {}", self.tenantid); + + let timelines = self.timelines.lock().unwrap(); + for (timelineid, timeline) in timelines.iter() { + walreceiver::stop_wal_receiver(*timelineid); + // Wait for syncing data to disk + trace!("repo shutdown. checkpoint timeline {}", timelineid); + timeline.checkpoint()?; + + //TODO Wait for walredo process to shutdown too + } + + Ok(()) + } } /// Private functions @@ -298,21 +318,24 @@ impl LayeredRepository { /// /// Launch the checkpointer thread in given repository. /// - pub fn launch_checkpointer_thread(conf: &'static PageServerConf, rc: Arc) { - let _thread = std::thread::Builder::new() + pub fn launch_checkpointer_thread( + conf: &'static PageServerConf, + rc: Arc, + ) -> JoinHandle<()> { + std::thread::Builder::new() .name("Checkpointer thread".into()) .spawn(move || { // FIXME: relaunch it? Panic is not good. rc.checkpoint_loop(conf).expect("Checkpointer thread died"); }) - .unwrap(); + .unwrap() } /// /// Checkpointer thread's main loop /// fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> { - loop { + while !tenant_mgr::shutdown_requested() { std::thread::sleep(conf.checkpoint_period); info!("checkpointer thread for tenant {} waking up", self.tenantid); @@ -328,40 +351,52 @@ impl LayeredRepository { STORAGE_TIME .with_label_values(&["checkpoint_timed"]) .observe_closure_duration(|| { - timeline.checkpoint_internal(conf.checkpoint_distance) + timeline.checkpoint_internal(conf.checkpoint_distance, false) })? } // release lock on 'timelines' } } + trace!("Checkpointer thread shut down"); + Ok(()) } /// /// Launch the GC thread in given repository. /// - pub fn launch_gc_thread(conf: &'static PageServerConf, rc: Arc) { - let _thread = std::thread::Builder::new() + pub fn launch_gc_thread( + conf: &'static PageServerConf, + rc: Arc, + ) -> JoinHandle<()> { + std::thread::Builder::new() .name("GC thread".into()) .spawn(move || { // FIXME: relaunch it? Panic is not good. rc.gc_loop(conf).expect("GC thread died"); }) - .unwrap(); + .unwrap() } /// /// GC thread's main loop /// fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> { - loop { - std::thread::sleep(conf.gc_period); - info!("gc thread for tenant {} waking up", self.tenantid); - + while !tenant_mgr::shutdown_requested() { // Garbage collect old files that are not needed for PITR anymore if conf.gc_horizon > 0 { self.gc_iteration(None, conf.gc_horizon, false).unwrap(); } + + // TODO Write it in more adequate way using + // condvar.wait_timeout() or something + let mut sleep_time = conf.gc_period.as_secs(); + while sleep_time > 0 && !tenant_mgr::shutdown_requested() { + sleep_time -= 1; + std::thread::sleep(Duration::from_secs(1)); + } + info!("gc thread for tenant {} waking up", self.tenantid); } + Ok(()) } /// Save timeline metadata to file @@ -508,6 +543,10 @@ impl LayeredRepository { // Ok, we now know all the branch points. // Perform GC for each timeline. for timelineid in timelineids { + if tenant_mgr::shutdown_requested() { + return Ok(totals); + } + // We have already loaded all timelines above // so this operation is just a quick map lookup. let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?; @@ -934,7 +973,7 @@ impl Timeline for LayeredTimeline { STORAGE_TIME .with_label_values(&["checkpoint_force"]) //pass checkpoint_distance=0 to force checkpoint - .observe_closure_duration(|| self.checkpoint_internal(0)) + .observe_closure_duration(|| self.checkpoint_internal(0, true)) } /// @@ -1309,7 +1348,7 @@ impl LayeredTimeline { /// Flush to disk all data that was written with the put_* functions /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. - fn checkpoint_internal(&self, checkpoint_distance: u64) -> Result<()> { + fn checkpoint_internal(&self, checkpoint_distance: u64, forced: bool) -> Result<()> { // Grab lock on the layer map. // // TODO: We hold it locked throughout the checkpoint operation. That's bad, @@ -1345,6 +1384,10 @@ impl LayeredTimeline { while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() { let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); + if tenant_mgr::shutdown_requested() && !forced { + return Ok(()); + } + // Does this layer need freezing? // // Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE. diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 32b8746f09..be849ce35f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_utils::auth::{self, JwtAuth}; use zenith_utils::auth::{Claims, Scope}; use zenith_utils::lsn::Lsn; +use zenith_utils::postgres_backend::is_socket_read_timed_out; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::postgres_backend::{self, AuthType}; use zenith_utils::pq_proto::{ @@ -187,17 +188,32 @@ pub fn thread_main( listener: TcpListener, auth_type: AuthType, ) -> anyhow::Result<()> { - loop { + let mut join_handles = Vec::new(); + + while !tenant_mgr::shutdown_requested() { let (socket, peer_addr) = listener.accept()?; debug!("accepted connection from {}", peer_addr); socket.set_nodelay(true).unwrap(); let local_auth = auth.clone(); - thread::spawn(move || { - if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) { - error!(%err, "page server thread exited with error"); - } - }); + + let handle = thread::Builder::new() + .name("serving Page Service thread".into()) + .spawn(move || { + if let Err(err) = page_service_conn_main(conf, local_auth, socket, auth_type) { + error!(%err, "page server thread exited with error"); + } + }) + .unwrap(); + + join_handles.push(handle); } + + debug!("page_service loop terminated. wait for connections to cancel"); + for handle in join_handles.into_iter() { + handle.join().unwrap(); + } + + Ok(()) } fn page_service_conn_main( @@ -216,7 +232,7 @@ fn page_service_conn_main( } let mut conn_handler = PageServerHandler::new(conf, auth); - let pgbackend = PostgresBackend::new(socket, auth_type, None)?; + let pgbackend = PostgresBackend::new(socket, auth_type, None, true)?; pgbackend.run(&mut conn_handler) } @@ -268,44 +284,58 @@ impl PageServerHandler { /* switch client to COPYBOTH */ pgb.write_message(&BeMessage::CopyBothResponse)?; - while let Some(message) = pgb.read_message()? { - trace!("query: {:?}", message); + while !tenant_mgr::shutdown_requested() { + match pgb.read_message() { + Ok(message) => { + if let Some(message) = message { + trace!("query: {:?}", message); - let copy_data_bytes = match message { - FeMessage::CopyData(bytes) => bytes, - _ => continue, - }; + let copy_data_bytes = match message { + FeMessage::CopyData(bytes) => bytes, + _ => continue, + }; - let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; + let zenith_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; - let response = match zenith_fe_msg { - PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME - .with_label_values(&["get_rel_exists"]) - .observe_closure_duration(|| { - self.handle_get_rel_exists_request(&*timeline, &req) - }), - PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME - .with_label_values(&["get_rel_size"]) - .observe_closure_duration(|| self.handle_get_nblocks_request(&*timeline, &req)), - PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME - .with_label_values(&["get_page_at_lsn"]) - .observe_closure_duration(|| { - self.handle_get_page_at_lsn_request(&*timeline, &req) - }), - }; + let response = match zenith_fe_msg { + PagestreamFeMessage::Exists(req) => SMGR_QUERY_TIME + .with_label_values(&["get_rel_exists"]) + .observe_closure_duration(|| { + self.handle_get_rel_exists_request(&*timeline, &req) + }), + PagestreamFeMessage::Nblocks(req) => SMGR_QUERY_TIME + .with_label_values(&["get_rel_size"]) + .observe_closure_duration(|| { + self.handle_get_nblocks_request(&*timeline, &req) + }), + PagestreamFeMessage::GetPage(req) => SMGR_QUERY_TIME + .with_label_values(&["get_page_at_lsn"]) + .observe_closure_duration(|| { + self.handle_get_page_at_lsn_request(&*timeline, &req) + }), + }; - let response = response.unwrap_or_else(|e| { - // print the all details to the log with {:#}, but for the client the - // error message is enough - error!("error reading relation or page version: {:#}", e); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) - }); + let response = response.unwrap_or_else(|e| { + // print the all details to the log with {:#}, but for the client the + // error message is enough + error!("error reading relation or page version: {:#}", e); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); - pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; + pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; + } else { + break; + } + } + Err(e) => { + if !is_socket_read_timed_out(&e) { + return Err(e); + } + } + } } - Ok(()) } diff --git a/pageserver/src/relish_storage/synced_storage.rs b/pageserver/src/relish_storage/synced_storage.rs index f51e976a83..e9ac20ff8c 100644 --- a/pageserver/src/relish_storage/synced_storage.rs +++ b/pageserver/src/relish_storage/synced_storage.rs @@ -1,6 +1,7 @@ use std::time::Duration; use std::{collections::BinaryHeap, sync::Mutex, thread}; +use crate::tenant_mgr; use crate::{relish_storage::RelishStorage, PageServerConf}; lazy_static::lazy_static! { @@ -31,22 +32,26 @@ pub fn run_storage_sync_thread< let handle = thread::Builder::new() .name("Queue based relish storage sync".to_string()) - .spawn(move || loop { - let mut queue_accessor = UPLOAD_QUEUE.lock().unwrap(); - log::debug!("Upload queue length: {}", queue_accessor.len()); - let next_task = queue_accessor.pop(); - drop(queue_accessor); - match next_task { - Some(task) => runtime.block_on(async { - // suppress warnings - let _ = (config, task, &relish_storage, max_concurrent_sync); - todo!("omitted for brevity") - }), - None => { - thread::sleep(Duration::from_secs(1)); - continue; + .spawn(move || { + while !tenant_mgr::shutdown_requested() { + let mut queue_accessor = UPLOAD_QUEUE.lock().unwrap(); + log::debug!("Upload queue length: {}", queue_accessor.len()); + let next_task = queue_accessor.pop(); + drop(queue_accessor); + match next_task { + Some(task) => runtime.block_on(async { + // suppress warnings + let _ = (config, task, &relish_storage, max_concurrent_sync); + todo!("omitted for brevity") + }), + None => { + thread::sleep(Duration::from_secs(1)); + continue; + } } } + log::debug!("Queue based relish storage sync thread shut down"); + Ok(()) })?; Ok(Some(handle)) } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index e8a7952d77..f1082513bc 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -13,6 +13,8 @@ use zenith_utils::zid::ZTimelineId; /// A repository corresponds to one .zenith directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. pub trait Repository: Send + Sync { + fn shutdown(&self) -> Result<()>; + /// Get Timeline handle for given zenith timeline ID. fn get_timeline(&self, timelineid: ZTimelineId) -> Result>; diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 4eb46ba71a..1712cf1b8a 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -8,12 +8,14 @@ use crate::walredo::PostgresRedoManager; use crate::PageServerConf; use anyhow::{anyhow, bail, Context, Result}; use lazy_static::lazy_static; -use log::info; +use log::{debug, info}; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fs; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; +use std::thread::JoinHandle; use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { @@ -24,6 +26,19 @@ lazy_static! { fn access_repository() -> MutexGuard<'static, HashMap>> { REPOSITORY.lock().unwrap() } +struct TenantHandleEntry { + checkpointer_handle: Option>, + gc_handle: Option>, +} + +// Logically these handles belong to Repository, +// but it's just simpler to store them separately +lazy_static! { + static ref TENANT_HANDLES: Mutex> = + Mutex::new(HashMap::new()); +} + +static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); pub fn init(conf: &'static PageServerConf) { let mut m = access_repository(); @@ -47,8 +62,18 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc bool { + SHUTDOWN_REQUESTED.load(Ordering::Relaxed) +} + +pub fn stop_tenant_threads(tenantid: ZTenantId) { + let mut handles = TENANT_HANDLES.lock().unwrap(); + if let Some(h) = handles.get_mut(&tenantid) { + h.checkpointer_handle.take().map(JoinHandle::join); + debug!("checkpointer for tenant {} has stopped", tenantid); + h.gc_handle.take().map(JoinHandle::join); + debug!("gc for tenant {} has stopped", tenantid); + } +} + +pub fn shutdown_all_tenants() -> Result<()> { + SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed); + + let tenants = list_tenants()?; + for tenantid in tenants { + stop_tenant_threads(tenantid); + let repo = get_repository_for_tenant(tenantid)?; + debug!("shutdown tenant {}", tenantid); + repo.shutdown()?; + } + + Ok(()) +} + pub fn create_repository_for_tenant( conf: &'static PageServerConf, tenantid: ZTenantId, @@ -115,3 +169,14 @@ pub fn get_timeline_for_tenant( .get_timeline(timelineid) .with_context(|| format!("cannot fetch timeline {}", timelineid)) } + +fn list_tenants() -> Result> { + let o = &mut REPOSITORY.lock().unwrap(); + + o.iter() + .map(|tenant| { + let (tenantid, _) = tenant; + Ok(*tenantid) + }) + .collect() +} diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 0fd9dfdf27..5602d8e1db 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -24,6 +24,7 @@ use std::str::FromStr; use std::sync::Mutex; use std::thread; use std::thread::sleep; +use std::thread::JoinHandle; use std::thread_local; use std::time::{Duration, SystemTime}; use tracing::*; @@ -36,6 +37,7 @@ use zenith_utils::zid::ZTimelineId; // struct WalReceiverEntry { wal_producer_connstr: String, + wal_receiver_handle: Option>, } lazy_static! { @@ -50,6 +52,19 @@ thread_local! { pub(crate) static IS_WAL_RECEIVER: Cell = Cell::new(false); } +// Wait for walreceiver to stop +// Now it stops when pageserver shutdown is requested. +// In future we can make this more granular and send shutdown signals +// per tenant/timeline to cancel inactive walreceivers. +// TODO deal with blocking pg connections +pub fn stop_wal_receiver(timelineid: ZTimelineId) { + let mut receivers = WAL_RECEIVERS.lock().unwrap(); + if let Some(r) = receivers.get_mut(&timelineid) { + r.wal_receiver_handle.take(); + // r.wal_receiver_handle.take().map(JoinHandle::join); + } +} + // Launch a new WAL receiver, or tell one that's running about change in connection string pub fn launch_wal_receiver( conf: &'static PageServerConf, @@ -64,19 +79,19 @@ pub fn launch_wal_receiver( receiver.wal_producer_connstr = wal_producer_connstr.into(); } None => { - let receiver = WalReceiverEntry { - wal_producer_connstr: wal_producer_connstr.into(), - }; - receivers.insert(timelineid, receiver); - - // Also launch a new thread to handle this connection - let _walreceiver_thread = thread::Builder::new() + let wal_receiver_handle = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(move || { IS_WAL_RECEIVER.with(|c| c.set(true)); thread_main(conf, timelineid, tenantid); }) .unwrap(); + + let receiver = WalReceiverEntry { + wal_producer_connstr: wal_producer_connstr.into(), + wal_receiver_handle: Some(wal_receiver_handle), + }; + receivers.insert(timelineid, receiver); } }; } @@ -103,7 +118,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, // and start streaming WAL from it. If the connection is lost, keep retrying. // - loop { + while !tenant_mgr::shutdown_requested() { // Look up the current WAL producer address let wal_producer_connstr = get_wal_producer_connstr(timelineid); @@ -117,6 +132,7 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: sleep(Duration::from_secs(1)); } } + debug!("WAL streaming shut down"); } fn walreceiver_main( @@ -273,6 +289,11 @@ fn walreceiver_main( physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; } + + if tenant_mgr::shutdown_requested() { + debug!("stop walreceiver because pageserver shutdown is requested"); + break; + } } Ok(()) } diff --git a/proxy/src/mgmt.rs b/proxy/src/mgmt.rs index 2b3259f8ec..1f33b68a1c 100644 --- a/proxy/src/mgmt.rs +++ b/proxy/src/mgmt.rs @@ -34,7 +34,7 @@ pub fn thread_main(state: &'static ProxyState, listener: TcpListener) -> anyhow: pub fn mgmt_conn_main(state: &'static ProxyState, socket: TcpStream) -> anyhow::Result<()> { let mut conn_handler = MgmtHandler { state }; - let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None)?; + let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, true)?; pgbackend.run(&mut conn_handler) } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index f246d4470a..61a742cf38 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -64,6 +64,7 @@ pub fn proxy_conn_main( socket, postgres_backend::AuthType::MD5, state.conf.ssl_config.clone(), + false, )?, md5_salt: [0u8; 4], psql_session_id: "".into(), diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index c0ce57801d..597c39db0b 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -375,6 +375,7 @@ class ZenithPageserver(PgProtocol): Start the page server. Returns self. """ + assert self.running == False self.zenith_cli.run(['start']) self.running = True @@ -382,14 +383,18 @@ class ZenithPageserver(PgProtocol): self.initial_tenant = self.zenith_cli.run(['tenant', 'list']).stdout.strip() return self - def stop(self) -> 'ZenithPageserver': + def stop(self, immediate=False) -> 'ZenithPageserver': """ Stop the page server. Returns self. """ + cmd = ['stop'] + if immediate: + cmd.append('immediate') + print(cmd) if self.running: - self.zenith_cli.run(['stop']) + self.zenith_cli.run(cmd) self.running = False return self @@ -398,7 +403,7 @@ class ZenithPageserver(PgProtocol): return self def __exit__(self, exc_type, exc, tb): - self.stop() + self.stop(True) @cached_property def auth_keys(self) -> AuthKeys: @@ -444,7 +449,7 @@ def pageserver(zenith_cli: ZenithCli, repo_dir: str, pageserver_port: Pageserver # After the yield comes any cleanup code we need. print('Starting pageserver cleanup') - ps.stop() + ps.stop(True) class PgBin: """ A helper class for executing postgres binaries """ diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 7730a058ed..a2c3f8d02b 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -41,7 +41,7 @@ fn handle_socket(socket: TcpStream, conf: WalAcceptorConf) -> Result<()> { socket.set_nodelay(true)?; let mut conn_handler = SendWalHandler::new(conf); - let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None)?; + let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?; // libpq replication protocol between wal_acceptor and replicas/pagers pgbackend.run(&mut conn_handler)?; diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 1c04e803e6..e86ce10041 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -88,7 +88,12 @@ fn main() -> Result<()> { ) .subcommand(SubCommand::with_name("status")) .subcommand(SubCommand::with_name("start").about("Start local pageserver")) - .subcommand(SubCommand::with_name("stop").about("Stop local pageserver")) + .subcommand(SubCommand::with_name("stop").about("Stop local pageserver") + .arg(Arg::with_name("immediate") + .help("Don't flush repository data at shutdown") + .required(false) + ) + ) .subcommand(SubCommand::with_name("restart").about("Restart local pageserver")) .subcommand( SubCommand::with_name("pg") @@ -196,10 +201,12 @@ fn main() -> Result<()> { } } - ("stop", Some(_sub_m)) => { + ("stop", Some(stop_match)) => { let pageserver = PageServerNode::from_env(&env); - if let Err(e) = pageserver.stop() { + let immediate = stop_match.is_present("immediate"); + + if let Err(e) = pageserver.stop(immediate) { eprintln!("pageserver stop failed: {}", e); exit(1); } @@ -208,7 +215,8 @@ fn main() -> Result<()> { ("restart", Some(_sub_m)) => { let pageserver = PageServerNode::from_env(&env); - if let Err(e) = pageserver.stop() { + //TODO what shutdown strategy should we use here? + if let Err(e) = pageserver.stop(false) { eprintln!("pageserver stop failed: {}", e); exit(1); } diff --git a/zenith_utils/src/http/endpoint.rs b/zenith_utils/src/http/endpoint.rs index 3c5b53b77a..30e7bfc921 100644 --- a/zenith_utils/src/http/endpoint.rs +++ b/zenith_utils/src/http/endpoint.rs @@ -12,8 +12,17 @@ use std::net::TcpListener; use zenith_metrics::{new_common_metric_name, register_int_counter, IntCounter}; use zenith_metrics::{Encoder, TextEncoder}; +use std::sync::Mutex; +use tokio::sync::oneshot::Sender; + use super::error::ApiError; +lazy_static! { + /// Channel used to send shutdown signal - wrapped in an Option to allow + /// it to be taken by value (since oneshot channels consume themselves on send) + static ref SHUTDOWN_SENDER: Mutex>> = Mutex::new(None); +} + lazy_static! { static ref SERVE_METRICS_COUNT: IntCounter = register_int_counter!( new_common_metric_name("serve_metrics_count"), @@ -143,11 +152,18 @@ pub fn check_permission(req: &Request, tenantid: Option) -> Res } } +// Send shutdown signal +pub fn shutdown() { + if let Some(tx) = SHUTDOWN_SENDER.lock().unwrap().take() { + let _ = tx.send(()); + } +} + pub fn serve_thread_main( router_builder: RouterBuilder, listener: TcpListener, ) -> anyhow::Result<()> { - log::info!("Starting a http endoint at {}", listener.local_addr()?); + log::info!("Starting a http endpoint at {}", listener.local_addr()?); // Create a Service from the router above to handle incoming requests. let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap(); @@ -159,7 +175,14 @@ pub fn serve_thread_main( let _guard = runtime.enter(); - let server = Server::from_tcp(listener)?.serve(service); + let (send, recv) = tokio::sync::oneshot::channel::<()>(); + *SHUTDOWN_SENDER.lock().unwrap() = Some(send); + + let server = Server::from_tcp(listener)? + .serve(service) + .with_graceful_shutdown(async { + recv.await.ok(); + }); runtime.block_on(server)?; diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index b2e0a1a525..02eb330f3b 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -13,7 +13,11 @@ use serde::{Deserialize, Serialize}; use std::io::{self, Write}; use std::net::{Shutdown, SocketAddr, TcpStream}; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Duration; + +static PGBACKEND_SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false); pub trait Handler { /// Handle single query. @@ -135,13 +139,32 @@ pub fn query_from_cstring(query_string: Bytes) -> Vec { query_string } +// Helper function for socket read loops +pub fn is_socket_read_timed_out(error: &anyhow::Error) -> bool { + for cause in error.chain() { + if let Some(io_error) = cause.downcast_ref::() { + if io_error.kind() == std::io::ErrorKind::WouldBlock { + return true; + } + } + } + false +} + impl PostgresBackend { pub fn new( socket: TcpStream, auth_type: AuthType, tls_config: Option>, + set_read_timeout: bool, ) -> io::Result { let peer_addr = socket.peer_addr()?; + if set_read_timeout { + socket + .set_read_timeout(Some(Duration::from_secs(5))) + .unwrap(); + } + Ok(Self { stream: Some(Stream::Bidirectional(BidiStream::from_tcp(socket))), buf_out: BytesMut::with_capacity(10 * 1024), @@ -229,12 +252,26 @@ impl PostgresBackend { let mut unnamed_query_string = Bytes::new(); - while let Some(msg) = self.read_message()? { - trace!("got message {:?}", msg); + while !PGBACKEND_SHUTDOWN_REQUESTED.load(Ordering::Relaxed) { + match self.read_message() { + Ok(message) => { + if let Some(msg) = message { + trace!("got message {:?}", msg); - match self.process_message(handler, msg, &mut unnamed_query_string)? { - ProcessMsgResult::Continue => continue, - ProcessMsgResult::Break => break, + match self.process_message(handler, msg, &mut unnamed_query_string)? { + ProcessMsgResult::Continue => continue, + ProcessMsgResult::Break => break, + } + } else { + break; + } + } + Err(e) => { + // If it is a timeout error, continue the loop + if !is_socket_read_timed_out(&e) { + return Err(e); + } + } } } @@ -427,3 +464,8 @@ impl PostgresBackend { Ok(ProcessMsgResult::Continue) } } + +// Set the flag to inform connections to cancel +pub fn set_pgbackend_shutdown_requested() { + PGBACKEND_SHUTDOWN_REQUESTED.swap(true, Ordering::Relaxed); +} diff --git a/zenith_utils/tests/ssl_test.rs b/zenith_utils/tests/ssl_test.rs index ba0f63d6ec..2a597700ae 100644 --- a/zenith_utils/tests/ssl_test.rs +++ b/zenith_utils/tests/ssl_test.rs @@ -110,7 +110,7 @@ fn ssl() { .unwrap(); let tls_config = Some(Arc::new(cfg)); - let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config).unwrap(); + let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config, true).unwrap(); pgb.run(&mut handler).unwrap(); assert!(handler.got_query); @@ -150,7 +150,7 @@ fn no_ssl() { let mut handler = TestHandler; - let pgb = PostgresBackend::new(server_sock, AuthType::Trust, None).unwrap(); + let pgb = PostgresBackend::new(server_sock, AuthType::Trust, None, true).unwrap(); pgb.run(&mut handler).unwrap(); client_jh.join().unwrap(); @@ -214,7 +214,7 @@ fn server_forces_ssl() { .unwrap(); let tls_config = Some(Arc::new(cfg)); - let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config).unwrap(); + let pgb = PostgresBackend::new(server_sock, AuthType::Trust, tls_config, true).unwrap(); let res = pgb.run(&mut handler).unwrap_err(); assert_eq!("client did not connect with TLS", format!("{}", res));