Fix wal receiver shutdown

This patch allows to shutdown wal receiver when there are no messages
and wal receiver is blocked inside tokio-postgres. In this case it
cannot check the shutdown flag.

This patch switches to use async interface of tokio-postgres directly
without sync wrappers. It opens the possibility to use tokio::select!
between the phsycal_stream.next() and a shutdown channel readiness to
interrupt replication process.

Also this allows to shutdown only particular wal receiver without
using global shutdown_requested flag.
This commit is contained in:
Dmitry Rodionov
2021-12-28 01:07:10 +03:00
committed by Dmitry Rodionov
parent 70778058d9
commit c910132d4b
3 changed files with 83 additions and 24 deletions

6
Cargo.lock generated
View File

@@ -1197,6 +1197,8 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-stream",
"toml_edit",
"tracing",
"tracing-futures",
@@ -2099,9 +2101,9 @@ dependencies = [
[[package]]
name = "tokio-stream"
version = "0.1.7"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f"
checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3"
dependencies = [
"futures-core",
"pin-project-lite",

View File

@@ -21,6 +21,8 @@ tokio = { version = "1.11", features = ["process", "sync", "macros", "fs", "rt",
postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
tokio-stream = "0.1.8"
routerify = "2"
anyhow = "1.0"
crc32c = "0.6.0"

View File

@@ -13,9 +13,6 @@ use crate::walingest::WalIngest;
use anyhow::{bail, Context, Error, Result};
use lazy_static::lazy_static;
use parking_lot::Mutex;
use postgres::fallible_iterator::FallibleIterator;
use postgres::replication::ReplicationIter;
use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use postgres_ffi::waldecoder::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
@@ -26,6 +23,11 @@ use std::thread;
use std::thread::JoinHandle;
use std::thread_local;
use std::time::SystemTime;
use tokio::pin;
use tokio::sync::oneshot;
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
use tracing::*;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::ZTenantId;
@@ -37,6 +39,7 @@ use zenith_utils::zid::ZTimelineId;
struct WalReceiverEntry {
wal_producer_connstr: String,
wal_receiver_handle: Option<JoinHandle<()>>,
wal_receiver_interrupt_sender: Option<oneshot::Sender<()>>,
tenantid: ZTenantId,
}
@@ -59,9 +62,25 @@ thread_local! {
// TODO deal with blocking pg connections
pub fn stop_wal_receiver(timelineid: ZTimelineId) {
let mut receivers = WAL_RECEIVERS.lock();
if let Some(r) = receivers.get_mut(&timelineid) {
r.wal_receiver_handle.take();
// r.wal_receiver_handle.take().map(JoinHandle::join);
match r.wal_receiver_interrupt_sender.take() {
Some(s) => {
if s.send(()).is_err() {
warn!("wal receiver interrupt signal already sent");
}
}
None => {
warn!("wal_receiver_interrupt_sender is missing, wal recever shouldn't be running")
}
}
info!("waiting for wal receiver to stop");
let handle = r.wal_receiver_handle.take();
// do not hold the lock while joining the handle (deadlock is possible otherwise)
drop(receivers);
// there is no timeout or try_join option available so in case of a bug this can hang forever
handle.map(JoinHandle::join);
}
}
@@ -96,17 +115,20 @@ pub fn launch_wal_receiver(
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let wal_receiver_handle = thread::Builder::new()
.name("WAL receiver thread".into())
.spawn(move || {
IS_WAL_RECEIVER.with(|c| c.set(true));
thread_main(conf, timelineid, tenantid);
thread_main(conf, timelineid, tenantid, rx);
})
.unwrap();
let receiver = WalReceiverEntry {
wal_producer_connstr: wal_producer_connstr.into(),
wal_receiver_handle: Some(wal_receiver_handle),
wal_receiver_interrupt_sender: Some(tx),
tenantid,
};
receivers.insert(timelineid, receiver);
@@ -132,7 +154,12 @@ fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String {
//
// This is the entry point for the WAL receiver thread.
//
fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId) {
fn thread_main(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
interrupt_receiver: oneshot::Receiver<()>,
) {
let _enter = info_span!("WAL receiver", timeline = %timelineid, tenant = %tenantid).entered();
info!("WAL receiver thread started");
@@ -141,7 +168,13 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
// Make a connection to the WAL safekeeper, or directly to the primary PostgreSQL server,
// and start streaming WAL from it.
let res = walreceiver_main(conf, timelineid, &wal_producer_connstr, tenantid);
let res = walreceiver_main(
conf,
tenantid,
timelineid,
&wal_producer_connstr,
interrupt_receiver,
);
// TODO cleanup info messages
if let Err(e) = res {
@@ -160,9 +193,10 @@ fn thread_main(conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid:
fn walreceiver_main(
_conf: &PageServerConf,
tenantid: ZTenantId,
timelineid: ZTimelineId,
wal_producer_connstr: &str,
tenantid: ZTenantId,
mut interrupt_receiver: oneshot::Receiver<()>,
) -> Result<(), Error> {
// Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr);
@@ -171,7 +205,19 @@ fn walreceiver_main(
wal_producer_connstr
);
let mut rclient = Client::connect(&connect_cfg, NoTls)?;
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let (mut replication_client, connection) =
runtime.block_on(tokio_postgres::connect(&connect_cfg, NoTls))?;
// This is from tokio-postgres docs, but it is a bit weird in our case because we extensively use block_on
runtime.spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
info!("connected!");
// Immediately increment the gauge, then create a job to decrement it on thread exit.
@@ -183,7 +229,7 @@ fn walreceiver_main(
gauge.dec();
}
let identify = identify_system(&mut rclient)?;
let identify = runtime.block_on(identify_system(&mut replication_client))?;
info!("{:?}", identify);
let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
let mut caught_up = false;
@@ -218,14 +264,24 @@ fn walreceiver_main(
let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
let copy_stream = rclient.copy_both_simple(&query)?;
let mut physical_stream = ReplicationIter::new(copy_stream);
let copy_stream = runtime.block_on(replication_client.copy_both_simple(&query))?;
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut walingest = WalIngest::new(&*timeline, startpoint)?;
while let Some(replication_message) = physical_stream.next()? {
while let Some(replication_message) = runtime.block_on(async {
tokio::select! {
replication_message = physical_stream.next() => replication_message,
_ = &mut interrupt_receiver => {
info!("walreceiver interrupted");
None
}
}
}) {
let replication_message = replication_message?;
let status_update = match replication_message {
ReplicationMessage::XLogData(xlog_data) => {
// Pass the WAL data to the decoder, and see if we can decode
@@ -299,12 +355,11 @@ fn walreceiver_main(
let apply_lsn = PgLsn::from(u64::from(timeline_synced_disk_consistent_lsn));
let ts = SystemTime::now();
const NO_REPLY: u8 = 0;
physical_stream.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)?;
}
if tenant_mgr::shutdown_requested() {
debug!("stop walreceiver because pageserver shutdown is requested");
break;
runtime.block_on(
physical_stream
.as_mut()
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY),
)?;
}
}
@@ -334,9 +389,9 @@ pub struct IdentifySystem {
pub struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
pub fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
pub async fn identify_system(client: &mut Client) -> Result<IdentifySystem, Error> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str)?;
let response = client.simple_query(query_str).await?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>