storage controller: refactor non-mutable members up into Service (#7086)

result_tx and compute_hook were in ServiceState (i.e. behind the sync
RwLock), but didn't need to be.

Moving them up into Service removes a bunch of boilerplate clones.

While we're here, create a helper `Service::maybe_reconcile_shard` which
avoids writing out all the `&self.` arguments to
`TenantState::maybe_reconcile` everywhere we call it.
Author: John Spray
Date: 2024-03-11 14:29:32 +00:00 (committed by GitHub)
Parent: 26ae7b0b3e
Commit: b4972d07d4

2 changed files with 40 additions and 112 deletions
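
To make the shape of the refactor concrete, here is a minimal, self-contained Rust sketch of the pattern (not the real storage controller code: the type names are simplified stand-ins, std's `mpsc::Sender` stands in for tokio's `UnboundedSender`, and the real `maybe_reconcile` takes several more arguments such as the service config, persistence handle, gate, and cancellation token). Members that are only read after construction move out from behind the lock onto `Service`, and `maybe_reconcile_shard` packages up the `&self.` arguments for every call site:

```rust
use std::collections::HashMap;
use std::sync::{mpsc, Arc, RwLock};

// Simplified stand-ins for the real storage controller types.
struct ComputeHook;
struct Node;
struct TenantState;
struct ReconcilerWaiter;
struct ReconcileResult;
type NodeId = u64;
type TenantShardId = u64;

impl TenantState {
    // Shape of TenantState::maybe_reconcile after the change: it borrows the
    // shared pieces rather than taking owned clones (the real signature also
    // takes the service config, persistence, gate, and cancel token).
    fn maybe_reconcile(
        &mut self,
        _result_tx: &mpsc::Sender<ReconcileResult>,
        _nodes: &Arc<HashMap<NodeId, Node>>,
        _compute_hook: &Arc<ComputeHook>,
    ) -> Option<ReconcilerWaiter> {
        None // a real implementation would spawn a reconciler here
    }
}

// Mutable state stays behind the sync lock; result_tx and compute_hook no
// longer live here, so callers don't have to lock just to clone them.
struct ServiceState {
    tenants: HashMap<TenantShardId, TenantState>,
    nodes: Arc<HashMap<NodeId, Node>>,
}

struct Service {
    inner: Arc<RwLock<ServiceState>>,
    // Never mutated after construction: safe to keep outside the lock.
    compute_hook: Arc<ComputeHook>,
    result_tx: mpsc::Sender<ReconcileResult>,
}

impl Service {
    // Convenience wrapper that supplies the `&self.` arguments, so call
    // sites shrink to a single line.
    fn maybe_reconcile_shard(
        &self,
        shard: &mut TenantState,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Option<ReconcilerWaiter> {
        shard.maybe_reconcile(&self.result_tx, nodes, &self.compute_hook)
    }

    // Example call site, mirroring the reconcile_all() hunk in the diff.
    fn reconcile_all(&self) -> usize {
        let mut locked = self.inner.write().unwrap();
        let nodes = locked.nodes.clone();
        locked
            .tenants
            .values_mut()
            .filter_map(|shard| self.maybe_reconcile_shard(shard, &nodes))
            .count()
    }
}

fn main() {
    let (result_tx, _result_rx) = mpsc::channel();
    let svc = Service {
        inner: Arc::new(RwLock::new(ServiceState {
            tenants: HashMap::new(),
            nodes: Arc::new(HashMap::new()),
        })),
        compute_hook: Arc::new(ComputeHook),
        result_tx,
    };
    println!("reconcilers started: {}", svc.reconcile_all());
}
```

Because the moved members are never mutated, no locking discipline changes: call sites that previously cloned `result_tx` and `compute_hook` out of the locked state now simply borrow them from `self`.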


@@ -83,16 +83,10 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
scheduler: Scheduler,
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
}
impl ServiceState {
fn new(
config: Config,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
nodes: HashMap<NodeId, Node>,
tenants: BTreeMap<TenantShardId, TenantState>,
scheduler: Scheduler,
@@ -101,8 +95,6 @@ impl ServiceState {
tenants,
nodes: Arc::new(nodes),
scheduler,
compute_hook: Arc::new(ComputeHook::new(config)),
result_tx,
}
}
@@ -152,6 +144,8 @@ pub struct Service {
inner: Arc<std::sync::RwLock<ServiceState>>,
config: Config,
persistence: Arc<Persistence>,
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
// Process shutdown will fire this token
cancel: CancellationToken,
@@ -481,8 +475,6 @@ impl Service {
notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
deadline: Instant,
) -> HashSet<TenantShardId> {
let compute_hook = self.inner.read().unwrap().compute_hook.clone();
let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
let mut success_shards = HashSet::new();
@@ -490,7 +482,7 @@ impl Service {
// in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
let mut stream = futures::stream::iter(notifications.into_iter())
.map(|(tenant_shard_id, node_id, stripe_size)| {
let compute_hook = compute_hook.clone();
let compute_hook = self.compute_hook.clone();
let cancel = self.cancel.clone();
async move {
if let Err(e) = compute_hook
@@ -730,14 +722,12 @@ impl Service {
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
config.clone(),
result_tx,
nodes,
tenants,
scheduler,
nodes, tenants, scheduler,
))),
config,
config: config.clone(),
persistence,
compute_hook: Arc::new(ComputeHook::new(config)),
result_tx,
startup_complete: startup_complete.clone(),
cancel: CancellationToken::new(),
gate: Gate::default(),
@@ -1145,8 +1135,6 @@ impl Service {
let (waiters, response_shards) = {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut response_shards = Vec::new();
@@ -1231,17 +1219,7 @@ impl Service {
let waiters = tenants
.range_mut(TenantShardId::tenant_range(tenant_id))
.filter_map(|(_shard_id, shard)| {
shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
})
.filter_map(|(_shard_id, shard)| self.maybe_reconcile_shard(shard, nodes))
.collect::<Vec<_>>();
(waiters, response_shards)
};
@@ -1432,8 +1410,6 @@ impl Service {
let mut waiters = Vec::new();
{
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
for ShardUpdate {
@@ -1461,15 +1437,7 @@ impl Service {
shard.schedule(scheduler)?;
let maybe_waiter = shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);
}
@@ -1514,20 +1482,10 @@ impl Service {
let waiters = {
let mut waiters = Vec::new();
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, _scheduler) = locked.parts_mut();
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.config = config.clone();
if let Some(waiter) = shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
) {
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
}
}
@@ -2159,7 +2117,7 @@ impl Service {
}
// Validate input, and calculate which shards we will create
let (old_shard_count, targets, compute_hook) =
let (old_shard_count, targets) =
{
let locked = self.inner.read().unwrap();
@@ -2255,7 +2213,7 @@ impl Service {
}
}
(old_shard_count, targets, locked.compute_hook.clone())
(old_shard_count, targets)
};
// unwrap safety: we would have returned above if we didn't find at least one shard to split
@@ -2451,7 +2409,8 @@ impl Service {
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps, stripe_size) in child_locations {
if let Err(e) = compute_hook
if let Err(e) = self
.compute_hook
.notify(child_id, child_ps, stripe_size, &self.cancel)
.await
{
@@ -2481,8 +2440,6 @@ impl Service {
) -> Result<TenantShardMigrateResponse, ApiError> {
let waiter = {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
let Some(node) = nodes.get(&migrate_req.node_id) else {
@@ -2542,15 +2499,7 @@ impl Service {
shard.sequence = shard.sequence.next();
}
shard.maybe_reconcile(
result_tx,
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
@@ -2814,8 +2763,6 @@ impl Service {
}
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut new_nodes = (**nodes).clone();
@@ -2867,16 +2814,8 @@ impl Service {
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
}
Ok(()) => {
if tenant_state
.maybe_reconcile(
result_tx.clone(),
&new_nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
if self
.maybe_reconcile_shard(tenant_state, &new_nodes)
.is_some()
{
tenants_affected += 1;
@@ -2900,15 +2839,7 @@ impl Service {
tenant_state.observed.locations.get_mut(&config_req.node_id)
{
if observed_loc.conf.is_none() {
tenant_state.maybe_reconcile(
result_tx.clone(),
&new_nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
self.maybe_reconcile_shard(tenant_state, &new_nodes);
}
}
}
@@ -2937,22 +2868,12 @@ impl Service {
tenant_id: TenantId,
) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
let mut waiters = Vec::new();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.schedule(scheduler)?;
if let Some(waiter) = shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
) {
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
}
}
@@ -2987,28 +2908,34 @@ impl Service {
Ok(())
}
/// Convenience wrapper around [`TenantState::maybe_reconcile`] that provides
/// all the references to parts of Self that are needed
fn maybe_reconcile_shard(
&self,
shard: &mut TenantState,
nodes: &Arc<HashMap<NodeId, Node>>,
) -> Option<ReconcilerWaiter> {
shard.maybe_reconcile(
&self.result_tx,
nodes,
&self.compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
}
/// Check all tenants for pending reconciliation work, and reconcile those in need
///
/// Returns how many reconciliation tasks were started
fn reconcile_all(&self) -> usize {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let pageservers = locked.nodes.clone();
locked
.tenants
.iter_mut()
.filter_map(|(_tenant_shard_id, shard)| {
shard.maybe_reconcile(
result_tx.clone(),
&pageservers,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
})
.filter_map(|(_tenant_shard_id, shard)| self.maybe_reconcile_shard(shard, &pageservers))
.count()
}


@@ -617,7 +617,7 @@ impl TenantState {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn maybe_reconcile(
&mut self,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
service_config: &service::Config,
@@ -729,6 +729,7 @@ impl TenantState {
tenant_id=%reconciler.tenant_shard_id.tenant_id,
shard_id=%reconciler.tenant_shard_id.shard_slug());
metrics::RECONCILER.spawned.inc();
let result_tx = result_tx.clone();
let join_handle = tokio::task::spawn(
async move {
// Wait for any previous reconcile task to complete before we start