mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
Use threadlocal for walreceiver check (#692)
This commit is contained in:
@@ -36,6 +36,7 @@ use crate::layered_repository::inmemory_layer::FreezeLayers;
|
||||
use crate::relish::*;
|
||||
use crate::relish_storage::storage_uploader::QueueBasedRelishUploader;
|
||||
use crate::repository::{GcResult, Repository, Timeline, WALRecord};
|
||||
use crate::walreceiver::IS_WAL_RECEIVER;
|
||||
use crate::walredo::WalRedoManager;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
@@ -643,8 +644,11 @@ impl Timeline for LayeredTimeline {
|
||||
/// Wait until WAL has been received up to the given LSN.
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<()> {
|
||||
// This should never be called from the WAL receiver thread, because that could lead
|
||||
// to a deadlock. FIXME: Is there a less hacky way to check that?
|
||||
assert_ne!(thread::current().name(), Some("WAL receiver thread"));
|
||||
// to a deadlock.
|
||||
assert!(
|
||||
!IS_WAL_RECEIVER.with(|c| c.get()),
|
||||
"wait_lsn called by WAL receiver thread"
|
||||
);
|
||||
|
||||
self.last_record_lsn
|
||||
.wait_for_timeout(lsn, TIMEOUT)
|
||||
|
||||
@@ -20,6 +20,7 @@ use postgres_ffi::xlog_utils::*;
|
||||
use postgres_ffi::*;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use postgres_types::PgLsn;
|
||||
use std::cell::Cell;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
@@ -27,6 +28,7 @@ use std::str::FromStr;
|
||||
use std::sync::Mutex;
|
||||
use std::thread;
|
||||
use std::thread::sleep;
|
||||
use std::thread_local;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use zenith_utils::lsn::Lsn;
|
||||
use zenith_utils::zid::ZTenantId;
|
||||
@@ -44,6 +46,13 @@ lazy_static! {
|
||||
Mutex::new(HashMap::new());
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
// Per-thread boolean flag that is true only for WAL receiver threads.
|
||||
//
// Every thread gets its own copy, initialized to `false`. The thread spawned by
// `launch_wal_receiver` sets its copy to `true` before calling `thread_main`.
|
||||
// This is used in `wait_lsn` to guard against usage that might lead to a deadlock:
// `wait_lsn` asserts that the flag is `false` on the calling thread.
|
||||
pub(crate) static IS_WAL_RECEIVER: Cell<bool> = Cell::new(false);
|
||||
}
|
||||
|
||||
// Launch a new WAL receiver, or tell a running one about a change in the connection string
|
||||
pub fn launch_wal_receiver(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -64,12 +73,10 @@ pub fn launch_wal_receiver(
|
||||
receivers.insert(timelineid, receiver);
|
||||
|
||||
// Also launch a new thread to handle this connection
|
||||
//
|
||||
// NOTE: This thread name is checked in the assertion in wait_lsn. If you change
|
||||
// this, make sure you update the assertion too.
|
||||
let _walreceiver_thread = thread::Builder::new()
|
||||
.name("WAL receiver thread".into())
|
||||
.spawn(move || {
|
||||
IS_WAL_RECEIVER.with(|c| c.set(true));
|
||||
thread_main(conf, timelineid, tenantid);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user