Compare commits

...

8 Commits

Author SHA1 Message Date
John Spray
a12072b789 tests: add test_sharding_compaction 2024-04-04 10:45:34 +01:00
John Spray
e791083be0 tests: add shard splitting scale test 2024-03-28 17:57:29 +00:00
John Spray
f7a4642a64 tests: controller scale 2024-03-28 17:57:11 +00:00
John Spray
1cdfa198ef tests: controller stress 2024-03-28 17:56:46 +00:00
John Spray
6d2a752dac storage controller: upload heatmaps at end of shard split 2024-03-28 14:44:45 +00:00
John Spray
8d06a28350 tests: update sharding split test for background optimization and secondaries 2024-03-28 14:44:45 +00:00
John Spray
7413a6dd7c Background optimization of shard locations 2024-03-28 14:44:45 +00:00
John Spray
41d440f1da anti-affinity scheduling for shards 2024-03-28 14:42:55 +00:00
11 changed files with 1299 additions and 62 deletions

View File

@@ -101,6 +101,15 @@ impl PageserverClient {
)
}
pub(crate) async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
measured_request!(
"tenant_heatmap_upload",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.tenant_heatmap_upload(tenant_id).await
)
}
pub(crate) async fn location_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -58,6 +58,70 @@ pub(crate) struct Scheduler {
nodes: HashMap<NodeId, SchedulerNode>,
}
/// Score for soft constraint scheduling: lower scores are preferred to higher scores.
///
/// For example, we may set an affinity score based on the number of shards from the same
/// tenant already on a node, to implicitly prefer to balance out shards.
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) struct AffinityScore(pub(crate) usize);
impl AffinityScore {
/// If we have no anti-affinity at all toward a node, this is its score. It means
/// the scheduler has a free choice amongst nodes with this score, and may pick a node
/// based on other information such as total utilization.
pub(crate) const FREE: Self = Self(0);
pub(crate) fn inc(&mut self) {
self.0 += 1;
}
}
impl std::ops::Add for AffinityScore {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
Self(self.0 + rhs.0)
}
}
// For carrying state between multiple calls to [`TenantState::schedule`], e.g. when calling
// it for many shards in the same tenant.
#[derive(Debug, Default)]
pub(crate) struct ScheduleContext {
/// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
pub(crate) nodes: HashMap<NodeId, AffinityScore>,
/// Specifically how many _attached_ locations are on each node
pub(crate) attached_nodes: HashMap<NodeId, usize>,
}
impl ScheduleContext {
/// Input is a list of nodes we would like to avoid using again within this context. The more
/// times a node is passed into this call, the less inclined we are to use it.
pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
for node_id in nodes {
let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
entry.inc()
}
}
pub(crate) fn push_attached(&mut self, node_id: NodeId) {
let entry = self.attached_nodes.entry(node_id).or_default();
*entry += 1;
}
pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
self.nodes
.get(&node_id)
.copied()
.unwrap_or(AffinityScore::FREE)
}
pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
self.attached_nodes.get(&node_id).copied().unwrap_or(0)
}
}
impl Scheduler {
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
let mut scheduler_nodes = HashMap::new();
@@ -224,27 +288,40 @@ impl Scheduler {
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
/// Hard Exclude: only consider nodes not in this list.
/// Soft exclude: only use nodes in this list if no others are available.
pub(crate) fn schedule_shard(
&self,
hard_exclude: &[NodeId],
context: &ScheduleContext,
) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}
let mut tenant_counts: Vec<(NodeId, usize)> = self
let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
.nodes
.iter()
.filter_map(|(k, v)| {
if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
None
} else {
Some((*k, v.shard_count))
Some((
*k,
context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
v.shard_count,
))
}
})
.collect();
// Sort by tenant count. Nodes with the same tenant count are sorted by ID.
tenant_counts.sort_by_key(|i| (i.1, i.0));
// Sort by, in order of precedence:
// 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
// 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
// 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
scores.sort_by_key(|i| (i.1, i.2, i.0));
if tenant_counts.is_empty() {
if scores.is_empty() {
// After applying constraints, no pageservers were left. We log some detail about
// the state of nodes to help understand why this happened. This is not logged as an error because
// it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
@@ -260,10 +337,11 @@ impl Scheduler {
return Err(ScheduleError::ImpossibleConstraint);
}
let node_id = tenant_counts.first().unwrap().0;
// Lowest score wins
let node_id = scores.first().unwrap().0;
tracing::info!(
"scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})",
tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
"scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
scores.iter().map(|i| i.0 .0).collect::<Vec<_>>()
);
// Note that we do not update shard count here to reflect the scheduling: that
@@ -271,6 +349,12 @@ impl Scheduler {
Ok(node_id)
}
/// Unit test access to internal state
#[cfg(test)]
pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
self.nodes.get(&node_id).unwrap().shard_count
}
}
#[cfg(test)]
@@ -316,15 +400,17 @@ mod tests {
let mut t1_intent = IntentState::new();
let mut t2_intent = IntentState::new();
let scheduled = scheduler.schedule_shard(&[])?;
let context = ScheduleContext::default();
let scheduled = scheduler.schedule_shard(&[], &context)?;
t1_intent.set_attached(&mut scheduler, Some(scheduled));
let scheduled = scheduler.schedule_shard(&[])?;
let scheduled = scheduler.schedule_shard(&[], &context)?;
t2_intent.set_attached(&mut scheduler, Some(scheduled));
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?;
let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
t1_intent.push_secondary(&mut scheduler, scheduled);
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);

View File

