diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 556d6a6828..f3d97c0dfb 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -83,16 +83,10 @@ struct ServiceState {
     nodes: Arc<HashMap<NodeId, Node>>,
 
     scheduler: Scheduler,
-
-    compute_hook: Arc<ComputeHook>,
-
-    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
 }
 
 impl ServiceState {
     fn new(
-        config: Config,
-        result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
         nodes: HashMap<NodeId, Node>,
         tenants: BTreeMap<TenantShardId, TenantState>,
         scheduler: Scheduler,
@@ -101,8 +95,6 @@ impl ServiceState {
             tenants,
             nodes: Arc::new(nodes),
             scheduler,
-            compute_hook: Arc::new(ComputeHook::new(config)),
-            result_tx,
         }
     }
 
@@ -152,6 +144,8 @@ pub struct Service {
     inner: Arc<std::sync::RwLock<ServiceState>>,
     config: Config,
     persistence: Arc<Persistence>,
+    compute_hook: Arc<ComputeHook>,
+    result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
 
     // Process shutdown will fire this token
     cancel: CancellationToken,
@@ -481,8 +475,6 @@ impl Service {
         notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
         deadline: Instant,
     ) -> HashSet<TenantShardId> {
-        let compute_hook = self.inner.read().unwrap().compute_hook.clone();
-
         let attempt_shards = notifications.iter().map(|i| i.0).collect::<Vec<_>>();
         let mut success_shards = HashSet::new();
 
@@ -490,7 +482,7 @@ impl Service {
         // Construct an async stream of futures to invoke the compute notify function: we do this
         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
         let mut stream = futures::stream::iter(notifications.into_iter())
             .map(|(tenant_shard_id, node_id, stripe_size)| {
-                let compute_hook = compute_hook.clone();
+                let compute_hook = self.compute_hook.clone();
                 let cancel = self.cancel.clone();
                 async move {
                     if let Err(e) = compute_hook
@@ -730,14 +722,12 @@ impl Service {
 
         let this = Arc::new(Self {
             inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
-                config.clone(),
-                result_tx,
-                nodes,
-                tenants,
-                scheduler,
+                nodes, tenants, scheduler,
             ))),
-            config,
+            config: config.clone(),
             persistence,
+            compute_hook: Arc::new(ComputeHook::new(config)),
+            result_tx,
             startup_complete: startup_complete.clone(),
             cancel: CancellationToken::new(),
             gate: Gate::default(),
@@ -1145,8 +1135,6 @@ impl Service {
 
         let (waiters, response_shards) = {
             let mut locked = self.inner.write().unwrap();
-            let result_tx = locked.result_tx.clone();
-            let compute_hook = locked.compute_hook.clone();
             let (nodes, tenants, scheduler) = locked.parts_mut();
 
             let mut response_shards = Vec::new();
@@ -1231,17 +1219,7 @@ impl Service {
 
             let waiters = tenants
                 .range_mut(TenantShardId::tenant_range(tenant_id))
-                .filter_map(|(_shard_id, shard)| {
-                    shard.maybe_reconcile(
-                        result_tx.clone(),
-                        nodes,
-                        &compute_hook,
-                        &self.config,
-                        &self.persistence,
-                        &self.gate,
-                        &self.cancel,
-                    )
-                })
+                .filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
                 .collect::<Vec<_>>();
             (waiters, response_shards)
         };
@@ -1432,8 +1410,6 @@ impl Service {
         let mut waiters = Vec::new();
         {
             let mut locked = self.inner.write().unwrap();
-            let result_tx = locked.result_tx.clone();
-            let compute_hook = locked.compute_hook.clone();
             let (nodes, tenants, scheduler) = locked.parts_mut();
 
             for ShardUpdate {
@@ -1461,15 +1437,7 @@ impl Service {
 
                 shard.schedule(scheduler)?;
 
-                let maybe_waiter = shard.maybe_reconcile(
-                    result_tx.clone(),
-                    nodes,
-                    &compute_hook,
-                    &self.config,
-                    &self.persistence,
-                    &self.gate,
-                    &self.cancel,
-                );
+                let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
                 if let Some(waiter) = maybe_waiter {
                     waiters.push(waiter);
                 }
@@ -1514,20 +1482,10 @@ impl Service {
         let waiters = {
             let mut waiters = Vec::new();
             let mut locked = self.inner.write().unwrap();
-            let result_tx = locked.result_tx.clone();
-            let compute_hook = locked.compute_hook.clone();
             let (nodes, tenants, _scheduler) = locked.parts_mut();
             for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
                 shard.config = config.clone();
-                if let Some(waiter) = shard.maybe_reconcile(
-                    result_tx.clone(),
-                    nodes,
-                    &compute_hook,
-                    &self.config,
-                    &self.persistence,
-                    &self.gate,
-                    &self.cancel,
-                ) {
+                if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
                     waiters.push(waiter);
                 }
             }
@@ -2159,7 +2117,7 @@ impl Service {
         }
 
         // Validate input, and calculate which shards we will create
-        let (old_shard_count, targets, compute_hook) =
+        let (old_shard_count, targets) =
             {
                 let locked = self.inner.read().unwrap();
 
@@ -2255,7 +2213,7 @@ impl Service {
                    }
                }
 
-                (old_shard_count, targets, locked.compute_hook.clone())
+                (old_shard_count, targets)
            };
 
        // unwrap safety: we would have returned above if we didn't find at least one shard to split
@@ -2451,7 +2409,8 @@ impl Service {
         // Send compute notifications for all the new shards
         let mut failed_notifications = Vec::new();
         for (child_id, child_ps, stripe_size) in child_locations {
-            if let Err(e) = compute_hook
+            if let Err(e) = self
+                .compute_hook
                 .notify(child_id, child_ps, stripe_size, &self.cancel)
                 .await
             {
@@ -2481,8 +2440,6 @@ impl Service {
     ) -> Result<TenantShardMigrateResponse, ApiError> {
         let waiter = {
             let mut locked = self.inner.write().unwrap();
-            let result_tx = locked.result_tx.clone();
-            let compute_hook = locked.compute_hook.clone();
             let (nodes, tenants, scheduler) = locked.parts_mut();
 
             let Some(node) = nodes.get(&migrate_req.node_id) else {
@@ -2542,15 +2499,7 @@ impl Service {
                 shard.sequence = shard.sequence.next();
             }
 
-            shard.maybe_reconcile(
-                result_tx,
-                nodes,
-                &compute_hook,
-                &self.config,
-                &self.persistence,
-                &self.gate,
-                &self.cancel,
-            )
+            self.maybe_reconcile_shard(shard, nodes)
         };
 
         if let Some(waiter) = waiter {
@@ -2814,8 +2763,6 @@ impl Service {
         }
 
         let mut locked = self.inner.write().unwrap();
-        let result_tx = locked.result_tx.clone();
-        let compute_hook = locked.compute_hook.clone();
         let (nodes, tenants, scheduler) = locked.parts_mut();
 
         let mut new_nodes = (**nodes).clone();
@@ -2867,16 +2814,8 @@ impl Service {
                         tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
                     }
                     Ok(()) => {
-                        if tenant_state
-                            .maybe_reconcile(
-                                result_tx.clone(),
-                                &new_nodes,
-                                &compute_hook,
-                                &self.config,
-                                &self.persistence,
-                                &self.gate,
-                                &self.cancel,
-                            )
+                        if self
+                            .maybe_reconcile_shard(tenant_state, &new_nodes)
                             .is_some()
                         {
                             tenants_affected += 1;
@@ -2900,15 +2839,7 @@ impl Service {
                     tenant_state.observed.locations.get_mut(&config_req.node_id)
                 {
                     if observed_loc.conf.is_none() {
-                        tenant_state.maybe_reconcile(
-                            result_tx.clone(),
-                            &new_nodes,
-                            &compute_hook,
-                            &self.config,
-                            &self.persistence,
-                            &self.gate,
-                            &self.cancel,
-                        );
+                        self.maybe_reconcile_shard(tenant_state, &new_nodes);
                     }
                 }
             }
@@ -2937,22 +2868,12 @@ impl Service {
         tenant_id: TenantId,
     ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
         let mut waiters = Vec::new();
-        let result_tx = locked.result_tx.clone();
-        let compute_hook = locked.compute_hook.clone();
         let (nodes, tenants, scheduler) = locked.parts_mut();
 
         for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id))
         {
             shard.schedule(scheduler)?;
-            if let Some(waiter) = shard.maybe_reconcile(
-                result_tx.clone(),
-                nodes,
-                &compute_hook,
-                &self.config,
-                &self.persistence,
-                &self.gate,
-                &self.cancel,
-            ) {
+            if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
                 waiters.push(waiter);
             }
         }
@@ -2987,28 +2908,34 @@ impl Service {
         Ok(())
     }
 
+    /// Convenience wrapper around [`TenantState::maybe_reconcile`] that provides
+    /// all the references to parts of Self that are needed
+    fn maybe_reconcile_shard(
+        &self,
+        shard: &mut TenantState,
+        nodes: &Arc<HashMap<NodeId, Node>>,
+    ) -> Option<ReconcilerWaiter> {
+        shard.maybe_reconcile(
+            &self.result_tx,
+            nodes,
+            &self.compute_hook,
+            &self.config,
+            &self.persistence,
+            &self.gate,
+            &self.cancel,
+        )
+    }
+
     /// Check all tenants for pending reconciliation work, and reconcile those in need
     ///
     /// Returns how many reconciliation tasks were started
     fn reconcile_all(&self) -> usize {
         let mut locked = self.inner.write().unwrap();
-        let result_tx = locked.result_tx.clone();
-        let compute_hook = locked.compute_hook.clone();
         let pageservers = locked.nodes.clone();
 
         locked
             .tenants
             .iter_mut()
-            .filter_map(|(_tenant_shard_id, shard)| {
-                shard.maybe_reconcile(
-                    result_tx.clone(),
-                    &pageservers,
-                    &compute_hook,
-                    &self.config,
-                    &self.persistence,
-                    &self.gate,
-                    &self.cancel,
-                )
-            })
+            .filter_map(|(_tenant_shard_id, shard)| self.maybe_reconcile_shard(shard, &pageservers))
             .count()
     }
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index c775736b31..3c91e09ac3 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -617,7 +617,7 @@ impl TenantState {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
     pub(crate) fn maybe_reconcile(
         &mut self,
-        result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
+        result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
         pageservers: &Arc<HashMap<NodeId, Node>>,
         compute_hook: &Arc<ComputeHook>,
         service_config: &service::Config,
@@ -729,6 +729,7 @@ impl TenantState {
             tenant_id=%reconciler.tenant_shard_id.tenant_id, shard_id=%reconciler.tenant_shard_id.shard_slug());
 
         metrics::RECONCILER.spawned.inc();
+        let result_tx = result_tx.clone();
         let join_handle = tokio::task::spawn(
             async move {
                 // Wait for any previous reconcile task to complete before we start
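
For context on the bounded-parallelism idiom that the comment in compute_notify_many refers to ("use .buffered() on the stream to execute with bounded parallelism"), here is a standalone sketch that is not part of the patch. It assumes only the futures and tokio crates (with timer and macro features enabled); the item type, fake delay, and concurrency limit of 4 are illustrative stand-ins for the real compute_hook.notify calls.

// Sketch: turn a collection into a stream of futures with `map`, then drive at most
// N of them concurrently with `buffered`, which also preserves input order.
use futures::stream::{self, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let ids = vec![1u32, 2, 3, 4, 5, 6, 7, 8];

    let results: Vec<u32> = stream::iter(ids)
        .map(|id| async move {
            // Stand-in for an async call such as compute_hook.notify(...)
            tokio::time::sleep(Duration::from_millis(10)).await;
            id * 2
        })
        // Poll up to 4 of the futures at a time; results are yielded in the original order.
        .buffered(4)
        .collect()
        .await;

    assert_eq!(results, vec![2, 4, 6, 8, 10, 12, 14, 16]);
}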