From b807570f46482ee011dcbb395e96ed2bab947353 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Thu, 23 Dec 2021 14:25:44 -0500 Subject: [PATCH] Use parking_lot::Mutex instead of std::Mutex in walreceiver (#1045) --- Cargo.lock | 1 + pageserver/Cargo.toml | 1 + pageserver/src/walreceiver.rs | 10 +++++----- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2524fdc9e..b5f49f9207 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1180,6 +1180,7 @@ dependencies = [ "log", "nix", "once_cell", + "parking_lot", "postgres", "postgres-protocol", "postgres-types", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index ea3e630ccc..70aeb8b472 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -40,6 +40,7 @@ signal-hook = "0.3.10" url = "2" nix = "0.23" once_cell = "1.8.0" +parking_lot = "0.11.2" rust-s3 = { version = "0.28", default-features = false, features = ["no-verify-ssl", "tokio-rustls-tls"] } async-compression = {version = "0.3", features = ["zstd", "tokio"]} diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index c5aef7be60..4c8157825d 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -12,6 +12,7 @@ use crate::tenant_threads; use crate::walingest::WalIngest; use anyhow::{bail, Context, Error, Result}; use lazy_static::lazy_static; +use parking_lot::Mutex; use postgres::fallible_iterator::FallibleIterator; use postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; @@ -21,7 +22,6 @@ use postgres_types::PgLsn; use std::cell::Cell; use std::collections::HashMap; use std::str::FromStr; -use std::sync::Mutex; use std::thread; use std::thread::JoinHandle; use std::thread_local; @@ -58,7 +58,7 @@ thread_local! { // per tenant/timeline to cancel inactive walreceivers. // TODO deal with blocking pg connections pub fn stop_wal_receiver(timelineid: ZTimelineId) { - let mut receivers = WAL_RECEIVERS.lock().unwrap(); + let mut receivers = WAL_RECEIVERS.lock(); if let Some(r) = receivers.get_mut(&timelineid) { r.wal_receiver_handle.take(); // r.wal_receiver_handle.take().map(JoinHandle::join); @@ -66,7 +66,7 @@ pub fn stop_wal_receiver(timelineid: ZTimelineId) { } pub fn drop_wal_receiver(timelineid: ZTimelineId, tenantid: ZTenantId) { - let mut receivers = WAL_RECEIVERS.lock().unwrap(); + let mut receivers = WAL_RECEIVERS.lock(); receivers.remove(&timelineid); // Check if it was the last walreceiver of the tenant. @@ -89,7 +89,7 @@ pub fn launch_wal_receiver( wal_producer_connstr: &str, tenantid: ZTenantId, ) { - let mut receivers = WAL_RECEIVERS.lock().unwrap(); + let mut receivers = WAL_RECEIVERS.lock(); match receivers.get_mut(&timelineid) { Some(receiver) => { @@ -120,7 +120,7 @@ pub fn launch_wal_receiver( // Look up current WAL producer connection string in the hash table fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { - let receivers = WAL_RECEIVERS.lock().unwrap(); + let receivers = WAL_RECEIVERS.lock(); receivers .get(&timelineid)