From 5202d2dc982e9861e2710e5ba099fe396f42c20f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 24 Jun 2024 19:03:30 +0000 Subject: [PATCH] WIP: solution approach 1: propagate cancellationtoken from tenant not enough, we need its gate, and there's no concept of child gate --- pageserver/src/tenant.rs | 20 +++++++++++++------- pageserver/src/walredo.rs | 4 ++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b13521e0ed..42d9e10307 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -658,11 +658,6 @@ impl Tenant { walredo_global_state, } = resources; - let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( - walredo_global_state, - tenant_shard_id, - ))); - let attach_mode = attached_conf.location.attach_mode; let generation = attached_conf.location.generation; @@ -2492,11 +2487,22 @@ impl Tenant { conf: &'static PageServerConf, attached_conf: AttachedTenantConf, shard_identity: ShardIdentity, - walredo_mgr: Option>, tenant_shard_id: TenantShardId, remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, ) -> Tenant { + let cancel = CancellationToken::default(); + + let walredo_mgr = if let TenantState::Broken { .. } = &state { + None + } else { + Some(Arc::new(WalRedoManager::from(PostgresRedoManager::new( + walredo_global_state, + tenant_shard_id, + cancel.child_token(), + )))) + }; + let (state, mut rx) = watch::channel(state); tokio::spawn(async move { @@ -2571,7 +2577,7 @@ impl Tenant { cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)), eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()), activate_now_sem: tokio::sync::Semaphore::new(0), - cancel: CancellationToken::default(), + cancel, gate: Gate::default(), timeline_get_throttle: Arc::new(throttle::Throttle::new( Tenant::get_timeline_get_throttle_config(conf, &attached_conf.tenant_conf), diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index e9782e64b1..e3cb69bdc5 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -35,6 +35,7 @@ use anyhow::Context; use bytes::{Bytes, BytesMut}; use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus}; use pageserver_api::shard::TenantShardId; +use tokio_util::sync::CancellationToken; use std::sync::Arc; use std::time::Duration; use std::time::Instant; @@ -74,6 +75,7 @@ impl GlobalState { pub struct PostgresRedoManager { global_state: Arc, tenant_shard_id: TenantShardId, + cancel: CancellationToken, last_redo_at: std::sync::Mutex>, /// The current [`process::WalRedoProcess`] that is used by new redo requests. /// We use [`heavier_once_cell`] for coalescing the spawning, but the redo @@ -200,10 +202,12 @@ impl PostgresRedoManager { pub fn new( global_state: Arc, tenant_shard_id: TenantShardId, + cancel: CancellationToken, ) -> PostgresRedoManager { // The actual process is launched lazily, on first request. PostgresRedoManager { global_state, + cancel, tenant_shard_id, last_redo_at: std::sync::Mutex::default(), redo_process: heavier_once_cell::OnceCell::default(),