refactor(walredo): avoid the need for a WalRedoManager in broken tenants

When we later introduce a global pool of pre-spawned walredo
processes (https://github.com/neondatabase/neon/issues/6581), this
refactoring will spare us from plumbing a reference to the pool through
to all the places where we create a broken tenant.

Builds atop the refactoring in #6583
This commit is contained in:
Christian Schwarz
2024-02-02 14:52:53 +00:00
parent 2374e1318e
commit f73aa3eb32
4 changed files with 19 additions and 16 deletions

View File

@@ -276,7 +276,7 @@ pub struct Tenant {
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Arc<WalRedoManager>,
walredo_mgr: Option<Arc<WalRedoManager>>,
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: Option<GenericRemoteStorage>,
@@ -630,7 +630,7 @@ impl Tenant {
conf,
attached_conf,
shard_identity,
wal_redo_manager,
Some(wal_redo_manager),
tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
@@ -1184,10 +1184,6 @@ impl Tenant {
tenant_shard_id: TenantShardId,
reason: String,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf,
tenant_shard_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {
reason,
@@ -1198,7 +1194,7 @@ impl Tenant {
// Shard identity isn't meaningful for a broken tenant: it's just a placeholder
// to occupy the slot for this TenantShardId.
ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
wal_redo_manager,
None,
tenant_shard_id,
None,
DeletionQueueClient::broken(),
@@ -1967,7 +1963,7 @@ impl Tenant {
}
pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
self.walredo_mgr.status()
self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
}
/// Changes tenant status to active, unless shutdown was already requested.
@@ -2607,7 +2603,7 @@ impl Tenant {
self.tenant_shard_id,
self.generation,
self.shard_identity,
Arc::clone(&self.walredo_mgr),
self.walredo_mgr.as_ref().map(Arc::clone),
resources,
pg_version,
state,
@@ -2625,7 +2621,7 @@ impl Tenant {
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
shard_identity: ShardIdentity,
walredo_mgr: Arc<WalRedoManager>,
walredo_mgr: Option<Arc<WalRedoManager>>,
tenant_shard_id: TenantShardId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
@@ -4056,7 +4052,7 @@ pub(crate) mod harness {
.unwrap(),
// This is a legacy/test code path: sharding isn't supported here.
ShardIdentity::unsharded(),
walredo_mgr,
Some(walredo_mgr),
self.tenant_shard_id,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),

View File

@@ -199,7 +199,9 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
tenant.walredo_mgr.maybe_quiesce(period * 10);
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())

View File

@@ -215,8 +215,8 @@ pub struct Timeline {
// Atomic would be more appropriate here.
last_freeze_ts: RwLock<Instant>,
// WAL redo manager
walredo_mgr: Arc<super::WalRedoManager>,
// WAL redo manager. `None` only for broken tenants.
walredo_mgr: Option<Arc<super::WalRedoManager>>,
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
@@ -1421,7 +1421,7 @@ impl Timeline {
tenant_shard_id: TenantShardId,
generation: Generation,
shard_identity: ShardIdentity,
walredo_mgr: Arc<super::WalRedoManager>,
walredo_mgr: Option<Arc<super::WalRedoManager>>,
resources: TimelineResources,
pg_version: u32,
state: TimelineState,
@@ -4445,6 +4445,9 @@ impl Timeline {
let img = match self
.walredo_mgr
.as_ref()
.context("timeline has no walredo manager")
.map_err(PageReconstructError::WalRedo)?
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("reconstruct a page image")

View File

@@ -10,11 +10,13 @@ use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
io::{Read, Write},
process::{ChildStdin, ChildStdout, Command, Stdio},
sync::{atomic::AtomicUsize, Mutex, MutexGuard},
sync::{Mutex, MutexGuard},
time::Duration,
};
use tracing::{debug, error, instrument, Instrument};