From c910132d4b22830e443cbdd081cc9778832185df Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Tue, 28 Dec 2021 01:07:10 +0300 Subject: [PATCH] Fix wal receiver shutdown This patch allows to shutdown wal receiver when there are no messages and wal receiver is blocked inside tokio-postgres. In this case it cannot check the shutdown flag. This patch switches to use async interface of tokio-postgres directly without sync wrappers. It opens the possibility to use tokio::select! between the phsycal_stream.next() and a shutdown channel readiness to interrupt replication process. Also this allows to shutdown only particular wal receiver without using global shutdown_requested flag. --- Cargo.lock | 6 ++- pageserver/Cargo.toml | 2 + pageserver/src/walreceiver.rs | 99 +++++++++++++++++++++++++++-------- 3 files changed, 83 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b5f49f9207..5fd6076af2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,6 +1197,8 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-postgres", + "tokio-stream", "toml_edit", "tracing", "tracing-futures", @@ -2099,9 +2101,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", "pin-project-lite", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 70aeb8b472..4782884363 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -21,6 +21,8 @@ tokio = { version = "1.11", features = ["process", "sync", "macros", "fs", "rt", postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } +tokio-stream = "0.1.8" routerify = "2" anyhow = "1.0" crc32c = "0.6.0" diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 8e7106355b..ef63d357f8 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -13,9 +13,6 @@ use crate::walingest::WalIngest; use anyhow::{bail, Context, Error, Result}; use lazy_static::lazy_static; use parking_lot::Mutex; -use postgres::fallible_iterator::FallibleIterator; -use postgres::replication::ReplicationIter; -use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; use postgres_ffi::waldecoder::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; @@ -26,6 +23,11 @@ use std::thread; use std::thread::JoinHandle; use std::thread_local; use std::time::SystemTime; +use tokio::pin; +use tokio::sync::oneshot; +use tokio_postgres::replication::ReplicationStream; +use tokio_postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; +use tokio_stream::StreamExt; use tracing::*; use zenith_utils::lsn::Lsn; use zenith_utils::zid::ZTenantId; @@ -37,6 +39,7 @@ use zenith_utils::zid::ZTimelineId; struct WalReceiverEntry { wal_producer_connstr: String, wal_receiver_handle: Option>, + wal_receiver_interrupt_sender: Option>, tenantid: ZTenantId, } @@ -59,9 +62,25 @@ thread_local! { // TODO deal with blocking pg connections pub fn stop_wal_receiver(timelineid: ZTimelineId) { let mut receivers = WAL_RECEIVERS.lock(); + if let Some(r) = receivers.get_mut(&timelineid) { - r.wal_receiver_handle.take(); - // r.wal_receiver_handle.take().map(JoinHandle::join); + match r.wal_receiver_interrupt_sender.take() { + Some(s) => { + if s.send(()).is_err() { + warn!("wal receiver interrupt signal already sent"); + } + } + None => { + warn!("wal_receiver_interrupt_sender is missing, wal recever shouldn't be running") + } + } + + info!("waiting for wal receiver to stop"); + let handle = r.wal_receiver_handle.take(); + // do not hold the lock while joining the handle (deadlock is possible otherwise) + drop(receivers); + // there is no timeout or try_join option available so in case of a bug this can hang forever + handle.map(JoinHandle::join); } } @@ -96,17 +115,20 @@ pub fn launch_wal_receiver( receiver.wal_producer_connstr = wal_producer_connstr.into(); } None => { + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let wal_receiver_handle = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(move || { IS_WAL_RECEIVER.with(|c| c.set(true)); - thread_main(conf, timelineid, tenantid); + thread_main(conf, timelineid, tenantid, rx); }) .unwrap(); let receiver = WalReceiverEntry { wal_producer_connstr: wal_producer_connstr.into(), wal_receiver_handle: Some(wal_receiver_handle), + wal_receiver_interrupt_sender: Some(tx), tenantid, }; receivers.insert(timelineid, receiver); @@ -132,7 +154,12 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { // // This is the entry point for the WAL receiver thread. // -fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId) { +fn thread_main( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + interrupt_receiver: oneshot::Receiver<()>, +) { let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered(); info!("WAL receiver thread started"); @@ -141,7 +168,13 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: // Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server, // and start streaming WAL from it. - let res = walreceiver_main(conf, timelineid, &wal_producer_connstr, tenantid); + let res = walreceiver_main( + conf, + tenantid, + timelineid, + &wal_producer_connstr, + interrupt_receiver, + ); // TODO cleanup info messages if let Err(e) = res { @@ -160,9 +193,10 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: fn walreceiver_main( _conf: &PageServerConf, + tenantid: ZTenantId, timelineid: ZTimelineId, wal_producer_connstr: &str, - tenantid: ZTenantId, + mut interrupt_receiver: oneshot::Receiver<()>, ) -> Result<(), Error> { // Connect to the database in replication mode. info!("connecting to {:?}", wal_producer_connstr); @@ -171,7 +205,19 @@ fn walreceiver_main( wal_producer_connstr ); - let mut rclient = Client::connect(&connect_cfg, NoTls)?; + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let (mut replication_client, connection) = + runtime.block_on(tokio_postgres::connect(&connect_cfg, NoTls))?; + // This is from tokio-postgres docs, but it is a bit weird in our case because we extensively use block_on + runtime.spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + info!("connected!"); // Immediately increment the gauge, then create a job to decrement it on thread exit. @@ -183,7 +229,7 @@ fn walreceiver_main( gauge.dec(); } - let identify = identify_system(&mut rclient)?; + let identify = runtime.block_on(identify_system(&mut replication_client))?; info!("{:?}", identify); let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; @@ -218,14 +264,24 @@ fn walreceiver_main( let query = format!("START_REPLICATION PHYSICAL {}", startpoint); - let copy_stream = rclient.copy_both_simple(&query)?; - let mut physical_stream = ReplicationIter::new(copy_stream); + let copy_stream = runtime.block_on(replication_client.copy_both_simple(&query))?; + let physical_stream = ReplicationStream::new(copy_stream); + pin!(physical_stream); let mut waldecoder = WalStreamDecoder::new(startpoint); let mut walingest = WalIngest::new(&*timeline, startpoint)?; - while let Some(replication_message) = physical_stream.next()? { + while let Some(replication_message) = runtime.block_on(async { + tokio::select! { + replication_message = physical_stream.next() => replication_message, + _ = &mut interrupt_receiver => { + info!("walreceiver interrupted"); + None + } + } + }) { + let replication_message = replication_message?; let status_update = match replication_message { ReplicationMessage::XLogData(xlog_data) => { // Pass the WAL data to the decoder, and see if we can decode @@ -299,12 +355,11 @@ fn walreceiver_main( let apply_lsn = PgLsn::from(u64::from(timeline_synced_disk_consistent_lsn)); let ts = SystemTime::now(); const NO_REPLY: u8 = 0; - physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?; - } - - if tenant_mgr::shutdown_requested() { - debug!("stop walreceiver because pageserver shutdown is requested"); - break; + runtime.block_on( + physical_stream + .as_mut() + .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY), + )?; } } @@ -334,9 +389,9 @@ pub struct IdentifySystem { pub struct IdentifyError; /// Run the postgres `IDENTIFY_SYSTEM` command -pub fn identify_system(client: &mut Client) -> Result { +pub async fn identify_system(client: &mut Client) -> Result { let query_str = "IDENTIFY_SYSTEM"; - let response = client.simple_query(query_str)?; + let response = client.simple_query(query_str).await?; // get(N) from row, then parse it as some destination type. fn get_parse(row: &SimpleQueryRow, idx: usize) -> Result