@@ -11,6 +11,7 @@ use crate::{
id_lock_map::IdLockMap,
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::ReconcileError,
scheduler::ScheduleContext,
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -345,9 +346,15 @@ impl Service {
}
// Populate each tenant's intent state
let mut schedule_context = ScheduleContext::default();
for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
if tenant_shard_id.shard_number == ShardNumber(0) {
// Reset scheduling context each time we advance to the next Tenant
schedule_context = ScheduleContext::default();
}
tenant_state.intent_from_observed(scheduler);
if let Err(e) = tenant_state.schedule(scheduler) {
if let Err(e) = tenant_state.schedule(scheduler, &mut schedule_context) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
// to clients.
@@ -671,7 +678,13 @@ impl Service {
let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
while !self.cancel.is_cancelled() {
tokio::select! {
_ = interval.tick() => { self.reconcile_all(); }
_ = interval.tick() => {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
// Run optimizer only when we didn't find any other work to do
self.optimize_all();
}
}
_ = self.cancel.cancelled() => return
}
}
@@ -1627,6 +1640,8 @@ impl Service {
Err(e) => return Err(ApiError::InternalServerError(anyhow::anyhow!(e))),
};
let mut schedule_context = ScheduleContext::default();
let (waiters, response_shards) = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
@@ -1648,11 +1663,14 @@ impl Service {
// attached and secondary locations (independently) away frorm those
// pageservers also holding a shard for this tenant.
entry.get_mut().schedule(scheduler).map_err(|e| {
ApiError::Conflict(format!(
"Failed to schedule shard {tenant_shard_id}: {e}"
))
})?;
entry
.get_mut()
.schedule(scheduler, &mut schedule_context)
.map_err(|e| {
ApiError::Conflict(format!(
"Failed to schedule shard {tenant_shard_id}: {e}"
))
})?;
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
@@ -1680,7 +1698,7 @@ impl Service {
state.generation = initial_generation;
state.config = create_req.config.clone();
if let Err(e) = state.schedule(scheduler) {
if let Err(e) = state.schedule(scheduler, &mut schedule_context) {
schcedule_error = Some(e);
}
@@ -1888,6 +1906,7 @@ impl Service {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.
let mut schedule_context = ScheduleContext::default();
for ShardUpdate {
tenant_shard_id,
placement_policy,
@@ -1935,7 +1954,7 @@ impl Service {
shard.generation = Some(generation);
}
shard.schedule(scheduler)?;
shard.schedule(scheduler, &mut schedule_context)?;
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
if let Some(waiter) = maybe_waiter {
@@ -2095,7 +2114,7 @@ impl Service {
let scheduler = &locked.scheduler;
// Right now we only perform the operation on a single node without parallelization
// TODO fan out the operation to multiple nodes for better performance
let node_id = scheduler.schedule_shard(&[])?;
let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?;
let node = locked
.nodes
.get(&node_id)
@@ -2364,6 +2383,7 @@ impl Service {
)
.await?;
let mut schedule_context = ScheduleContext::default();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
@@ -2382,7 +2402,7 @@ impl Service {
}
// In case scheduling is being switched back on, try it now.
shard.schedule(scheduler).ok();
shard.schedule(scheduler, &mut schedule_context).ok();
self.maybe_reconcile_shard(shard, nodes);
}
@@ -2846,7 +2866,7 @@ impl Service {
tracing::info!("Restoring parent shard {tenant_shard_id}");
shard.splitting = SplitState::Idle;
if let Err(e) = shard.schedule(scheduler) {
if let Err(e) = shard.schedule(scheduler, &mut ScheduleContext::default()) {
// If this shard can't be scheduled now (perhaps due to offline nodes or
// capacity issues), that must not prevent us rolling back a split. In this
// case it should be eventually scheduled in the background.
@@ -2970,6 +2990,7 @@ impl Service {
)
};
let mut schedule_context = ScheduleContext::default();
for child in child_ids {
let mut child_shard = parent_ident;
child_shard.number = child.shard_number;
@@ -3005,7 +3026,7 @@ impl Service {
child_locations.push((child, pageserver, child_shard.stripe_size));
if let Err(e) = child_state.schedule(scheduler) {
if let Err(e) = child_state.schedule(scheduler, &mut schedule_context) {
// This is not fatal, because we've implicitly already got an attached
// location for the child shard. Failure here just means we couldn't
// find a secondary (e.g. because cluster is overloaded).
@@ -3388,6 +3409,14 @@ impl Service {
.join(",")
);
// Optimization: publish heatmaps immediately, so that secondary locations can start warming up.
for child in child_ids {
if let Err(e) = client.tenant_heatmap_upload(*child).await {
// Non-fatal, this is just an optimization
tracing::warn!("Failed to upload child {child} heatmap: {e}");
}
}
if &response.new_shards != child_ids {
// This should never happen: the pageserver should agree with us on how shard splits work.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
@@ -3869,6 +3898,7 @@ impl Service {
AvailabilityTransition::ToOffline => {
tracing::info!("Node {} transition to offline", node_id);
let mut tenants_affected: usize = 0;
for (tenant_shard_id, tenant_state) in tenants {
if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
@@ -3885,7 +3915,13 @@ impl Service {
if tenant_state.intent.demote_attached(node_id) {
tenant_state.sequence = tenant_state.sequence.next();
match tenant_state.schedule(scheduler) {
// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
// for tenants without secondary locations: if they have a secondary location, then this
// schedule() call is just promoting an existing secondary)
let mut schedule_context = ScheduleContext::default();
match tenant_state.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
@@ -3947,8 +3983,9 @@ impl Service {
let mut waiters = Vec::new();
let (nodes, tenants, scheduler) = locked.parts_mut();
let mut schedule_context = ScheduleContext::default();
for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.schedule(scheduler)?;
shard.schedule(scheduler, &mut schedule_context)?;
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
@@ -4012,8 +4049,131 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
let mut schedule_context = ScheduleContext::default();
let mut reconciles_spawned = 0;
for (_tenant_shard_id, shard) in tenants.iter_mut() {
for (tenant_shard_id, shard) in tenants.iter_mut() {
if tenant_shard_id.is_zero() {
schedule_context = ScheduleContext::default();
}
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another rone
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
}
schedule_context.avoid(&shard.intent.all_pageservers());
}
reconciles_spawned
}
/// `optimize` in this context means identifying shards which have valid scheduled locations, but
/// could be scheduled somewhere better:
/// - Cutting over to a secondary if the node with the secondary is more lightly loaded
/// * e.g. after a node fails then recovers, to move some work back to it
/// - Cutting over to a secondary if it improves the spread of shard attachments within a tenant
/// * e.g. after a shard split, the initial attached locations will all be on the node where
/// we did the split, but are probably better placed elsewhere.
/// - Creating new secondary locations if it improves the spreading of a sharded tenant
/// * e.g. after a shard split, some locations will be on the same node (where the split
/// happened), and will probably be better placed elsewhere.
///
/// To put it more briefly: whereas the scheduler respects soft constraints in a ScheduleContext at
/// the time of scheduling, this function looks for cases where a better-scoring location is available
/// according to those same soft constraints.
fn optimize_all(&self) -> usize {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
let mut schedule_context = ScheduleContext::default();
let mut reconciles_spawned = 0;
let mut tenant_shards: Vec<&TenantState> = Vec::new();
// Limit on how many shards' optmizations each call to this function will execute. Combined
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
// trickle of optimizations in the background, rather than executing a large number in parallel
// when a change occurs.
const MAX_OPTIMIZATIONS_PER_PASS: usize = 2;
let mut work = Vec::new();
for (tenant_shard_id, shard) in tenants.iter() {
if tenant_shard_id.is_zero() {
// Reset accumulators on the first shard in a tenant
schedule_context = ScheduleContext::default();
tenant_shards.clear();
}
if work.len() >= MAX_OPTIMIZATIONS_PER_PASS {
break;
}
// Accumulate the schedule context for all the shards in a tenant: we must have
// the total view of all shards before we can try to optimize any of them.
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
tenant_shards.push(shard);
// Once we have seen the last shard in the tenant, proceed to search across all shards
// in the tenant for optimizations
if shard.shard.number.0 == shard.shard.count.count() - 1 {
if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
// Do not start any optimizations while another change to the tenant is ongoing: this
// is not necessary for correctness, but simplifies operations and implicitly throttles
// optimization changes to happen in a "trickle" over time.
continue;
}
if tenant_shards
.iter()
.any(|s| !matches!(s.splitting, SplitState::Idle))
{
// Never attempt to optimize a tenant that is currently being split
continue;
}
// TODO: optimization calculations are relatively expensive: create some fast-path for
// the common idle case (avoiding the search on tenants that we have recently checked)
for shard in &tenant_shards {
if let Some(optimization) =
// If idle, maybe ptimize attachments: if a shard has a secondary location that is preferable to
// its primary location based on soft constraints, cut it over.
shard.optimize_attachment(nodes, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
} else if let Some(optimization) =
// If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
// better placed on another node, based on ScheduleContext, then adjust it. This
// covers cases like after a shard split, where we might have too many shards
// in the same tenant with secondary locations on the node where they originally split.
shard.optimize_secondary(scheduler, &schedule_context)
{
work.push((shard.tenant_shard_id, optimization));
break;
}
// TODO: extend this mechanism to prefer attaching on nodes with fewer attached
// tenants (i.e. extend schedule state to distinguish attached from secondary counts),
// for the total number of attachments on a node (not just within a tenant.)
}
}
}
for (tenant_shard_id, optimization) in work {
let shard = tenants
.get_mut(&tenant_shard_id)
.expect("We held lock from place we got this ID");
shard.apply_optimization(scheduler, optimization);
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
}
@@ -4026,7 +4186,11 @@ impl Service {
/// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
/// put the system into a quiescent state where future background reconciliations won't do anything.
pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
self.reconcile_all();
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
// Only optimize when we are otherwise idle
self.optimize_all();
}
let waiters = {
let mut waiters = Vec::new();

View File

@@ -7,6 +7,7 @@ use std::{
use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
scheduler::{AffinityScore, MaySchedule, ScheduleContext},
};
use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
@@ -250,8 +251,13 @@ impl IntentState {
impl Drop for IntentState {
fn drop(&mut self) {
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
// We do not check this while panicking, to avoid polluting unit test failures or
// other assertions with this assertion's output. It's still wrong to leak these,
// but if we already have a panic then we don't need to independently flag this case.
if !(std::thread::panicking()) {
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
}
}
}
@@ -296,6 +302,26 @@ pub enum ReconcileWaitError {
Failed(TenantShardId, String),
}
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct ReplaceSecondary {
old_node_id: NodeId,
new_node_id: NodeId,
}
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct MigrateAttachment {
old_attached_node_id: NodeId,
new_attached_node_id: NodeId,
}
#[derive(Eq, PartialEq, Debug)]
pub(crate) enum ScheduleOptimization {
// Replace one of our secondary locations with a different node
ReplaceSecondary(ReplaceSecondary),
// Migrate attachment to an existing secondary location
MigrateAttachment(MigrateAttachment),
}
impl ReconcilerWaiter {
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
tokio::select! {
@@ -430,6 +456,7 @@ impl TenantState {
fn schedule_attached(
&mut self,
scheduler: &mut Scheduler,
context: &ScheduleContext,
) -> Result<(bool, NodeId), ScheduleError> {
// No work to do if we already have an attached tenant
if let Some(node_id) = self.intent.attached {
@@ -443,14 +470,33 @@ impl TenantState {
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
}
}
pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
pub(crate) fn schedule(
&mut self,
scheduler: &mut Scheduler,
context: &mut ScheduleContext,
) -> Result<(), ScheduleError> {
let r = self.do_schedule(scheduler, context);
context.avoid(&self.intent.all_pageservers());
if let Some(attached) = self.intent.get_attached() {
context.push_attached(*attached);
}
r
}
pub(crate) fn do_schedule(
&mut self,
scheduler: &mut Scheduler,
context: &ScheduleContext,
) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
// pageservers if so.
@@ -494,12 +540,13 @@ impl TenantState {
}
// Should have exactly one attached, and N secondaries
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
let (modified_attached, attached_node_id) =
self.schedule_attached(scheduler, context)?;
modified |= modified_attached;
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
self.intent.push_secondary(scheduler, node_id);
used_pageservers.push(node_id);
modified = true;
@@ -512,7 +559,7 @@ impl TenantState {
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
let node_id = scheduler.schedule_shard(&[])?;
let node_id = scheduler.schedule_shard(&[], context)?;
self.intent.push_secondary(scheduler, node_id);
modified = true;
}
@@ -539,6 +586,162 @@ impl TenantState {
Ok(())
}
/// Optimize attachments: if a shard has a secondary location that is preferable to
/// its primary location based on soft constraints, switch that secondary location
/// to be attached.
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_attachment(
&self,
nodes: &HashMap<NodeId, Node>,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
let attached = (*self.intent.get_attached())?;
if self.intent.secondary.is_empty() {
// We can only do useful work if we have both attached and secondary locations: this
// function doesn't schedule new locations, only swaps between attached and secondaries.
return None;
}
let current_affinity_score = schedule_context.get_node_affinity(attached);
let current_attachment_count = schedule_context.get_node_attachments(attached);
// Generate score for each node, dropping any un-schedulable nodes.
let all_pageservers = self.intent.all_pageservers();
let mut scores = all_pageservers
.iter()
.flat_map(|node_id| {
if matches!(
nodes
.get(node_id)
.map(|n| n.may_schedule())
.unwrap_or(MaySchedule::No),
MaySchedule::No
) {
None
} else {
let affinity_score = schedule_context.get_node_affinity(*node_id);
let attachment_count = schedule_context.get_node_attachments(*node_id);
Some((*node_id, affinity_score, attachment_count))
}
})
.collect::<Vec<_>>();
// Sort precedence:
// 1st - prefer nodes with the lowest total affinity score
// 2nd - prefer nodes with the lowest number of attachments in this context
// 3rd - if all else is equal, sort by node ID for determinism in tests.
scores.sort_by_key(|i| (i.1, i.2, i.0));
if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
scores.first()
{
if attached != *preferred_node {
// The best alternative must be more than 1 better than us, otherwise we could end
// up flapping back next time we're called (e.g. there's no point migrating from
// a location with score 1 to a score zero, because on next location the situation
// would be the same, but in reverse).
if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
|| current_attachment_count > *preferred_attachment_count + 1
{
tracing::info!(
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id: attached,
new_attached_node_id: *preferred_node,
}));
}
} else {
tracing::debug!(
"Node {} is already preferred (score {:?})",
preferred_node,
preferred_affinity_score
);
}
}
// Fall-through: we didn't find an optimization
None
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn optimize_secondary(
&self,
scheduler: &Scheduler,
schedule_context: &ScheduleContext,
) -> Option<ScheduleOptimization> {
if self.intent.secondary.is_empty() {
// We can only do useful work if we have both attached and secondary locations: this
// function doesn't schedule new locations, only swaps between attached and secondaries.
return None;
}
for secondary in self.intent.get_secondary() {
let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
// We're already on a node unaffected any affinity constraints,
// so we won't change it.
continue;
};
// Let the scheduler suggest a node, where it would put us if we were scheduling afresh
// This implicitly limits the choice to nodes that are available, and prefers nodes
// with lower utilization.
let Ok(candidate_node) =
scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
else {
// A scheduling error means we have no possible candidate replacements
continue;
};
let candidate_affinity_score = schedule_context
.nodes
.get(&candidate_node)
.unwrap_or(&AffinityScore::FREE);
// The best alternative must be more than 1 better than us, otherwise we could end
// up flapping back next time we're called.
if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
// If some other node is available and has a lower score than this node, then
// that other node is a good place to migrate to.
tracing::info!(
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
self.intent.get_secondary()
);
return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
old_node_id: *secondary,
new_node_id: candidate_node,
}));
}
}
None
}
pub(crate) fn apply_optimization(
&mut self,
scheduler: &mut Scheduler,
optimization: ScheduleOptimization,
) {
match optimization {
ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id,
new_attached_node_id,
}) => {
self.intent.demote_attached(old_attached_node_id);
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
old_node_id,
new_node_id,
}) => {
self.intent.remove_secondary(scheduler, old_node_id);
self.intent.push_secondary(scheduler, new_node_id);
}
}
}
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
@@ -953,6 +1156,32 @@ pub(crate) mod tests {
)
}
fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantState> {
let tenant_id = TenantId::generate();
(0..shard_count.count())
.map(|i| {
let shard_number = ShardNumber(i);
let tenant_shard_id = TenantShardId {
tenant_id,
shard_number,
shard_count,
};
TenantState::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
shard_count,
pageserver_api::shard::ShardStripeSize(32768),
)
.unwrap(),
policy.clone(),
)
})
.collect()
}
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
@@ -962,10 +1191,11 @@ pub(crate) mod tests {
let mut nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
let mut context = ScheduleContext::default();
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
tenant_state
.schedule(&mut scheduler)
.schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");
// Expect to initially be schedule on to different nodes
@@ -991,7 +1221,7 @@ pub(crate) mod tests {
// Scheduling the node should promote the still-available secondary node to attached
tenant_state
.schedule(&mut scheduler)
.schedule(&mut scheduler, &mut context)
.expect("active nodes are available");
assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
@@ -1065,15 +1295,209 @@ pub(crate) mod tests {
// In pause mode, schedule() shouldn't do anything
tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause;
assert!(tenant_state.schedule(&mut scheduler).is_ok());
assert!(tenant_state
.schedule(&mut scheduler, &mut ScheduleContext::default())
.is_ok());
assert!(tenant_state.intent.all_pageservers().is_empty());
// In active mode, schedule() works
tenant_state.scheduling_policy = ShardSchedulingPolicy::Active;
assert!(tenant_state.schedule(&mut scheduler).is_ok());
assert!(tenant_state
.schedule(&mut scheduler, &mut ScheduleContext::default())
.is_ok());
assert!(!tenant_state.intent.all_pageservers().is_empty());
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
#[test]
fn optimize_attachment() -> anyhow::Result<()> {
let nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
// Initially: both nodes attached on shard 1, and both have secondary locations
// on different nodes.
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
// Either shard should recognize that it has the option to switch to a secondary location where there
// would be no other shards from the same tenant, and request to do so.
assert_eq!(
optimization_a,
Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(2)
}))
);
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
// mutually exclusive (the optimization of one invalidates the stats) -- it is the responsibility
// of [`Service::optimize_all`] to avoid trying
// to do optimizations for multiple shards in the same tenant at the same time. Generating
// both optimizations is just done for test purposes
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
assert_eq!(
optimization_b,
Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
old_attached_node_id: NodeId(1),
new_attached_node_id: NodeId(3)
}))
);
// Applying these optimizations should result in the end state proposed
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
#[test]
fn optimize_secondary() -> anyhow::Result<()> {
let nodes = make_test_nodes(4);
let mut scheduler = Scheduler::new(nodes.values());
let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
// Initially: both nodes attached on shard 1, and both have secondary locations
// on different nodes.
shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
let mut schedule_context = ScheduleContext::default();
schedule_context.avoid(&shard_a.intent.all_pageservers());
schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
schedule_context.avoid(&shard_b.intent.all_pageservers());
schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
// Since there is a node with no locations available, the node with two locations for the
// same tenant should generate an optimization to move one away
assert_eq!(
optimization_a,
Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
old_node_id: NodeId(3),
new_node_id: NodeId(4)
}))
);
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
shard_a.intent.clear(&mut scheduler);
shard_b.intent.clear(&mut scheduler);
Ok(())
}
// Optimize til quiescent: this emulates what Service::optimize_all does, when
// called repeatedly in the background.
fn optimize_til_idle(
nodes: &HashMap<NodeId, Node>,
scheduler: &mut Scheduler,
shards: &mut [TenantState],
) {
let mut loop_n = 0;
loop {
let mut schedule_context = ScheduleContext::default();
let mut any_changed = false;
for shard in shards.iter() {
schedule_context.avoid(&shard.intent.all_pageservers());
if let Some(attached) = shard.intent.get_attached() {
schedule_context.push_attached(*attached);
}
}
for shard in shards.iter_mut() {
let optimization = shard.optimize_attachment(nodes, &schedule_context);
if let Some(optimization) = optimization {
shard.apply_optimization(scheduler, optimization);
any_changed = true;
break;
}
let optimization = shard.optimize_secondary(scheduler, &schedule_context);
if let Some(optimization) = optimization {
shard.apply_optimization(scheduler, optimization);
any_changed = true;
break;
}
}
if !any_changed {
break;
}
// Assert no infinite loop
loop_n += 1;
assert!(loop_n < 1000);
}
}
/// Test the balancing behavior of shard scheduling: that it achieves a balance, and
/// that it converges.
#[test]
fn optimize_add_nodes() -> anyhow::Result<()> {
let nodes = make_test_nodes(4);
// Only show the scheduler a couple of nodes
let mut scheduler = Scheduler::new([].iter());
scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
let mut schedule_context = ScheduleContext::default();
for shard in &mut shards {
assert!(shard
.schedule(&mut scheduler, &mut schedule_context)
.is_ok());
}
// We should see equal number of locations on the two nodes.
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
// Add another two nodes: we should see the shards spread out when their optimize
// methods are called
scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
optimize_til_idle(&nodes, &mut scheduler, &mut shards);
assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
for shard in shards.iter_mut() {
shard.intent.clear(&mut scheduler);
}
Ok(())
}
}

View File

@@ -271,6 +271,17 @@ impl Client {
Ok((status, progress))
}
pub async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
let path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{}/heatmap_upload",
self.mgmt_api_endpoint, tenant_id
))
.expect("Cannot build URL");
self.request(Method::POST, path, ()).await?;
Ok(())
}
pub async fn location_config(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -0,0 +1,79 @@
import concurrent.futures
import threading
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
)
from fixtures.types import TenantId, TimelineId
def test_sharding_split_big_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Check that splitting works as expected for a tenant with a reasonable amount of data, larger
than we use in a typical test.
"""
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_configs()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=1, placement_policy='{"Attached":1}'
)
# TODO: a large scale/size
expect_size = 100e6
scale = 500
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
with env.endpoints.create_start(
"main",
tenant_id=tenant_id,
) as ep:
options = "-cstatement_timeout=0 " + ep.default_options.get("options", "")
connstr = ep.connstr(password=None, options=options)
password = ep.default_options.get("password", None)
environ = {}
if password is not None:
environ["PGPASSWORD"] = password
args = ["pgbench", f"-s{scale}", "-i", "-I", "dtGvp", connstr]
# Write a lot of data into the tenant
pg_bin.run(args, env=environ)
# Confirm that we have created a physical size as large as expected
timeline_info = env.storage_controller.pageserver_api().timeline_detail(
tenant_id, timeline_id
)
log.info(f"Timeline after init: {timeline_info}")
assert timeline_info["current_physical_size"] > expect_size
background_job_duration = 30
background_stop = threading.Event()
def background_load():
while not background_stop.is_set():
args = [
"pgbench",
"-N",
"-c4",
f"-T{background_job_duration}",
"-P2",
"--progress-timestamp",
connstr,
]
pg_bin.run(args, env=environ)
bg_fut = executor.submit(background_load)
# Do a split while the endpoint is alive
env.storage_controller.tenant_shard_split(tenant_id, shard_count=4)
# Pump the scheduler to do all the changes it would do in the background
# after a shard split.
env.storage_controller.reconcile_until_idle(timeout_secs=300)
background_stop.set()
bg_fut.result(timeout=background_job_duration * 2)

View File

@@ -0,0 +1,109 @@
import concurrent.futures
import random
import time
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pg_version import PgVersion
from fixtures.types import TenantId, TenantShardId, TimelineId
def test_sharding_service_many_tenants(
neon_env_builder: NeonEnvBuilder,
):
"""
Check that we cope well with a not-totally-trivial number of tenants.
This is checking for:
- Obvious concurrency bugs from issuing many tenant creations/modifications
concurrently.
- Obvious scaling bugs like O(N^2) scaling that would be so slow that even
a basic test starts failing from slowness.
This is _not_ a comprehensive scale test: just a basic sanity check that
we don't fall over for a thousand shards.
"""
neon_env_builder.num_pageservers = 5
env = neon_env_builder.init_start()
# Total tenants
tenant_count = 2000
# Shards per tenant
shard_count = 2
stripe_size = 1024
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(1234)
# We will create tenants directly via API, not via neon_local, to avoid any false
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
with concurrent.futures.ThreadPoolExecutor() as executor:
futs = []
for tenant_id in tenants:
f = executor.submit(
env.storage_controller.tenant_create, tenant_id, shard_count, stripe_size
)
futs.append(f)
# Wait for creations to finish
for f in futs:
f.result()
# Generate a mixture of operations and dispatch them all concurrently
futs = []
for tenant_id in tenants:
op = rng.choice([0, 1, 2])
if op == 0:
# A fan-out write operation to all shards in a tenant (timeline creation)
f = executor.submit(
virtual_ps_http.timeline_create,
PgVersion.NOT_SET,
tenant_id,
TimelineId.generate(),
)
elif op == 1:
# A reconciler operation: migrate a shard.
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
f = executor.submit(
env.storage_controller.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
elif op == 2:
# A passthrough read to shard zero
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
futs.append(f)
# Wait for mixed ops to finish
for f in futs:
f.result()
# Rolling node failures: this is a small number of requests, but results in a large
# number of scheduler calls and reconcile tasks.
for pageserver in env.pageservers:
env.storage_controller.node_configure(pageserver.id, {"availability": "Offline"})
# The sleeps are just to make sure we aren't optimizing-away any re-scheduling operations
# from a brief flap in node state.
time.sleep(1)
env.storage_controller.node_configure(pageserver.id, {"availability": "Active"})
time.sleep(1)
# Restart the storage controller
env.storage_controller.stop()
env.storage_controller.start()
# Restart pageservers: this exercises the /re-attach API
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()

View File

@@ -364,3 +364,67 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
# Check that we can create slot with the same name
ws_cur = ws_branch.connect().cursor()
ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
def test_replication_shutdown(neon_simple_env: NeonEnv):
# Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
env = neon_simple_env
env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty")
pub = env.endpoints.create("test_replication_shutdown_publisher")
env.neon_cli.create_branch("test_replication_shutdown_subscriber")
sub = env.endpoints.create("test_replication_shutdown_subscriber")
pub.respec(skip_pg_catalog_updates=False)
pub.start()
sub.respec(skip_pg_catalog_updates=False)
sub.start()
pub.wait_for_migrations()
sub.wait_for_migrations()
with pub.cursor() as cur:
cur.execute(
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
)
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
# If we don't do this, creating the subscription will fail later on PG16
pub.edit_hba(["host all mr_whiskers 0.0.0.0/0 md5"])
with sub.cursor() as cur:
cur.execute(
"CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
)
cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
cur.execute("CREATE PUBLICATION pub FOR ALL TABLES")
cur.execute("CREATE TABLE t (a int)")
cur.execute("INSERT INTO t VALUES (10), (20)")
cur.execute("SELECT * from t")
res = cur.fetchall()
assert [r[0] for r in res] == [10, 20]
with sub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
cur.execute("CREATE TABLE t (a int)")
pub_conn = f"host=localhost port={pub.pg_port} dbname=neondb user=mr_whiskers password=cat"
query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
log.info(f"Creating subscription: {query}")
cur.execute(query)
with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur:
pcur.execute("INSERT INTO t VALUES (30), (40)")
def check_that_changes_propagated():
cur.execute("SELECT * FROM t")
res = cur.fetchall()
log.info(res)
assert len(res) == 4
assert [r[0] for r in res] == [10, 20, 30, 40]
wait_until(10, 0.5, check_that_changes_propagated)

View File

@@ -1,3 +1,4 @@
import time
from contextlib import closing
from fixtures.log_helper import log
@@ -43,6 +44,12 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
# IMPORTANT:
# If the version has changed, the test should be updated.
# Ensure that the default version is also updated in the neon.control file
assert cur.fetchone() == ("1.3",)
cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
@@ -60,3 +67,30 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
)
# Verify that the neon extension can be auto-upgraded to the latest version.
def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_neon_extension_auto_upgrade")
endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade")
# don't skip pg_catalog updates - it runs CREATE EXTENSION neon
endpoint_main.respec(skip_pg_catalog_updates=False)
endpoint_main.start()
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("ALTER EXTENSION neon UPDATE TO '1.0';")
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() == ("1.0",) # Ensure the extension gets downgraded
endpoint_main.stop()
time.sleep(1)
endpoint_main.start()
time.sleep(1)
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
cur.execute("SELECT extversion from pg_extension where extname='neon'")
assert cur.fetchone() != ("1.0",) # Ensure the extension gets upgraded

View File

@@ -1,3 +1,4 @@
import json
import os
import time
from collections import defaultdict
@@ -146,7 +147,7 @@ def test_sharding_split_smoke(
# 8 shards onto separate pageservers
shard_count = 4
split_shard_count = 8
neon_env_builder.num_pageservers = split_shard_count
neon_env_builder.num_pageservers = split_shard_count * 2
# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
@@ -174,6 +175,7 @@ def test_sharding_split_smoke(
placement_policy='{"Attached": 1}',
conf=non_default_tenant_config,
)
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
@@ -265,26 +267,13 @@ def test_sharding_split_smoke(
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
workload.validate()
migrate_to_pageserver_ids = list(
set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
)
assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
# Enough background reconciliations should result in the shards being properly distributed
env.storage_controller.reconcile_until_idle()
# Migrate shards away from the node where the split happened
for ps_id in pre_split_pageserver_ids:
shards_here = [
tenant_shard_id
for (tenant_shard_id, pageserver) in all_shards
if pageserver.id == ps_id
]
assert len(shards_here) == 2
migrate_shard = shards_here[0]
destination = migrate_to_pageserver_ids.pop()
log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
env.storage_controller.tenant_shard_migrate(migrate_shard, destination)
workload.validate()
# We have 8 shards and 16 nodes
# Initially I expect 4 nodes to have 2 attached locations each, and another 8 nodes to have
# 1 secondary location each
# 2 2 2 2 1 1 1 1 1 1 1 1 0 0 0 0
# Assert on how many reconciles happened during the process. This is something of an
# implementation detail, but it is useful to detect any bugs that might generate spurious
@@ -294,8 +283,9 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
# - shard_count reconciles for the migrations we did to move child shards away from their split location
expect_reconciles = shard_count * 2 + split_shard_count + shard_count
# - shard_count of the child shards will need to fail over to their secondaries
# - shard_count of the child shard secondary locations will get moved to emptier nodes
expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
@@ -1050,3 +1040,82 @@ def test_sharding_backpressure(neon_env_builder: NeonEnvBuilder):
max_lsn = max(Lsn(info["last_record_lsn"]) for info in infos)
diff = max_lsn - min_lsn
assert diff < 2 * 1024 * 1024, f"LSN diff={diff}, expected diff < 2MB due to backpressure"
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768
@pytest.mark.parametrize("stripe_size", [TINY_STRIPES, LARGE_STRIPES])
def test_sharding_compaction(neon_env_builder: NeonEnvBuilder, stripe_size: int):
"""
Use small stripes, small layers, and small compaction thresholds to exercise how compaction
and image layer generation interacts with sharding.
"""
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly: we want to exercise image layer creation in this test.
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": 0,
}
neon_env_builder.num_pageservers = 4
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=4,
initial_tenant_shard_stripe_size=stripe_size,
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(64)
for _i in range(0, 10):
# Each of these does some writes then a checkpoint: because we set image_creation_threshold to 1,
# these should result in image layers each time we write some data into a shard, and also shards
# recieving less data hitting their "empty image layer" path (wherre they should skip writing the layer,
# rather than asserting)
workload.churn_rows(64)
# Assert that we got some image layers: this is important because this test's purpose is to exercise the sharding changes
# to Timeline::create_image_layers, so if we weren't creating any image layers we wouldn't be doing our job.
shard_has_image_layers = []
for shard in env.storage_controller.locate(tenant_id):
pageserver = env.get_pageserver(shard["node_id"])
shard_id = shard["shard_id"]
layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
image_layer_sizes = {}
for layer in layer_map.historic_layers:
if layer.kind == "Image":
image_layer_sizes[layer.layer_file_name] = layer.layer_file_size
# Pageserver should assert rather than emit an empty layer file, but double check here
assert layer.layer_file_size is not None
assert layer.layer_file_size > 0
shard_has_image_layers.append(len(image_layer_sizes) > 1)
log.info(f"Shard {shard_id} layer sizes: {json.dumps(image_layer_sizes, indent=2)}")
# TODO: once keyspace partitioning is updated, assert that layer sizes are as expected
# (see https://github.com/neondatabase/neon/issues/6774)
if stripe_size == TINY_STRIPES:
# Expect writes were scattered across all pageservers: they should all have compacted some image layers
assert all(shard_has_image_layers)
else:
# With large stripes, it is expected that most of our writes went to one pageserver, so we just require
# that at least one of them has some image layers.
assert any(shard_has_image_layers)
# Assert that everything is still readable
workload.validate()

View File

@@ -0,0 +1,188 @@
import concurrent.futures
import random
from collections import defaultdict
from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
)
from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
def get_node_shard_counts(env: NeonEnv, tenant_ids):
total: defaultdict[int, int] = defaultdict(int)
attached: defaultdict[int, int] = defaultdict(int)
for tid in tenant_ids:
for shard in env.storage_controller.tenant_describe(tid)["shards"]:
log.info(
f"{shard['tenant_shard_id']}: attached={shard['node_attached']}, secondary={shard['node_secondary']} "
)
for node in shard["node_secondary"]:
total[int(node)] += 1
attached[int(shard["node_attached"])] += 1
total[int(shard["node_attached"])] += 1
return total, attached
def test_storcon_rolling_failures(
neon_env_builder: NeonEnvBuilder,
compute_reconfigure_listener: ComputeReconfigure,
):
neon_env_builder.num_pageservers = 8
neon_env_builder.control_plane_compute_hook_api = (
compute_reconfigure_listener.control_plane_compute_hook_api
)
workloads: dict[TenantId, Workload] = {}
env = neon_env_builder.init_start()
for ps in env.pageservers:
# We will do unclean detaches
ps.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
n_tenants = 32
tenants = [(env.initial_tenant, env.initial_timeline)]
for i in range(0, n_tenants - 1):
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
shard_count = [1, 2, 4][i % 3]
env.neon_cli.create_tenant(
tenant_id, timeline_id, shard_count=shard_count, placement_policy='{"Double":1}'
)
tenants.append((tenant_id, timeline_id))
# Background pain:
# - TODO: some fraction of pageserver API requests hang
# (this requires implementing wrap of location_conf calls with proper timeline/cancel)
# - TODO: continuous tenant/timeline creation/destruction over a different ID range than
# the ones we're using for availability checks.
rng = random.Random(0xDEADBEEF)
for tenant_id, timeline_id in tenants:
workload = Workload(env, tenant_id, timeline_id)
compute_reconfigure_listener.register_workload(workload)
workloads[tenant_id] = workload
def node_evacuated(node_id: int):
total, attached = get_node_shard_counts(env, [t[0] for t in tenants])
assert attached[node_id] == 0
def attachments_active():
for tid, _tlid in tenants:
for shard in env.storage_controller.locate(tid):
psid = shard["node_id"]
tsid = TenantShardId.parse(shard["shard_id"])
status = env.get_pageserver(psid).http_client().tenant_status(tenant_id=tsid)
assert status["state"]["slug"] == "Active"
log.info(f"Shard {tsid} active on node {psid}")
failpoints = ("api-503", "5%1000*return(1)")
failpoints_str = f"{failpoints[0]}={failpoints[1]}"
for ps in env.pageservers:
ps.http_client().configure_failpoints(failpoints)
def for_all_workloads(callback, timeout=60):
futs = []
with concurrent.futures.ThreadPoolExecutor() as pool:
for _tenant_id, workload in workloads.items():
futs.append(pool.submit(callback, workload))
for f in futs:
f.result(timeout=timeout)
def clean_fail_restore():
"""
Clean shutdown of a node: mark it offline in storage controller, wait for new attachment
locations to activate, then SIGTERM it.
- Endpoints should not fail any queries
- New attach locations should activate within bounded time.
"""
victim = rng.choice(env.pageservers)
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
victim.stop(immediate=False)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# Revert shards to attach at their original locations
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def hard_fail_restore():
"""
Simulate an unexpected death of a pageserver node
"""
victim = rng.choice(env.pageservers)
victim.stop(immediate=True)
# TODO: once we implement heartbeats detecting node failures, remove this
# explicit marking offline and rely on storage controller to detect it itself.
env.storage_controller.node_configure(victim.id, {"availability": "Offline"})
wait_until(10, 1, lambda node_id=victim.id: node_evacuated(node_id)) # type: ignore[misc]
wait_until(10, 1, attachments_active)
traffic()
victim.start(extra_env_vars={"FAILPOINTS": failpoints_str})
# TODO
# env.storage_controller.balance_attached()
wait_until(10, 1, attachments_active)
def traffic():
"""
Check that all tenants are working for postgres clients
"""
def exercise_one(workload):
workload.churn_rows(100)
workload.validate()
for_all_workloads(exercise_one)
def init_one(workload):
workload.init()
workload.write_rows(100)
for_all_workloads(init_one, timeout=60)
for i in range(0, 20):
mode = rng.choice([0, 1, 2])
log.info(f"Iteration {i}, mode {mode}")
if mode == 0:
# Traffic interval: sometimes, instead of a failure, just let the clients
# write a load of data. This avoids chaos tests ending up with unrealistically
# small quantities of data in flight.
traffic()
elif mode == 1:
clean_fail_restore()
elif mode == 2:
hard_fail_restore()
# Fail and restart: hard-kill one node. Notify the storage controller that it is offline.
# Success criteria:
# - New attach locations should activate within bounded time
# - TODO: once we do heartbeating, we should not have to explicitly mark the node offline
# TODO: fail and remove: fail a node, and remove it from the cluster.
# Success criteria:
# - Endpoints should not fail any queries
# - New attach locations should activate within bounded time
# - New secondary locations should fill up with data within bounded time
# TODO: somehow need to wait for reconciles to complete before doing consistency check
# (or make the check wait).
# Do consistency check on every iteration, not just at the end: this makes it more obvious
# which change caused an issue.
env.storage_controller.consistency_check()