Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-06 20:20:37 +00:00

Compare commits: 8 commits, compute_ct... → jcsp/storc...

| Author | SHA1 | Date |
|---|---|---|
| | a12072b789 | |
| | e791083be0 | |
| | f7a4642a64 | |
| | 1cdfa198ef | |
| | 6d2a752dac | |
| | 8d06a28350 | |
| | 7413a6dd7c | |
| | 41d440f1da | |
@@ -101,6 +101,15 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
|
||||
measured_request!(
|
||||
"tenant_heatmap_upload",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.tenant_heatmap_upload(tenant_id).await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn location_config(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -58,6 +58,70 @@ pub(crate) struct Scheduler {
    nodes: HashMap<NodeId, SchedulerNode>,
}

/// Score for soft constraint scheduling: lower scores are preferred to higher scores.
///
/// For example, we may set an affinity score based on the number of shards from the same
/// tenant already on a node, to implicitly prefer to balance out shards.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);

impl AffinityScore {
    /// If we have no anti-affinity at all toward a node, this is its score. It means
    /// the scheduler has a free choice amongst nodes with this score, and may pick a node
    /// based on other information such as total utilization.
    pub(crate) const FREE: Self = Self(0);

    pub(crate) fn inc(&mut self) {
        self.0 += 1;
    }
}

impl std::ops::Add for AffinityScore {
    type Output = Self;

    fn add(self, rhs: Self) -> Self::Output {
        Self(self.0 + rhs.0)
    }
}
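As a reading aid (not part of the patch), a minimal sketch of the AffinityScore semantics: lower is better, FREE (0) leaves the scheduler a free choice, and scores add up as more shards of the same tenant land on a node.

    // Illustrative only; assumes the AffinityScore type defined above.
    let mut score = AffinityScore::FREE;     // no shards of this tenant on the node yet
    score.inc();                             // one shard placed here -> score 1
    let combined = score + AffinityScore(2); // contexts combine additively -> score 3
    assert!(AffinityScore::FREE < combined); // the scheduler prefers the lower score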
// For carrying state between multiple calls to [`TenantState::schedule`], e.g. when calling
// it for many shards in the same tenant.
#[derive(Debug, Default)]
pub(crate) struct ScheduleContext {
    /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
    pub(crate) nodes: HashMap<NodeId, AffinityScore>,

    /// Specifically how many _attached_ locations are on each node
    pub(crate) attached_nodes: HashMap<NodeId, usize>,
}

impl ScheduleContext {
    /// Input is a list of nodes we would like to avoid using again within this context. The more
    /// times a node is passed into this call, the less inclined we are to use it.
    pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
        for node_id in nodes {
            let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
            entry.inc()
        }
    }

    pub(crate) fn push_attached(&mut self, node_id: NodeId) {
        let entry = self.attached_nodes.entry(node_id).or_default();
        *entry += 1;
    }

    pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
        self.nodes
            .get(&node_id)
            .copied()
            .unwrap_or(AffinityScore::FREE)
    }

    pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
        self.attached_nodes.get(&node_id).copied().unwrap_or(0)
    }
}
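A sketch of how a caller is expected to accumulate a ScheduleContext across all the shards of one tenant (this mirrors the pattern used later in Service::optimize_all, simplified here):

    let mut ctx = ScheduleContext::default();
    for shard in &tenant_shards {
        // Every node the shard already uses becomes less attractive for its siblings...
        ctx.avoid(&shard.intent.all_pageservers());
        // ...and attached locations are tracked separately from secondaries.
        if let Some(attached) = shard.intent.get_attached() {
            ctx.push_attached(*attached);
        }
    }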

impl Scheduler {
    pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
        let mut scheduler_nodes = HashMap::new();
@@ -224,27 +288,40 @@ impl Scheduler {
|
||||
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
|
||||
}
|
||||
|
||||
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
|
||||
/// Hard Exclude: only consider nodes not in this list.
|
||||
/// Soft exclude: only use nodes in this list if no others are available.
|
||||
pub(crate) fn schedule_shard(
|
||||
&self,
|
||||
hard_exclude: &[NodeId],
|
||||
context: &ScheduleContext,
|
||||
) -> Result<NodeId, ScheduleError> {
|
||||
if self.nodes.is_empty() {
|
||||
return Err(ScheduleError::NoPageservers);
|
||||
}
|
||||
|
||||
let mut tenant_counts: Vec<(NodeId, usize)> = self
|
||||
let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
|
||||
.nodes
|
||||
.iter()
|
||||
.filter_map(|(k, v)| {
|
||||
if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
|
||||
None
|
||||
} else {
|
||||
Some((*k, v.shard_count))
|
||||
Some((
|
||||
*k,
|
||||
context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
|
||||
v.shard_count,
|
||||
))
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Sort by tenant count. Nodes with the same tenant count are sorted by ID.
|
||||
tenant_counts.sort_by_key(|i| (i.1, i.0));
|
||||
// Sort by, in order of precedence:
|
||||
// 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
|
||||
// 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
|
||||
// 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
|
||||
scores.sort_by_key(|i| (i.1, i.2, i.0));
|
||||
|
||||
if tenant_counts.is_empty() {
|
||||
if scores.is_empty() {
|
||||
// After applying constraints, no pageservers were left. We log some detail about
|
||||
// the state of nodes to help understand why this happened. This is not logged as an error because
|
||||
// it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
|
||||
@@ -260,10 +337,11 @@ impl Scheduler {
|
||||
return Err(ScheduleError::ImpossibleConstraint);
|
||||
}
|
||||
|
||||
let node_id = tenant_counts.first().unwrap().0;
|
||||
// Lowest score wins
|
||||
let node_id = scores.first().unwrap().0;
|
||||
tracing::info!(
|
||||
"scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})",
|
||||
tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
|
||||
"scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
|
||||
scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
// Note that we do not update shard count here to reflect the scheduling: that
|
||||
@@ -271,6 +349,12 @@ impl Scheduler {
|
||||
|
||||
Ok(node_id)
|
||||
}
|
||||
|
||||
/// Unit test access to internal state
|
||||
#[cfg(test)]
|
||||
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
|
||||
self.nodes.get(&node_id).unwrap().shard_count
|
||||
}
|
||||
}
|
||||
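To make the three-level sort key in schedule_shard concrete, a small illustration (not from the patch) of how candidates are ranked by (affinity score, utilization, node ID), lowest tuple first:

    let mut scores = vec![
        (NodeId(3), AffinityScore(1), 10), // already hosts a sibling shard of this tenant
        (NodeId(1), AffinityScore(0), 20), // free of affinity, but busy
        (NodeId(2), AffinityScore(0), 5),  // free of affinity and lightly loaded
    ];
    scores.sort_by_key(|i| (i.1, i.2, i.0));
    // Node 2 wins: it ties node 1 on affinity but carries fewer shards.
    assert_eq!(scores.first().unwrap().0, NodeId(2));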
|
||||
#[cfg(test)]
|
||||
@@ -316,15 +400,17 @@ mod tests {
|
||||
let mut t1_intent = IntentState::new();
|
||||
let mut t2_intent = IntentState::new();
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&[])?;
|
||||
let context = ScheduleContext::default();
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&[], &context)?;
|
||||
t1_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
let scheduled = scheduler.schedule_shard(&[])?;
|
||||
let scheduled = scheduler.schedule_shard(&[], &context)?;
|
||||
t2_intent.set_attached(&mut scheduler, Some(scheduled));
|
||||
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
|
||||
|
||||
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?;
|
||||
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
|
||||
t1_intent.push_secondary(&mut scheduler, scheduled);
|
||||
|
||||
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::{
|
||||
id_lock_map::IdLockMap,
|
||||
persistence::{AbortShardSplitStatus, TenantFilter},
|
||||
reconciler::ReconcileError,
|
||||
scheduler::ScheduleContext,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use control_plane::storage_controller::{
|
||||
@@ -345,9 +346,15 @@ impl Service {
|
||||
}
|
||||
|
||||
// Populate each tenant's intent state
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
|
||||
if tenant_shard_id.shard_number == ShardNumber(0) {
|
||||
// Reset scheduling context each time we advance to the next Tenant
|
||||
schedule_context = ScheduleContext::default();
|
||||
}
|
||||
|
||||
tenant_state.intent_from_observed(scheduler);
|
||||
if let Err(e) = tenant_state.schedule(scheduler) {
|
||||
if let Err(e) = tenant_state.schedule(scheduler, &mut schedule_context) {
|
||||
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
|
||||
// not enough pageservers are available. The tenant may well still be available
|
||||
// to clients.
|
||||
@@ -671,7 +678,13 @@ impl Service {
|
||||
let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
|
||||
while !self.cancel.is_cancelled() {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => { self.reconcile_all(); }
|
||||
_ = interval.tick() => {
|
||||
let reconciles_spawned = self.reconcile_all();
|
||||
if reconciles_spawned == 0 {
|
||||
// Run optimizer only when we didn't find any other work to do
|
||||
self.optimize_all();
|
||||
}
|
||||
}
|
||||
_ = self.cancel.cancelled() => return
|
||||
}
|
||||
}
|
||||
@@ -1627,6 +1640,8 @@ impl Service {
|
||||
Err(e) => return Err(ApiError::InternalServerError(anyhow::anyhow!(e))),
|
||||
};
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
let (waiters, response_shards) = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
@@ -1648,11 +1663,14 @@ impl Service {
|
||||
// attached and secondary locations (independently) away from those
|
||||
// pageservers also holding a shard for this tenant.
|
||||
|
||||
entry.get_mut().schedule(scheduler).map_err(|e| {
|
||||
ApiError::Conflict(format!(
|
||||
"Failed to schedule shard {tenant_shard_id}: {e}"
|
||||
))
|
||||
})?;
|
||||
entry
|
||||
.get_mut()
|
||||
.schedule(scheduler, &mut schedule_context)
|
||||
.map_err(|e| {
|
||||
ApiError::Conflict(format!(
|
||||
"Failed to schedule shard {tenant_shard_id}: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
if let Some(node_id) = entry.get().intent.get_attached() {
|
||||
let generation = entry
|
||||
@@ -1680,7 +1698,7 @@ impl Service {
|
||||
|
||||
state.generation = initial_generation;
|
||||
state.config = create_req.config.clone();
|
||||
if let Err(e) = state.schedule(scheduler) {
|
||||
if let Err(e) = state.schedule(scheduler, &mut schedule_context) {
|
||||
schcedule_error = Some(e);
|
||||
}
|
||||
|
||||
@@ -1888,6 +1906,7 @@ impl Service {
|
||||
// Persist updates
|
||||
// Ordering: write to the database before applying changes in-memory, so that
|
||||
// we will not appear time-travel backwards on a restart.
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for ShardUpdate {
|
||||
tenant_shard_id,
|
||||
placement_policy,
|
||||
@@ -1935,7 +1954,7 @@ impl Service {
|
||||
shard.generation = Some(generation);
|
||||
}
|
||||
|
||||
shard.schedule(scheduler)?;
|
||||
shard.schedule(scheduler, &mut schedule_context)?;
|
||||
|
||||
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
|
||||
if let Some(waiter) = maybe_waiter {
|
||||
@@ -2095,7 +2114,7 @@ impl Service {
|
||||
let scheduler = &locked.scheduler;
|
||||
// Right now we only perform the operation on a single node without parallelization
|
||||
// TODO fan out the operation to multiple nodes for better performance
|
||||
let node_id = scheduler.schedule_shard(&[])?;
|
||||
let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
@@ -2364,6 +2383,7 @@ impl Service {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
@@ -2382,7 +2402,7 @@ impl Service {
|
||||
}
|
||||
|
||||
// In case scheduling is being switched back on, try it now.
|
||||
shard.schedule(scheduler).ok();
|
||||
shard.schedule(scheduler, &mut schedule_context).ok();
|
||||
self.maybe_reconcile_shard(shard, nodes);
|
||||
}
|
||||
|
||||
@@ -2846,7 +2866,7 @@ impl Service {
|
||||
|
||||
tracing::info!("Restoring parent shard {tenant_shard_id}");
|
||||
shard.splitting = SplitState::Idle;
|
||||
if let Err(e) = shard.schedule(scheduler) {
|
||||
if let Err(e) = shard.schedule(scheduler, &mut ScheduleContext::default()) {
|
||||
// If this shard can't be scheduled now (perhaps due to offline nodes or
|
||||
// capacity issues), that must not prevent us rolling back a split. In this
|
||||
// case it should be eventually scheduled in the background.
|
||||
@@ -2970,6 +2990,7 @@ impl Service {
|
||||
)
|
||||
};
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for child in child_ids {
|
||||
let mut child_shard = parent_ident;
|
||||
child_shard.number = child.shard_number;
|
||||
@@ -3005,7 +3026,7 @@ impl Service {
|
||||
|
||||
child_locations.push((child, pageserver, child_shard.stripe_size));
|
||||
|
||||
if let Err(e) = child_state.schedule(scheduler) {
|
||||
if let Err(e) = child_state.schedule(scheduler, &mut schedule_context) {
|
||||
// This is not fatal, because we've implicitly already got an attached
|
||||
// location for the child shard. Failure here just means we couldn't
|
||||
// find a secondary (e.g. because cluster is overloaded).
|
||||
@@ -3388,6 +3409,14 @@ impl Service {
|
||||
.join(",")
|
||||
);
|
||||
|
||||
// Optimization: publish heatmaps immediately, so that secondary locations can start warming up.
|
||||
for child in child_ids {
|
||||
if let Err(e) = client.tenant_heatmap_upload(*child).await {
|
||||
// Non-fatal, this is just an optimization
|
||||
tracing::warn!("Failed to upload child {child} heatmap: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
if &response.new_shards != child_ids {
|
||||
// This should never happen: the pageserver should agree with us on how shard splits work.
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
@@ -3869,6 +3898,7 @@ impl Service {
|
||||
AvailabilityTransition::ToOffline => {
|
||||
tracing::info!("Node {} transition to offline", node_id);
|
||||
let mut tenants_affected: usize = 0;
|
||||
|
||||
for (tenant_shard_id, tenant_state) in tenants {
|
||||
if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
|
||||
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
|
||||
@@ -3885,7 +3915,13 @@ impl Service {
|
||||
|
||||
if tenant_state.intent.demote_attached(node_id) {
|
||||
tenant_state.sequence = tenant_state.sequence.next();
|
||||
match tenant_state.schedule(scheduler) {
|
||||
|
||||
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
|
||||
// for tenants without secondary locations: if they have a secondary location, then this
|
||||
// schedule() call is just promoting an existing secondary)
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
match tenant_state.schedule(scheduler, &mut schedule_context) {
|
||||
Err(e) => {
|
||||
// It is possible that some tenants will become unschedulable when too many pageservers
|
||||
// go offline: in this case there isn't much we can do other than make the issue observable.
|
||||
@@ -3947,8 +3983,9 @@ impl Service {
|
||||
let mut waiters = Vec::new();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
|
||||
shard.schedule(scheduler)?;
|
||||
shard.schedule(scheduler, &mut schedule_context)?;
|
||||
|
||||
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
|
||||
waiters.push(waiter);
|
||||
@@ -4012,8 +4049,131 @@ impl Service {
|
||||
let (nodes, tenants, _scheduler) = locked.parts_mut();
|
||||
let pageservers = nodes.clone();
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
let mut reconciles_spawned = 0;
|
||||
for (_tenant_shard_id, shard) in tenants.iter_mut() {
|
||||
for (tenant_shard_id, shard) in tenants.iter_mut() {
|
||||
if tenant_shard_id.is_zero() {
|
||||
schedule_context = ScheduleContext::default();
|
||||
}
|
||||
|
||||
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
|
||||
// dirty, spawn another one
|
||||
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
|
||||
reconciles_spawned += 1;
|
||||
}
|
||||
|
||||
schedule_context.avoid(&shard.intent.all_pageservers());
|
||||
}
|
||||
|
||||
reconciles_spawned
|
||||
}
|
||||
|
||||
/// `optimize` in this context means identifying shards which have valid scheduled locations, but
|
||||
/// could be scheduled somewhere better:
|
||||
/// - Cutting over to a secondary if the node with the secondary is more lightly loaded
|
||||
/// * e.g. after a node fails then recovers, to move some work back to it
|
||||
/// - Cutting over to a secondary if it improves the spread of shard attachments within a tenant
|
||||
/// * e.g. after a shard split, the initial attached locations will all be on the node where
|
||||
/// we did the split, but are probably better placed elsewhere.
|
||||
/// - Creating new secondary locations if it improves the spreading of a sharded tenant
|
||||
/// * e.g. after a shard split, some locations will be on the same node (where the split
|
||||
/// happened), and will probably be better placed elsewhere.
|
||||
///
|
||||
/// To put it more briefly: whereas the scheduler respects soft constraints in a ScheduleContext at
|
||||
/// the time of scheduling, this function looks for cases where a better-scoring location is available
|
||||
/// according to those same soft constraints.
|
||||
fn optimize_all(&self) -> usize {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
let pageservers = nodes.clone();
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
|
||||
let mut reconciles_spawned = 0;
|
||||
|
||||
let mut tenant_shards: Vec<&TenantState> = Vec::new();
|
||||
|
||||
// Limit on how many shards' optimizations each call to this function will execute. Combined
|
||||
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
|
||||
// trickle of optimizations in the background, rather than executing a large number in parallel
|
||||
// when a change occurs.
|
||||
const MAX_OPTIMIZATIONS_PER_PASS: usize = 2;
|
||||
|
||||
let mut work = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in tenants.iter() {
|
||||
if tenant_shard_id.is_zero() {
|
||||
// Reset accumulators on the first shard in a tenant
|
||||
schedule_context = ScheduleContext::default();
|
||||
tenant_shards.clear();
|
||||
}
|
||||
|
||||
if work.len() >= MAX_OPTIMIZATIONS_PER_PASS {
|
||||
break;
|
||||
}
|
||||
|
||||
// Accumulate the schedule context for all the shards in a tenant: we must have
|
||||
// the total view of all shards before we can try to optimize any of them.
|
||||
schedule_context.avoid(&shard.intent.all_pageservers());
|
||||
if let Some(attached) = shard.intent.get_attached() {
|
||||
schedule_context.push_attached(*attached);
|
||||
}
|
||||
tenant_shards.push(shard);
|
||||
|
||||
// Once we have seen the last shard in the tenant, proceed to search across all shards
|
||||
// in the tenant for optimizations
|
||||
if shard.shard.number.0 == shard.shard.count.count() - 1 {
|
||||
if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
|
||||
// Do not start any optimizations while another change to the tenant is ongoing: this
|
||||
// is not necessary for correctness, but simplifies operations and implicitly throttles
|
||||
// optimization changes to happen in a "trickle" over time.
|
||||
continue;
|
||||
}
|
||||
|
||||
if tenant_shards
|
||||
.iter()
|
||||
.any(|s| !matches!(s.splitting, SplitState::Idle))
|
||||
{
|
||||
// Never attempt to optimize a tenant that is currently being split
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: optimization calculations are relatively expensive: create some fast-path for
|
||||
// the common idle case (avoiding the search on tenants that we have recently checked)
|
||||
|
||||
for shard in &tenant_shards {
|
||||
if let Some(optimization) =
|
||||
// If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to
|
||||
// its primary location based on soft constraints, cut it over.
|
||||
shard.optimize_attachment(nodes, &schedule_context)
|
||||
{
|
||||
work.push((shard.tenant_shard_id, optimization));
|
||||
break;
|
||||
} else if let Some(optimization) =
|
||||
// If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
|
||||
// better placed on another node, based on ScheduleContext, then adjust it. This
|
||||
// covers cases like after a shard split, where we might have too many shards
|
||||
// in the same tenant with secondary locations on the node where they originally split.
|
||||
shard.optimize_secondary(scheduler, &schedule_context)
|
||||
{
|
||||
work.push((shard.tenant_shard_id, optimization));
|
||||
break;
|
||||
}
|
||||
|
||||
// TODO: extend this mechanism to prefer attaching on nodes with fewer attached
|
||||
// tenants (i.e. extend schedule state to distinguish attached from secondary counts),
|
||||
// for the total number of attachments on a node (not just within a tenant.)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (tenant_shard_id, optimization) in work {
|
||||
let shard = tenants
|
||||
.get_mut(&tenant_shard_id)
|
||||
.expect("We held lock from place we got this ID");
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
|
||||
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
|
||||
reconciles_spawned += 1;
|
||||
}
|
||||
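As a worked example of why this matters after a shard split (numbers are illustrative, not from the patch): a tenant freshly split into four shards on node 1 produces a context in which every other node looks strictly better, so successive background passes drain work off node 1, capped by MAX_OPTIMIZATIONS_PER_PASS each time.

    let mut ctx = ScheduleContext::default();
    for _ in 0..4 {
        ctx.avoid(&[NodeId(1)]);        // each child shard lives on the split node
        ctx.push_attached(NodeId(1));   // and is attached there
    }
    assert_eq!(ctx.get_node_affinity(NodeId(1)), AffinityScore(4));
    assert_eq!(ctx.get_node_attachments(NodeId(1)), 4);
    assert_eq!(ctx.get_node_affinity(NodeId(2)), AffinityScore::FREE); // any other node scores better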
@@ -4026,7 +4186,11 @@ impl Service {
|
||||
/// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
|
||||
/// put the system into a quiescent state where future background reconciliations won't do anything.
|
||||
pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
|
||||
self.reconcile_all();
|
||||
let reconciles_spawned = self.reconcile_all();
|
||||
if reconciles_spawned == 0 {
|
||||
// Only optimize when we are otherwise idle
|
||||
self.optimize_all();
|
||||
}
|
||||
|
||||
let waiters = {
|
||||
let mut waiters = Vec::new();
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{
|
||||
use crate::{
|
||||
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
|
||||
persistence::TenantShardPersistence,
|
||||
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
|
||||
};
|
||||
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
|
||||
use pageserver_api::{
|
||||
@@ -250,8 +251,13 @@ impl IntentState {
|
||||
|
||||
impl Drop for IntentState {
|
||||
fn drop(&mut self) {
|
||||
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
|
||||
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
|
||||
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
|
||||
// We do not check this while panicking, to avoid polluting unit test failures or
|
||||
// other assertions with this assertion's output. It's still wrong to leak these,
|
||||
// but if we already have a panic then we don't need to independently flag this case.
|
||||
if !(std::thread::panicking()) {
|
||||
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
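The guard added above is the standard pattern for assertions in Drop; a self-contained sketch with a made-up type, for illustration only:

    struct Guarded {
        resource: Option<u32>,
    }

    impl Drop for Guarded {
        fn drop(&mut self) {
            // Only assert cleanliness when we are not already unwinding from a panic,
            // so the original failure is not obscured by a second panic inside Drop.
            if !std::thread::panicking() {
                debug_assert!(self.resource.is_none(), "resource leaked");
            }
        }
    }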
@@ -296,6 +302,26 @@ pub enum ReconcileWaitError {
|
||||
Failed(TenantShardId, String),
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub(crate) struct ReplaceSecondary {
|
||||
old_node_id: NodeId,
|
||||
new_node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub(crate) struct MigrateAttachment {
|
||||
old_attached_node_id: NodeId,
|
||||
new_attached_node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug)]
|
||||
pub(crate) enum ScheduleOptimization {
|
||||
// Replace one of our secondary locations with a different node
|
||||
ReplaceSecondary(ReplaceSecondary),
|
||||
// Migrate attachment to an existing secondary location
|
||||
MigrateAttachment(MigrateAttachment),
|
||||
}
|
||||
|
||||
impl ReconcilerWaiter {
|
||||
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
|
||||
tokio::select! {
|
||||
@@ -430,6 +456,7 @@ impl TenantState {
|
||||
fn schedule_attached(
|
||||
&mut self,
|
||||
scheduler: &mut Scheduler,
|
||||
context: &ScheduleContext,
|
||||
) -> Result<(bool, NodeId), ScheduleError> {
|
||||
// No work to do if we already have an attached tenant
|
||||
if let Some(node_id) = self.intent.attached {
|
||||
@@ -443,14 +470,33 @@ impl TenantState {
|
||||
Ok((true, promote_secondary))
|
||||
} else {
|
||||
// Pick a fresh node: either we had no secondaries or none were schedulable
|
||||
let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
|
||||
let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
|
||||
tracing::debug!("Selected {} as attached", node_id);
|
||||
self.intent.set_attached(scheduler, Some(node_id));
|
||||
Ok((true, node_id))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
|
||||
pub(crate) fn schedule(
|
||||
&mut self,
|
||||
scheduler: &mut Scheduler,
|
||||
context: &mut ScheduleContext,
|
||||
) -> Result<(), ScheduleError> {
|
||||
let r = self.do_schedule(scheduler, context);
|
||||
|
||||
context.avoid(&self.intent.all_pageservers());
|
||||
if let Some(attached) = self.intent.get_attached() {
|
||||
context.push_attached(*attached);
|
||||
}
|
||||
|
||||
r
|
||||
}
|
||||
|
||||
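The effect of this wrapper is that each successful placement immediately feeds back into the shared context, so sibling shards scheduled later in the same pass steer away from the nodes just used; roughly (simplified, names from the patch):

    let mut ctx = ScheduleContext::default();
    shard0.schedule(&mut scheduler, &mut ctx)?; // places shard 0, records its nodes in ctx
    shard1.schedule(&mut scheduler, &mut ctx)?; // shard 1 now sees higher affinity scores on those nodes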
pub(crate) fn do_schedule(
|
||||
&mut self,
|
||||
scheduler: &mut Scheduler,
|
||||
context: &ScheduleContext,
|
||||
) -> Result<(), ScheduleError> {
|
||||
// TODO: before scheduling new nodes, check if any existing content in
|
||||
// self.intent refers to pageservers that are offline, and pick other
|
||||
// pageservers if so.
|
||||
@@ -494,12 +540,13 @@ impl TenantState {
|
||||
}
|
||||
|
||||
// Should have exactly one attached, and N secondaries
|
||||
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
|
||||
let (modified_attached, attached_node_id) =
|
||||
self.schedule_attached(scheduler, context)?;
|
||||
modified |= modified_attached;
|
||||
|
||||
let mut used_pageservers = vec![attached_node_id];
|
||||
while self.intent.secondary.len() < secondary_count {
|
||||
let node_id = scheduler.schedule_shard(&used_pageservers)?;
|
||||
let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
used_pageservers.push(node_id);
|
||||
modified = true;
|
||||
@@ -512,7 +559,7 @@ impl TenantState {
|
||||
modified = true;
|
||||
} else if self.intent.secondary.is_empty() {
|
||||
// Populate secondary by scheduling a fresh node
|
||||
let node_id = scheduler.schedule_shard(&[])?;
|
||||
let node_id = scheduler.schedule_shard(&[], context)?;
|
||||
self.intent.push_secondary(scheduler, node_id);
|
||||
modified = true;
|
||||
}
|
||||
@@ -539,6 +586,162 @@ impl TenantState {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Optimize attachments: if a shard has a secondary location that is preferable to
|
||||
/// its primary location based on soft constraints, switch that secondary location
|
||||
/// to be attached.
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn optimize_attachment(
|
||||
&self,
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<ScheduleOptimization> {
|
||||
let attached = (*self.intent.get_attached())?;
|
||||
if self.intent.secondary.is_empty() {
|
||||
// We can only do useful work if we have both attached and secondary locations: this
|
||||
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
||||
return None;
|
||||
}
|
||||
|
||||
let current_affinity_score = schedule_context.get_node_affinity(attached);
|
||||
let current_attachment_count = schedule_context.get_node_attachments(attached);
|
||||
|
||||
// Generate score for each node, dropping any un-schedulable nodes.
|
||||
let all_pageservers = self.intent.all_pageservers();
|
||||
let mut scores = all_pageservers
|
||||
.iter()
|
||||
.flat_map(|node_id| {
|
||||
if matches!(
|
||||
nodes
|
||||
.get(node_id)
|
||||
.map(|n| n.may_schedule())
|
||||
.unwrap_or(MaySchedule::No),
|
||||
MaySchedule::No
|
||||
) {
|
||||
None
|
||||
} else {
|
||||
let affinity_score = schedule_context.get_node_affinity(*node_id);
|
||||
let attachment_count = schedule_context.get_node_attachments(*node_id);
|
||||
Some((*node_id, affinity_score, attachment_count))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Sort precedence:
|
||||
// 1st - prefer nodes with the lowest total affinity score
|
||||
// 2nd - prefer nodes with the lowest number of attachments in this context
|
||||
// 3rd - if all else is equal, sort by node ID for determinism in tests.
|
||||
scores.sort_by_key(|i| (i.1, i.2, i.0));
|
||||
|
||||
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
|
||||
scores.first()
|
||||
{
|
||||
if attached != *preferred_node {
|
||||
// The best alternative must be more than 1 better than us, otherwise we could end
|
||||
// up flapping back next time we're called (e.g. there's no point migrating from
|
||||
// a location with score 1 to a score of zero, because after the migration the situation
|
||||
// would be the same, but in reverse).
|
||||
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|
||||
|| current_attachment_count > *preferred_attachment_count + 1
|
||||
{
|
||||
tracing::info!(
|
||||
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: attached,
|
||||
new_attached_node_id: *preferred_node,
|
||||
}));
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Node {} is already preferred (score {:?})",
|
||||
preferred_node,
|
||||
preferred_affinity_score
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Fall-through: we didn't find an optimization
|
||||
None
|
||||
}
|
||||
|
||||
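The comparison against preferred + 1 above is a hysteresis threshold; a small illustration (not from the patch) of how it prevents flapping:

    let preferred = AffinityScore(0);
    // A current score of 2 is more than 1 worse than the best alternative: migrate.
    assert!(AffinityScore(2) > preferred + AffinityScore(1));
    // A current score of 1 is not: stay put, otherwise the scores would simply swap
    // and the next pass would migrate straight back.
    assert!(!(AffinityScore(1) > preferred + AffinityScore(1)));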
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
|
||||
pub(crate) fn optimize_secondary(
|
||||
&self,
|
||||
scheduler: &Scheduler,
|
||||
schedule_context: &ScheduleContext,
|
||||
) -> Option<ScheduleOptimization> {
|
||||
if self.intent.secondary.is_empty() {
|
||||
// We can only do useful work if we have both attached and secondary locations: this
|
||||
// function doesn't schedule new locations, only swaps between attached and secondaries.
|
||||
return None;
|
||||
}
|
||||
|
||||
for secondary in self.intent.get_secondary() {
|
||||
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
|
||||
// We're already on a node unaffected by any affinity constraints,
|
||||
// so we won't change it.
|
||||
continue;
|
||||
};
|
||||
|
||||
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
|
||||
// This implicitly limits the choice to nodes that are available, and prefers nodes
|
||||
// with lower utilization.
|
||||
let Ok(candidate_node) =
|
||||
scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
|
||||
else {
|
||||
// A scheduling error means we have no possible candidate replacements
|
||||
continue;
|
||||
};
|
||||
|
||||
let candidate_affinity_score = schedule_context
|
||||
.nodes
|
||||
.get(&candidate_node)
|
||||
.unwrap_or(&AffinityScore::FREE);
|
||||
|
||||
// The best alternative must be more than 1 better than us, otherwise we could end
|
||||
// up flapping back next time we're called.
|
||||
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
|
||||
// If some other node is available and has a lower score than this node, then
|
||||
// that other node is a good place to migrate to.
|
||||
tracing::info!(
|
||||
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
|
||||
self.intent.get_secondary()
|
||||
);
|
||||
return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
|
||||
old_node_id: *secondary,
|
||||
new_node_id: candidate_node,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn apply_optimization(
|
||||
&mut self,
|
||||
scheduler: &mut Scheduler,
|
||||
optimization: ScheduleOptimization,
|
||||
) {
|
||||
match optimization {
|
||||
ScheduleOptimization::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id,
|
||||
new_attached_node_id,
|
||||
}) => {
|
||||
self.intent.demote_attached(old_attached_node_id);
|
||||
self.intent
|
||||
.promote_attached(scheduler, new_attached_node_id);
|
||||
}
|
||||
ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
|
||||
old_node_id,
|
||||
new_node_id,
|
||||
}) => {
|
||||
self.intent.remove_secondary(scheduler, old_node_id);
|
||||
self.intent.push_secondary(scheduler, new_node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
|
||||
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
|
||||
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
|
||||
@@ -953,6 +1156,32 @@ pub(crate) mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantState> {
|
||||
let tenant_id = TenantId::generate();
|
||||
|
||||
(0..shard_count.count())
|
||||
.map(|i| {
|
||||
let shard_number = ShardNumber(i);
|
||||
|
||||
let tenant_shard_id = TenantShardId {
|
||||
tenant_id,
|
||||
shard_number,
|
||||
shard_count,
|
||||
};
|
||||
TenantState::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::new(
|
||||
shard_number,
|
||||
shard_count,
|
||||
pageserver_api::shard::ShardStripeSize(32768),
|
||||
)
|
||||
.unwrap(),
|
||||
policy.clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Test the scheduling behaviors used when a tenant configured for HA is subject
|
||||
/// to nodes being marked offline.
|
||||
#[test]
|
||||
@@ -962,10 +1191,11 @@ pub(crate) mod tests {
|
||||
let mut nodes = make_test_nodes(3);
|
||||
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
let mut context = ScheduleContext::default();
|
||||
|
||||
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
tenant_state
|
||||
.schedule(&mut scheduler)
|
||||
.schedule(&mut scheduler, &mut context)
|
||||
.expect("we have enough nodes, scheduling should work");
|
||||
|
||||
// Expect the shards to initially be scheduled onto different nodes
|
||||
@@ -991,7 +1221,7 @@ pub(crate) mod tests {
|
||||
|
||||
// Scheduling the shard should promote the still-available secondary node to attached
|
||||
tenant_state
|
||||
.schedule(&mut scheduler)
|
||||
.schedule(&mut scheduler, &mut context)
|
||||
.expect("active nodes are available");
|
||||
assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
|
||||
|
||||
@@ -1065,15 +1295,209 @@ pub(crate) mod tests {
|
||||
|
||||
// In pause mode, schedule() shouldn't do anything
|
||||
tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause;
|
||||
assert!(tenant_state.schedule(&mut scheduler).is_ok());
|
||||
assert!(tenant_state
|
||||
.schedule(&mut scheduler, &mut ScheduleContext::default())
|
||||
.is_ok());
|
||||
assert!(tenant_state.intent.all_pageservers().is_empty());
|
||||
|
||||
// In active mode, schedule() works
|
||||
tenant_state.scheduling_policy = ShardSchedulingPolicy::Active;
|
||||
assert!(tenant_state.schedule(&mut scheduler).is_ok());
|
||||
assert!(tenant_state
|
||||
.schedule(&mut scheduler, &mut ScheduleContext::default())
|
||||
.is_ok());
|
||||
assert!(!tenant_state.intent.all_pageservers().is_empty());
|
||||
|
||||
tenant_state.intent.clear(&mut scheduler);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn optimize_attachment() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(3);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
|
||||
// Initially: both shards are attached to node 1, and their secondary locations
// are on different nodes.
|
||||
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
|
||||
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
|
||||
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
|
||||
|
||||
// Either shard should recognize that it has the option to switch to a secondary location where there
|
||||
// would be no other shards from the same tenant, and request to do so.
|
||||
assert_eq!(
|
||||
optimization_a,
|
||||
Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(1),
|
||||
new_attached_node_id: NodeId(2)
|
||||
}))
|
||||
);
|
||||
|
||||
// Note that optimizing two shards in the same tenant with the same ScheduleContext like this is
|
||||
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
|
||||
// of [`Service::optimize_all`] to avoid trying
|
||||
// to do optimizations for multiple shards in the same tenant at the same time. Generating
|
||||
// both optimizations is just done for test purposes
|
||||
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
|
||||
assert_eq!(
|
||||
optimization_b,
|
||||
Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
|
||||
old_attached_node_id: NodeId(1),
|
||||
new_attached_node_id: NodeId(3)
|
||||
}))
|
||||
);
|
||||
|
||||
// Applying these optimizations should result in the end state proposed
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
|
||||
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
|
||||
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
shard_b.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn optimize_secondary() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4);
|
||||
let mut scheduler = Scheduler::new(nodes.values());
|
||||
|
||||
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
|
||||
|
||||
// Initially: the shards are attached to different nodes, but both of their
// secondary locations are on node 3.
|
||||
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
|
||||
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
|
||||
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
|
||||
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
schedule_context.avoid(&shard_a.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
|
||||
schedule_context.avoid(&shard_b.intent.all_pageservers());
|
||||
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
|
||||
|
||||
let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
|
||||
|
||||
// Since there is a node with no locations available, the node with two locations for the
|
||||
// same tenant should generate an optimization to move one away
|
||||
assert_eq!(
|
||||
optimization_a,
|
||||
Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
|
||||
old_node_id: NodeId(3),
|
||||
new_node_id: NodeId(4)
|
||||
}))
|
||||
);
|
||||
|
||||
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
|
||||
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
|
||||
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
|
||||
|
||||
shard_a.intent.clear(&mut scheduler);
|
||||
shard_b.intent.clear(&mut scheduler);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Optimize til quiescent: this emulates what Service::optimize_all does, when
|
||||
// called repeatedly in the background.
|
||||
fn optimize_til_idle(
|
||||
nodes: &HashMap<NodeId, Node>,
|
||||
scheduler: &mut Scheduler,
|
||||
shards: &mut [TenantState],
|
||||
) {
|
||||
let mut loop_n = 0;
|
||||
loop {
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
let mut any_changed = false;
|
||||
|
||||
for shard in shards.iter() {
|
||||
schedule_context.avoid(&shard.intent.all_pageservers());
|
||||
if let Some(attached) = shard.intent.get_attached() {
|
||||
schedule_context.push_attached(*attached);
|
||||
}
|
||||
}
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
let optimization = shard.optimize_attachment(nodes, &schedule_context);
|
||||
if let Some(optimization) = optimization {
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
any_changed = true;
|
||||
break;
|
||||
}
|
||||
|
||||
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
|
||||
if let Some(optimization) = optimization {
|
||||
shard.apply_optimization(scheduler, optimization);
|
||||
any_changed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !any_changed {
|
||||
break;
|
||||
}
|
||||
|
||||
// Assert no infinite loop
|
||||
loop_n += 1;
|
||||
assert!(loop_n < 1000);
|
||||
}
|
||||
}
|
||||
|
||||
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
|
||||
/// that it converges.
|
||||
#[test]
|
||||
fn optimize_add_nodes() -> anyhow::Result<()> {
|
||||
let nodes = make_test_nodes(4);
|
||||
|
||||
// Only show the scheduler a couple of nodes
|
||||
let mut scheduler = Scheduler::new([].iter());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
|
||||
|
||||
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
|
||||
let mut schedule_context = ScheduleContext::default();
|
||||
for shard in &mut shards {
|
||||
assert!(shard
|
||||
.schedule(&mut scheduler, &mut schedule_context)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
// We should see equal number of locations on the two nodes.
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
|
||||
|
||||
// Add another two nodes: we should see the shards spread out when their optimize
|
||||
// methods are called
|
||||
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
|
||||
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
|
||||
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
|
||||
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
|
||||
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
|
||||
|
||||
for shard in shards.iter_mut() {
|
||||
shard.intent.clear(&mut scheduler);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -271,6 +271,17 @@ impl Client {
|
||||
Ok((status, progress))
|
||||
}
|
||||
|
||||
pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
|
||||
let path = reqwest::Url::parse(&format!(
|
||||
"{}/v1/tenant/{}/heatmap_upload",
|
||||
self.mgmt_api_endpoint, tenant_id
|
||||
))
|
||||
.expect("Cannot build URL");
|
||||
|
||||
self.request(Method::POST, path, ()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
test_runner/performance/test_sharding_scale.py (new file, 79 lines)
@@ -0,0 +1,79 @@
|
||||
import concurrent.futures
|
||||
import threading
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
def test_sharding_split_big_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
"""
|
||||
Check that splitting works as expected for a tenant with a reasonable amount of data, larger
|
||||
than we use in a typical test.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 4
|
||||
env = neon_env_builder.init_configs()
|
||||
neon_env_builder.start()
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id, timeline_id, shard_count=1, placement_policy='{"Attached":1}'
|
||||
)
|
||||
|
||||
# TODO: a large scale/size
|
||||
expect_size = 100e6
|
||||
scale = 500
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
|
||||
with env.endpoints.create_start(
|
||||
"main",
|
||||
tenant_id=tenant_id,
|
||||
) as ep:
|
||||
options = "-cstatement_timeout=0 " + ep.default_options.get("options", "")
|
||||
connstr = ep.connstr(password=None, options=options)
|
||||
password = ep.default_options.get("password", None)
|
||||
environ = {}
|
||||
if password is not None:
|
||||
environ["PGPASSWORD"] = password
|
||||
args = ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr]
|
||||
|
||||
# Write a lot of data into the tenant
|
||||
pg_bin.run(args, env=environ)
|
||||
|
||||
# Confirm that we have created a physical size as large as expected
|
||||
timeline_info = env.storage_controller.pageserver_api().timeline_detail(
|
||||
tenant_id, timeline_id
|
||||
)
|
||||
log.info(f"Timeline after init: {timeline_info}")
|
||||
assert timeline_info["current_physical_size"] > expect_size
|
||||
|
||||
background_job_duration = 30
|
||||
background_stop = threading.Event()
|
||||
|
||||
def background_load():
|
||||
while not background_stop.is_set():
|
||||
args = [
|
||||
"pgbench",
|
||||
"-N",
|
||||
"-c4",
|
||||
f"-T{background_job_duration}",
|
||||
"-P2",
|
||||
"--progress-timestamp",
|
||||
connstr,
|
||||
]
|
||||
pg_bin.run(args, env=environ)
|
||||
|
||||
bg_fut = executor.submit(background_load)
|
||||
|
||||
# Do a split while the endpoint is alive
|
||||
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
|
||||
|
||||
# Pump the scheduler to do all the changes it would do in the background
|
||||
# after a shard split.
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=300)
|
||||
|
||||
background_stop.set()
|
||||
bg_fut.result(timeout=background_job_duration * 2)
|
||||
test_runner/performance/test_storage_controller_scale.py (new file, 109 lines)
@@ -0,0 +1,109 @@
|
||||
import concurrent.futures
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import TenantId, TenantShardId, TimelineId
|
||||
|
||||
|
||||
def test_sharding_service_many_tenants(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Check that we cope well with a not-totally-trivial number of tenants.
|
||||
|
||||
This is checking for:
|
||||
- Obvious concurrency bugs from issuing many tenant creations/modifications
|
||||
concurrently.
|
||||
- Obvious scaling bugs like O(N^2) scaling that would be so slow that even
|
||||
a basic test starts failing from slowness.
|
||||
|
||||
This is _not_ a comprehensive scale test: just a basic sanity check that
|
||||
we don't fall over for a thousand shards.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 5
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Total tenants
|
||||
tenant_count = 2000
|
||||
|
||||
# Shards per tenant
|
||||
shard_count = 2
|
||||
stripe_size = 1024
|
||||
|
||||
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
|
||||
|
||||
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
|
||||
|
||||
# We use a fixed seed to make the test reproducible: we want a randomly
|
||||
# chosen order, but not to change the order every time we run the test.
|
||||
rng = random.Random(1234)
|
||||
|
||||
# We will create tenants directly via API, not via neon_local, to avoid any false
|
||||
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
futs = []
|
||||
for tenant_id in tenants:
|
||||
f = executor.submit(
|
||||
env.storage_controller.tenant_create, tenant_id, shard_count, stripe_size
|
||||
)
|
||||
futs.append(f)
|
||||
|
||||
# Wait for creations to finish
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Generate a mixture of operations and dispatch them all concurrently
|
||||
futs = []
|
||||
for tenant_id in tenants:
|
||||
op = rng.choice([0, 1, 2])
|
||||
if op == 0:
|
||||
# A fan-out write operation to all shards in a tenant (timeline creation)
|
||||
f = executor.submit(
|
||||
virtual_ps_http.timeline_create,
|
||||
PgVersion.NOT_SET,
|
||||
tenant_id,
|
||||
TimelineId.generate(),
|
||||
)
|
||||
elif op == 1:
|
||||
# A reconciler operation: migrate a shard.
|
||||
shard_number = rng.randint(0, shard_count - 1)
|
||||
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
|
||||
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
|
||||
f = executor.submit(
|
||||
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
|
||||
)
|
||||
elif op == 2:
|
||||
# A passthrough read to shard zero
|
||||
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
|
||||
|
||||
futs.append(f)
|
||||
|
||||
# Wait for mixed ops to finish
|
||||
for f in futs:
|
||||
f.result()
|
||||
|
||||
# Rolling node failures: this is a small number of requests, but results in a large
|
||||
# number of scheduler calls and reconcile tasks.
|
||||
for pageserver in env.pageservers:
|
||||
env.storage_controller.node_configure(pageserver.id, {"availability": "Offline"})
|
||||
# The sleeps are just to make sure we aren't optimizing-away any re-scheduling operations
|
||||
# from a brief flap in node state.
|
||||
time.sleep(1)
|
||||
env.storage_controller.node_configure(pageserver.id, {"availability": "Active"})
|
||||
time.sleep(1)
|
||||
|
||||
# Restart the storage controller
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
# Restart pageservers: this exercises the /re-attach API
|
||||
for pageserver in env.pageservers:
|
||||
pageserver.stop()
|
||||
pageserver.start()
|
||||
@@ -364,3 +364,67 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
|
||||
# Check that we can create slot with the same name
|
||||
ws_cur = ws_branch.connect().cursor()
|
||||
ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
|
||||
|
||||
|
||||
def test_replication_shutdown(neon_simple_env: NeonEnv):
|
||||
# Ensure Postgres can exit without getting stuck when a replication job is active and the neon extension is installed
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty")
|
||||
pub = env.endpoints.create("test_replication_shutdown_publisher")
|
||||
|
||||
env.neon_cli.create_branch("test_replication_shutdown_subscriber")
|
||||
sub = env.endpoints.create("test_replication_shutdown_subscriber")
|
||||
|
||||
pub.respec(skip_pg_catalog_updates=False)
|
||||
pub.start()
|
||||
|
||||
sub.respec(skip_pg_catalog_updates=False)
|
||||
sub.start()
|
||||
|
||||
pub.wait_for_migrations()
|
||||
sub.wait_for_migrations()
|
||||
|
||||
with pub.cursor() as cur:
|
||||
cur.execute(
|
||||
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
|
||||
)
|
||||
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
|
||||
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
|
||||
|
||||
# If we don't do this, creating the subscription will fail later on PG16
|
||||
pub.edit_hba(["host all mr_whiskers 0.0.0.0/0 md5"])
|
||||
|
||||
with sub.cursor() as cur:
|
||||
cur.execute(
|
||||
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
|
||||
)
|
||||
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
|
||||
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
|
||||
|
||||
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
|
||||
cur.execute("CREATE PUBLICATION pub FOR ALL TABLES")
|
||||
cur.execute("CREATE TABLE t (a int)")
|
||||
cur.execute("INSERT INTO t VALUES (10), (20)")
|
||||
cur.execute("SELECT * from t")
|
||||
res = cur.fetchall()
|
||||
assert [r[0] for r in res] == [10, 20]
|
||||
|
||||
with sub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
|
||||
cur.execute("CREATE TABLE t (a int)")
|
||||
|
||||
pub_conn = f"host=localhost port={pub.pg_port} dbname=neondb user=mr_whiskers password=cat"
|
||||
query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
|
||||
log.info(f"Creating subscription: {query}")
|
||||
cur.execute(query)
|
||||
|
||||
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur:
|
||||
pcur.execute("INSERT INTO t VALUES (30), (40)")
|
||||
|
||||
def check_that_changes_propagated():
|
||||
cur.execute("SELECT * FROM t")
|
||||
res = cur.fetchall()
|
||||
log.info(res)
|
||||
assert len(res) == 4
|
||||
assert [r[0] for r in res] == [10, 20, 30, 40]
|
||||
|
||||
wait_until(10, 0.5, check_that_changes_propagated)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import time
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
@@ -43,6 +44,12 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT extversion from pg_extension where extname='neon'")
|
||||
# IMPORTANT:
|
||||
# If the version has changed, the test should be updated.
|
||||
# Ensure that the default version is also updated in the neon.control file
|
||||
assert cur.fetchone() == ("1.3",)
|
||||
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
|
||||
all_versions = ["1.3", "1.2", "1.1", "1.0"]
|
||||
current_version = "1.3"
|
||||
for idx, begin_version in enumerate(all_versions):
|
||||
@@ -60,3 +67,30 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
)


# Verify that the neon extension can be auto-upgraded to the latest version.
def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_neon_extension_auto_upgrade")

endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()

with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("ALTER EXTENSION neon UPDATE TO '1.0';")
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() == ("1.0",) # Ensure the extension gets downgraded

endpoint_main.stop()
time.sleep(1)
endpoint_main.start()
time.sleep(1)

with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() != ("1.0",) # Ensure the extension gets upgraded
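
# Editorial note (not part of the diff): because skip_pg_catalog_updates=False, restarting the
# endpoint presumably re-runs the compute's catalog update step, which is expected to perform
# the equivalent of "ALTER EXTENSION neon UPDATE", bringing extversion back to the default
# declared in neon.control; that is what the final assert above relies on.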
@@ -1,3 +1,4 @@
import json
import os
import time
from collections import defaultdict
@@ -146,7 +147,7 @@ def test_sharding_split_smoke(
# 8 shards onto separate pageservers
shard_count = 4
split_shard_count = 8
neon_env_builder.num_pageservers = split_shard_count
neon_env_builder.num_pageservers = split_shard_count * 2

# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
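
# Editorial aside (not part of the diff), assuming the standard 8 KiB Postgres page size:
assert (1024 * 1024) // 8192 == 128  # a 1 MiB stripe corresponds to 128 pages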
@@ -174,6 +175,7 @@ def test_sharding_split_smoke(
placement_policy='{"Attached": 1}',
conf=non_default_tenant_config,
)

workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
@@ -265,26 +267,13 @@ def test_sharding_split_smoke(
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
workload.validate()

migrate_to_pageserver_ids = list(
set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
)
assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
# Enough background reconciliations should result in the shards being properly distributed
env.storage_controller.reconcile_until_idle()

# Migrate shards away from the node where the split happened
for ps_id in pre_split_pageserver_ids:
shards_here = [
tenant_shard_id
for (tenant_shard_id, pageserver) in all_shards
if pageserver.id == ps_id
]
assert len(shards_here) == 2
migrate_shard = shards_here[0]
destination = migrate_to_pageserver_ids.pop()

log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
env.storage_controller.tenant_shard_migrate(migrate_shard, destination)

workload.validate()
# We have 8 shards and 16 nodes
# Initially I expect 4 nodes to have 2 attached locations each, and another 8 nodes to have
# 1 secondary location each
# 2 2 2 2 1 1 1 1 1 1 1 1 0 0 0 0

# Assert on how many reconciles happened during the process. This is something of an
# implementation detail, but it is useful to detect any bugs that might generate spurious
@@ -294,8 +283,9 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
# - shard_count reconciles for the migrations we did to move child shards away from their split location
expect_reconciles = shard_count * 2 + split_shard_count + shard_count
# - shard_count of the child shards will need to fail over to their secondaries
# - shard_count of the child shard secondary locations will get moved to emptier nodes
expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
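
# Editorial note (not part of the diff): with shard_count = 4 and split_shard_count = 8 as set
# earlier in this test, the old formula evaluates to 4*2 + 8 + 4 = 20 expected reconciles and
# the new one to 4*2 + 8 + 4*2 = 24.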
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
@@ -1050,3 +1040,82 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"

# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768
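
# Editorial aside (not part of the diff), assuming the standard 8 KiB Postgres page size,
# these stripe sizes work out to:
assert TINY_STRIPES * 8192 == 128 * 1024  # 128 KiB stripes
assert LARGE_STRIPES * 8192 == 256 * 1024 * 1024  # 256 MiB stripes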

@pytest.mark.parametrize("stripe_size", [TINY_STRIPES, LARGE_STRIPES])
def test_sharding_compaction(neon_env_builder: NeonEnvBuilder, stripe_size: int):
"""
Use small stripes, small layers, and small compaction thresholds to exercise how compaction
and image layer generation interact with sharding.
"""

TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly: we want to exercise image layer creation in this test.
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": 0,
}

neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=4,
initial_tenant_shard_stripe_size=stripe_size,
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(64)
for _i in range(0, 10):
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
# these should result in image layers each time we write some data into a shard, and also shards
# receiving less data hitting their "empty image layer" path (where they should skip writing the layer,
# rather than asserting)
workload.churn_rows(64)

# Assert that we got some image layers: this is important because this test's purpose is to exercise the sharding changes
# to Timeline::create_image_layers, so if we weren't creating any image layers we wouldn't be doing our job.
shard_has_image_layers = []
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
image_layer_sizes = {}
for layer in layer_map.historic_layers:
if layer.kind == "Image":
image_layer_sizes[layer.layer_file_name] = layer.layer_file_size

# Pageserver should assert rather than emit an empty layer file, but double check here
assert layer.layer_file_size is not None
assert layer.layer_file_size > 0

shard_has_image_layers.append(len(image_layer_sizes) > 1)
log.info(f"Shard {shard_id} layer sizes: {json.dumps(image_layer_sizes, indent=2)}")

# TODO: once keyspace partitioning is updated, assert that layer sizes are as expected
# (see https://github.com/neondatabase/neon/issues/6774)

if stripe_size == TINY_STRIPES:
# Expect writes were scattered across all pageservers: they should all have compacted some image layers
assert all(shard_has_image_layers)
else:
# With large stripes, it is expected that most of our writes went to one pageserver, so we just require
# that at least one of them has some image layers.
assert any(shard_has_image_layers)

# Assert that everything is still readable
workload.validate()
188  test_runner/regress/test_storage_controller_stress.py  Normal file
@@ -0,0 +1,188 @@
import concurrent.futures
import random
from collections import defaultdict

from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload


def get_node_shard_counts(env: NeonEnv, tenant_ids):
total: defaultdict[int, int] = defaultdict(int)
attached: defaultdict[int, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.storage_controller.tenant_describe(tid)["shards"]:
log.info(
f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
)
for node in shard["node_secondary"]:
total[int(node)] += 1
attached[int(shard["node_attached"])] += 1
total[int(shard["node_attached"])] += 1

return total, attached


def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
):
neon_env_builder.num_pageservers = 8

neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)

workloads: dict[TenantId, Workload] = {}

env = neon_env_builder.init_start()

for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")

n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))

# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires wrapping location_conf calls with proper timeout/cancel handling)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.

rng = random.Random(0xDEADBEEF)

for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
compute_reconfigure_listener.register_workload(workload)
workloads[tenant_id] = workload

def node_evacuated(node_id: int):
total, attached = get_node_shard_counts(env, [t[0] for t in tenants])
assert attached[node_id] == 0

def attachments_active():
for tid, _tlid in tenants:
for shard in env.storage_controller.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")

failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)
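
# Editorial note (not part of the diff): "5%1000*return(1)" appears to use the Rust `fail`
# crate's failpoint syntax, roughly "[<pct>%][<cnt>*]<action>": trigger the "api-503"
# failpoint with 5% probability, at most 1000 times, returning early so the affected
# pageserver API calls fail. Treat the exact semantics as an assumption; they are defined
# in the pageserver, not in this test file.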

def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))

for f in futs:
f.result(timeout=timeout)

def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})

wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id))  # type: ignore[misc]
wait_until(10, 1, attachments_active)

victim.stop(immediate=False)

traffic()

victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})

# Revert shards to attach at their original locations
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)

def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id))  # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)

def traffic():
"""
Check that all tenants are working for postgres clients
"""

def exercise_one(workload):
workload.churn_rows(100)
workload.validate()

for_all_workloads(exercise_one)

def init_one(workload):
workload.init()
workload.write_rows(100)

for_all_workloads(init_one, timeout=60)

for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()

# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline

# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time

# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).

# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.storage_controller.consistency_check()