mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
storcon: improve initial shard scheduling (#9081)
## Problem Scheduling on tenant creation uses different heuristics compared to the scheduling done during background optimizations. This results in scenarios where shards are created and then immediately migrated by the optimizer. ## Summary of changes 1. Make scheduler aware of the type of the shard it is scheduling (attached vs secondary). We wish to have different heuristics. 2. For attached shards, include the attached shard count from the context in the node score calculation. This brings initial shard scheduling in line with what the optimization passes do. 3. Add a test for (2). This looks like a bigger change than required, but the refactoring serves as the basis for az-aware shard scheduling where we also need to make the distinction between attached and secondary shards. Closes https://github.com/neondatabase/neon/issues/8969
This commit is contained in:
@@ -2,7 +2,7 @@ use crate::{node::Node, tenant_shard::TenantShard};
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::models::PageserverUtilization;
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::{collections::HashMap, fmt::Debug};
|
||||
use utils::{http::error::ApiError, id::NodeId};
|
||||
|
||||
/// Scenarios in which we cannot find a suitable location for a tenant shard
|
||||
@@ -27,7 +27,7 @@ pub enum MaySchedule {
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct SchedulerNode {
|
||||
pub(crate) struct SchedulerNode {
|
||||
/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
shard_count: usize,
|
||||
/// How many shards are currently attached on this node, via their [`crate::tenant_shard::IntentState`].
|
||||
@@ -38,6 +38,137 @@ struct SchedulerNode {
|
||||
may_schedule: MaySchedule,
|
||||
}
|
||||
|
||||
pub(crate) trait NodeSchedulingScore: Debug + Ord + Copy + Sized {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self>;
|
||||
fn is_overloaded(&self) -> bool;
|
||||
fn node_id(&self) -> NodeId;
|
||||
}
|
||||
|
||||
pub(crate) trait ShardTag {
|
||||
type Score: NodeSchedulingScore;
|
||||
}
|
||||
|
||||
pub(crate) struct AttachedShardTag {}
|
||||
impl ShardTag for AttachedShardTag {
|
||||
type Score = NodeAttachmentSchedulingScore;
|
||||
}
|
||||
|
||||
pub(crate) struct SecondaryShardTag {}
|
||||
impl ShardTag for SecondaryShardTag {
|
||||
type Score = NodeSecondarySchedulingScore;
|
||||
}
|
||||
|
||||
/// Scheduling score of a given node for shard attachments.
|
||||
/// Lower scores indicate more suitable nodes.
|
||||
/// Ordering is given by member declaration order (top to bottom).
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub(crate) struct NodeAttachmentSchedulingScore {
|
||||
/// The number of shards belonging to the tenant currently being
|
||||
/// scheduled that are attached to this node.
|
||||
affinity_score: AffinityScore,
|
||||
/// Size of [`ScheduleContext::attached_nodes`] for the current node.
|
||||
/// This normally tracks the number of attached shards belonging to the
|
||||
/// tenant being scheduled that are already on this node.
|
||||
attached_shards_in_context: usize,
|
||||
/// Utilisation score that combines shard count and disk utilisation
|
||||
utilization_score: u64,
|
||||
/// Total number of shards attached to this node. When nodes have identical utilisation, this
|
||||
/// acts as an anti-affinity between attached shards.
|
||||
total_attached_shard_count: usize,
|
||||
/// Convenience to make selection deterministic in tests and empty systems
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
impl NodeSchedulingScore for NodeAttachmentSchedulingScore {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self> {
|
||||
let utilization = match &mut node.may_schedule {
|
||||
MaySchedule::Yes(u) => u,
|
||||
MaySchedule::No => {
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
affinity_score: context
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.copied()
|
||||
.unwrap_or(AffinityScore::FREE),
|
||||
attached_shards_in_context: context.attached_nodes.get(node_id).copied().unwrap_or(0),
|
||||
utilization_score: utilization.cached_score(),
|
||||
total_attached_shard_count: node.attached_shard_count,
|
||||
node_id: *node_id,
|
||||
})
|
||||
}
|
||||
|
||||
fn is_overloaded(&self) -> bool {
|
||||
PageserverUtilization::is_overloaded(self.utilization_score)
|
||||
}
|
||||
|
||||
fn node_id(&self) -> NodeId {
|
||||
self.node_id
|
||||
}
|
||||
}
|
||||
|
||||
/// Scheduling score of a given node for shard secondaries.
|
||||
/// Lower scores indicate more suitable nodes.
|
||||
/// Ordering is given by member declaration order (top to bottom).
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)]
|
||||
pub(crate) struct NodeSecondarySchedulingScore {
|
||||
/// The number of shards belonging to the tenant currently being
|
||||
/// scheduled that are attached to this node.
|
||||
affinity_score: AffinityScore,
|
||||
/// Utilisation score that combines shard count and disk utilisation
|
||||
utilization_score: u64,
|
||||
/// Total number of shards attached to this node. When nodes have identical utilisation, this
|
||||
/// acts as an anti-affinity between attached shards.
|
||||
total_attached_shard_count: usize,
|
||||
/// Convenience to make selection deterministic in tests and empty systems
|
||||
node_id: NodeId,
|
||||
}
|
||||
|
||||
impl NodeSchedulingScore for NodeSecondarySchedulingScore {
|
||||
fn generate(
|
||||
node_id: &NodeId,
|
||||
node: &mut SchedulerNode,
|
||||
context: &ScheduleContext,
|
||||
) -> Option<Self> {
|
||||
let utilization = match &mut node.may_schedule {
|
||||
MaySchedule::Yes(u) => u,
|
||||
MaySchedule::No => {
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
affinity_score: context
|
||||
.nodes
|
||||
.get(node_id)
|
||||
.copied()
|
||||
.unwrap_or(AffinityScore::FREE),
|
||||
utilization_score: utilization.cached_score(),
|
||||
total_attached_shard_count: node.attached_shard_count,
|
||||
node_id: *node_id,
|
||||
})
|
||||
}
|
||||
|
||||
fn is_overloaded(&self) -> bool {
|
||||
PageserverUtilization::is_overloaded(self.utilization_score)
|
||||
}
|
||||
|
||||
fn node_id(&self) -> NodeId {
|
||||
self.node_id
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for SchedulerNode {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
let may_schedule_matches = matches!(
|
||||
@@ -406,6 +537,28 @@ impl Scheduler {
|
||||
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
|
||||
}
|
||||
|
||||
/// Compute a schedulling score for each node that the scheduler knows of
|
||||
/// minus a set of hard excluded nodes.
|
||||
fn compute_node_scores<Score>(
|
||||
&mut self,
|
||||
hard_exclude: &[NodeId],
|
||||
context: &ScheduleContext,
|
||||
) -> Vec<Score>
|
||||
where
|
||||
Score: NodeSchedulingScore,
|
||||
{
|
||||
self.nodes
|
||||
.iter_mut()
|
||||
.filter_map(|(k, v)| {
|
||||
if hard_exclude.contains(k) {
|
||||
None
|
||||
} else {
|
||||
Score::generate(k, v, context)
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// hard_exclude: it is forbidden to use nodes in this list, typically becacuse they
|
||||
/// are already in use by this shard -- we use this to avoid picking the same node
|
||||
/// as both attached and secondary location. This is a hard constraint: if we cannot
|
||||
@@ -415,7 +568,7 @@ impl Scheduler {
|
||||
/// to their anti-affinity score. We use this to prefeer to avoid placing shards in
|
||||
/// the same tenant on the same node. This is a soft constraint: the context will never
|
||||
/// cause us to fail to schedule a shard.
|
||||
pub(crate) fn schedule_shard(
|
||||
pub(crate) fn schedule_shard<Tag: ShardTag>(
|
||||
&mut self,
|
||||
hard_exclude: &[NodeId],
|
||||
context: &ScheduleContext,
|
||||
@@ -424,20 +577,7 @@ impl Scheduler {
|
||||
return Err(ScheduleError::NoPageservers);
|
||||
}
|
||||
|
||||
let mut scores: Vec<(NodeId, AffinityScore, u64, usize)> = self
|
||||
.nodes
|
||||
.iter_mut()
|
||||
.filter_map(|(k, v)| match &mut v.may_schedule {
|
||||
MaySchedule::No => None,
|
||||
MaySchedule::Yes(_) if hard_exclude.contains(k) => None,
|
||||
MaySchedule::Yes(utilization) => Some((
|
||||
*k,
|
||||
context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
|
||||
utilization.cached_score(),
|
||||
v.attached_shard_count,
|
||||
)),
|
||||
})
|
||||
.collect();
|
||||
let mut scores = self.compute_node_scores::<Tag::Score>(hard_exclude, context);
|
||||
|
||||
// Exclude nodes whose utilization is critically high, if there are alternatives available. This will
|
||||
// cause us to violate affinity rules if it is necessary to avoid critically overloading nodes: for example
|
||||
@@ -445,20 +585,18 @@ impl Scheduler {
|
||||
// overloaded.
|
||||
let non_overloaded_scores = scores
|
||||
.iter()
|
||||
.filter(|i| !PageserverUtilization::is_overloaded(i.2))
|
||||
.filter(|i| !i.is_overloaded())
|
||||
.copied()
|
||||
.collect::<Vec<_>>();
|
||||
if !non_overloaded_scores.is_empty() {
|
||||
scores = non_overloaded_scores;
|
||||
}
|
||||
|
||||
// Sort by, in order of precedence:
|
||||
// 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
|
||||
// 2nd: Utilization score (this combines shard count and disk utilization)
|
||||
// 3rd: Attached shard count. When nodes have identical utilization (e.g. when populating some
|
||||
// empty nodes), this acts as an anti-affinity between attached shards.
|
||||
// 4th: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
|
||||
scores.sort_by_key(|i| (i.1, i.2, i.3, i.0));
|
||||
// Sort the nodes by score. The one with the lowest scores will be the preferred node.
|
||||
// Refer to [`NodeAttachmentSchedulingScore`] for attached locations and
|
||||
// [`NodeSecondarySchedulingScore`] for secondary locations to understand how the nodes
|
||||
// are ranked.
|
||||
scores.sort();
|
||||
|
||||
if scores.is_empty() {
|
||||
// After applying constraints, no pageservers were left.
|
||||
@@ -481,12 +619,12 @@ impl Scheduler {
|
||||
}
|
||||
|
||||
// Lowest score wins
|
||||
let node_id = scores.first().unwrap().0;
|
||||
let node_id = scores.first().unwrap().node_id();
|
||||
|
||||
if !matches!(context.mode, ScheduleMode::Speculative) {
|
||||
tracing::info!(
|
||||
"scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
|
||||
scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
|
||||
scores.iter().map(|i| i.node_id().0).collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
|
||||
@@ -556,9 +694,9 @@ mod tests {
|
||||
|
||||
let context = ScheduleContext::default();
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&[], &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
|
||||
t1_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
let scheduled = scheduler.schedule_shard(&[], &context)?;
|
||||
let scheduled = scheduler.schedule_shard::<AttachedShardTag>(&[], &context)?;
|
||||
t2_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
@@ -567,7 +705,8 @@ mod tests {
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 1);
|
||||
assert_eq!(scheduler.get_node_attached_shard_count(NodeId(2)), 1);
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
|
||||
let scheduled =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&t1_intent.all_pageservers(), &context)?;
|
||||
t1_intent.push_secondary(&mut scheduler, scheduled);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 1);
|
||||
@@ -621,7 +760,9 @@ mod tests {
|
||||
scheduler: &mut Scheduler,
|
||||
context: &ScheduleContext,
|
||||
) {
|
||||
let scheduled = scheduler.schedule_shard(&[], context).unwrap();
|
||||
let scheduled = scheduler
|
||||
.schedule_shard::<AttachedShardTag>(&[], context)
|
||||
.unwrap();
|
||||
let mut intent = IntentState::new();
|
||||
intent.set_attached(scheduler, Some(scheduled));
|
||||
scheduled_intents.push(intent);
|
||||
|
||||
@@ -26,7 +26,7 @@ use crate::{
|
||||
ShardGenerationState, TenantFilter,
|
||||
},
|
||||
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
|
||||
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
scheduler::{AttachedShardTag, MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
|
||||
tenant_shard::{
|
||||
MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
|
||||
ScheduleOptimizationAction,
|
||||
@@ -2629,7 +2629,8 @@ impl Service {
|
||||
let scheduler = &mut locked.scheduler;
|
||||
// Right now we only perform the operation on a single node without parallelization
|
||||
// TODO fan out the operation to multiple nodes for better performance
|
||||
let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?;
|
||||
let node_id =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
@@ -2815,7 +2816,8 @@ impl Service {
|
||||
|
||||
// Pick an arbitrary node to use for remote deletions (does not have to be where the tenant
|
||||
// was attached, just has to be able to see the S3 content)
|
||||
let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?;
|
||||
let node_id =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&[], &ScheduleContext::default())?;
|
||||
let node = nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while lock is active");
|
||||
|
||||
@@ -8,7 +8,10 @@ use crate::{
|
||||
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
|
||||
persistence::TenantShardPersistence,
|
||||
reconciler::{ReconcileUnits, ReconcilerConfig},
|
||||
scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
|
||||
scheduler::{
|
||||
AffinityScore, AttachedShardTag, MaySchedule, RefCountUpdate, ScheduleContext,
|
||||
SecondaryShardTag,
|
||||
},
|
||||
service::ReconcileResultRequest,
|
||||
};
|
||||
use pageserver_api::controller_api::{
|
||||
@@ -335,19 +338,19 @@ pub(crate) enum ReconcileWaitError {
|
||||
Failed(TenantShardId, Arc<ReconcileError>),
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
#[derive(Eq, PartialEq, Debug, Clone)]
|
||||
pub(crate) struct ReplaceSecondary {
|
||||
old_node_id: NodeId,
|
||||
new_node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
#[derive(Eq, PartialEq, Debug, Clone)]
|
||||
pub(crate) struct MigrateAttachment {
|
||||
pub(crate) old_attached_node_id: NodeId,
|
||||
pub(crate) new_attached_node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
#[derive(Eq, PartialEq, Debug, Clone)]
|
||||
pub(crate) enum ScheduleOptimizationAction {
|
||||
// Replace one of our secondary locations with a different node
|
||||
ReplaceSecondary(ReplaceSecondary),
|
||||
@@ -355,7 +358,7 @@ pub(crate) enum ScheduleOptimizationAction {
|
||||
MigrateAttachment(MigrateAttachment),
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
#[derive(Eq, PartialEq, Debug, Clone)]
|
||||
pub(crate) struct ScheduleOptimization {
|
||||
// What was the reconcile sequence when we generated this optimization? The optimization
|
||||
// should only be applied if the shard's sequence is still at this value, in case other changes
|
||||
@@ -537,7 +540,8 @@ impl TenantShard {
|
||||
Ok((true, promote_secondary))
|
||||
} else {
|
||||
// Pick a fresh node: either we had no secondaries or none were schedulable
|
||||
let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
|
||||
let node_id =
|
||||
scheduler.schedule_shard::<AttachedShardTag>(&self.intent.secondary, context)?;
|
||||
tracing::debug!("Selected {} as attached", node_id);
|
||||
self.intent.set_attached(scheduler, Some(node_id));
|
||||
Ok((true, node_id))
|
||||
@@ -613,7 +617,8 @@ impl TenantShard {
|
||||
|
||||
let mut used_pageservers = vec![attached_node_id];
|
||||
while self.intent.secondary.len() < secondary_count {
|
||||
let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
|
||||
let node_id = scheduler
|
||||
.schedule_shard::<SecondaryShardTag>(&used_pageservers, context)?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
used_pageservers.push(node_id);
|
||||
modified = true;
|
||||
@@ -626,7 +631,7 @@ impl TenantShard {
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
let node_id = scheduler.schedule_shard(&[], context)?;
|
||||
let node_id = scheduler.schedule_shard::<SecondaryShardTag>(&[], context)?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
modified = true;
|
||||
}
|
||||
@@ -803,9 +808,10 @@ impl TenantShard {
|
||||
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
|
||||
// This implicitly limits the choice to nodes that are available, and prefers nodes
|
||||
// with lower utilization.
|
||||
let Ok(candidate_node) =
|
||||
scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
|
||||
else {
|
||||
let Ok(candidate_node) = scheduler.schedule_shard::<SecondaryShardTag>(
|
||||
&self.intent.all_pageservers(),
|
||||
schedule_context,
|
||||
) else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
continue;
|
||||
};
|
||||
@@ -1333,6 +1339,8 @@ impl TenantShard {
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests {
|
||||
use std::{cell::RefCell, rc::Rc};
|
||||
|
||||
use pageserver_api::{
|
||||
controller_api::NodeAvailability,
|
||||
shard::{ShardCount, ShardNumber},
|
||||
@@ -1637,12 +1645,14 @@ pub(crate) mod tests {
|
||||
|
||||
// Optimize til quiescent: this emulates what Service::optimize_all does, when
|
||||
// called repeatedly in the background.
|
||||
// Returns the applied optimizations
|
||||
fn optimize_til_idle(
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
scheduler: &mut Scheduler,
|
||||
shards: &mut [TenantShard],
|
||||
) {
|
||||
) -> Vec<ScheduleOptimization> {
|
||||
let mut loop_n = 0;
|
||||
let mut optimizations = Vec::default();
|
||||
loop {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
let mut any_changed = false;
|
||||
@@ -1657,6 +1667,7 @@ pub(crate) mod tests {
|
||||
for shard in shards.iter_mut() {
|
||||
let optimization = shard.optimize_attachment(nodes, &schedule_context);
|
||||
if let Some(optimization) = optimization {
|
||||
optimizations.push(optimization.clone());
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
any_changed = true;
|
||||
break;
|
||||
@@ -1664,6 +1675,7 @@ pub(crate) mod tests {
|
||||
|
||||
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
|
||||
if let Some(optimization) = optimization {
|
||||
optimizations.push(optimization.clone());
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
any_changed = true;
|
||||
break;
|
||||
@@ -1678,6 +1690,8 @@ pub(crate) mod tests {
|
||||
loop_n += 1;
|
||||
assert!(loop_n < 1000);
|
||||
}
|
||||
|
||||
optimizations
|
||||
}
|
||||
|
||||
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
|
||||
@@ -1730,4 +1744,48 @@ pub(crate) mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test that initial shard scheduling is optimal. By optimal we mean
|
||||
/// that the optimizer cannot find a way to improve it.
|
||||
///
|
||||
/// This test is an example of the scheduling issue described in
|
||||
/// https://github.com/neondatabase/neon/issues/8969
|
||||
#[test]
|
||||
fn initial_scheduling_is_optimal() -> anyhow::Result<()> {
|
||||
use itertools::Itertools;
|
||||
|
||||
let nodes = make_test_nodes(2);
|
||||
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
|
||||
let mut a = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let a_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
let mut b = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let b_context = Rc::new(RefCell::new(ScheduleContext::default()));
|
||||
|
||||
let a_shards_with_context = a.iter_mut().map(|shard| (shard, a_context.clone()));
|
||||
let b_shards_with_context = b.iter_mut().map(|shard| (shard, b_context.clone()));
|
||||
|
||||
let schedule_order = a_shards_with_context.interleave(b_shards_with_context);
|
||||
|
||||
for (shard, context) in schedule_order {
|
||||
let context = &mut *context.borrow_mut();
|
||||
shard.schedule(&mut scheduler, context).unwrap();
|
||||
}
|
||||
|
||||
let applied_to_a = optimize_til_idle(&nodes, &mut scheduler, &mut a);
|
||||
assert_eq!(applied_to_a, vec![]);
|
||||
|
||||
let applied_to_b = optimize_til_idle(&nodes, &mut scheduler, &mut b);
|
||||
assert_eq!(applied_to_b, vec![]);
|
||||
|
||||
for shard in a.iter_mut().chain(b.iter_mut()) {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user