storage controller: check warmth of secondary before doing proactive migration (#7583)

## Problem

The logic in `Service::optimize_all` would sometimes choose to migrate a
tenant to a secondary location that was only recently created. This resulted
in `Reconciler::live_migrate` hitting its 5-minute timeout while warming up
the location, and then attaching the tenant to a location that did not yet
have a warm enough local set of layer files for good performance.

Closes: #7532 

## Summary of changes

- Add a pageserver API for checking download progress of a secondary
location
- During `optimize_all`, connect to the pageservers hosting candidate
secondary locations and check that they are warm (see the sketch below).
- During shard split, do heatmap uploads and start secondary downloads,
so that the new shards' secondary locations start downloading ASAP,
rather than waiting minutes for background downloads to kick in.

I have intentionally not implemented this by continuously reading the
status of locations, to avoid the scale challenge of efficiently polling
and updating the status of 10k-100k locations. If we implement that in
the future, then this code can be simplified to act on the latest known
state of a location rather than fetching it inline during `optimize_all`.
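
For illustration, here is a minimal sketch of the warmth check performed during
validation, written as a free-standing function. The `SecondaryProgress` fields
and the 10GiB threshold are taken from the diff below; the helper itself and the
struct's field subset are hypothetical:

```rust
/// Shape of the secondary status response (only the fields used here).
struct SecondaryProgress {
    bytes_downloaded: u64,
    bytes_total: u64,
}

/// Hypothetical helper equivalent to the inline check in `optimize_all_validate`.
fn secondary_is_warm(progress: &SecondaryProgress) -> bool {
    // Secondary locations must have less than 10GiB of downloads pending.
    const DOWNLOAD_FRESHNESS_THRESHOLD: u64 = 10 * 1024 * 1024 * 1024;

    if progress.bytes_total == 0 {
        // No heatmap processed yet: we know nothing about this secondary's warmth.
        false
    } else if progress.bytes_total < DOWNLOAD_FRESHNESS_THRESHOLD {
        // Small tenants must be fully downloaded before we migrate to them.
        progress.bytes_downloaded == progress.bytes_total
    } else {
        // Larger tenants are warm once the remaining download is under the threshold.
        progress.bytes_total - progress.bytes_downloaded <= DOWNLOAD_FRESHNESS_THRESHOLD
    }
}
```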
Author: John Spray
Date: 2024-05-03 15:28:23 +01:00
Committed by: GitHub
Parent: ce0ddd749c
Commit: b5a6e68e68
6 changed files with 471 additions and 63 deletions


@@ -114,6 +114,27 @@ impl PageserverClient {
)
}
pub(crate) async fn tenant_secondary_status(
&self,
tenant_shard_id: TenantShardId,
) -> Result<SecondaryProgress> {
measured_request!(
"tenant_secondary_status",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.tenant_secondary_status(tenant_shard_id).await
)
}
pub(crate) async fn tenant_heatmap_upload(&self, tenant_id: TenantShardId) -> Result<()> {
measured_request!(
"tenant_heatmap_upload",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.tenant_heatmap_upload(tenant_id).await
)
}
pub(crate) async fn location_config(
&self,
tenant_shard_id: TenantShardId,


@@ -13,7 +13,9 @@ use crate::{
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{ScheduleContext, ScheduleMode},
- tenant_shard::ReconcileNeeded,
+ tenant_shard::{
+ MigrateAttachment, ReconcileNeeded, ScheduleOptimization, ScheduleOptimizationAction,
+ },
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -709,7 +711,7 @@ impl Service {
let reconciles_spawned = self.reconcile_all();
if reconciles_spawned == 0 {
// Run optimizer only when we didn't find any other work to do
- self.optimize_all();
+ self.optimize_all().await;
}
}
_ = self.cancel.cancelled() => return
@@ -2639,6 +2641,45 @@ impl Service {
Ok(results)
}
/// Concurrently invoke a pageserver API call on many shards at once
pub(crate) async fn tenant_for_shards_api<T, O, F>(
&self,
locations: Vec<(TenantShardId, Node)>,
op: O,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> Vec<mgmt_api::Result<T>>
where
O: Fn(TenantShardId, PageserverClient) -> F + Copy,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
let mut futs = FuturesUnordered::new();
let mut results = Vec::with_capacity(locations.len());
for (tenant_shard_id, node) in locations {
futs.push(async move {
node.with_client_retries(
|client| op(tenant_shard_id, client),
&self.config.jwt_token,
warn_threshold,
max_retries,
timeout,
cancel,
)
.await
});
}
while let Some(r) = futs.next().await {
let r = r.unwrap_or(Err(mgmt_api::Error::Cancelled));
results.push(r);
}
results
}
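For reference, a sketch of how this helper is driven (mirroring its use later in
this commit); the argument values follow the later call sites:
// Fan out a status request to every (shard, node) pair concurrently; each call
// gets per-node retry handling and a short timeout, and one result is collected
// per input location.
let results = self
    .tenant_for_shards_api(
        locations,
        |tenant_shard_id, client| async move {
            client.tenant_secondary_status(tenant_shard_id).await
        },
        1,                       // warn_threshold
        1,                       // max_retries
        SHORT_RECONCILE_TIMEOUT, // timeout
        &self.cancel,
    )
    .await;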
pub(crate) async fn tenant_timeline_delete(
&self,
tenant_id: TenantId,
@@ -3088,11 +3129,14 @@ impl Service {
) -> (
TenantShardSplitResponse,
Vec<(TenantShardId, NodeId, ShardStripeSize)>,
Vec<ReconcilerWaiter>,
) {
let mut response = TenantShardSplitResponse {
new_shards: Vec::new(),
};
let mut child_locations = Vec::new();
let mut waiters = Vec::new();
{
let mut locked = self.inner.write().unwrap();
@@ -3171,14 +3215,112 @@ impl Service {
tracing::warn!("Failed to schedule child shard {child}: {e}");
}
// In the background, attach secondary locations for the new shards
- self.maybe_reconcile_shard(&mut child_state, nodes);
+ if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
+ waiters.push(waiter);
+ }
tenants.insert(child, child_state);
response.new_shards.push(child);
}
}
+ (response, child_locations, waiters)
}
}
- (response, child_locations)
async fn tenant_shard_split_start_secondaries(
&self,
tenant_id: TenantId,
waiters: Vec<ReconcilerWaiter>,
) {
// Wait for initial reconcile of child shards, this creates the secondary locations
if let Err(e) = self.await_waiters(waiters, RECONCILE_TIMEOUT).await {
// This is not a failure to split: it's some issue reconciling the new child shards, perhaps
// their secondaries couldn't be attached.
tracing::warn!("Failed to reconcile after split: {e}");
return;
}
// Take the state lock to discover the attached & secondary intents for all shards
let (attached, secondary) = {
let locked = self.inner.read().unwrap();
let mut attached = Vec::new();
let mut secondary = Vec::new();
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
let Some(node_id) = shard.intent.get_attached() else {
// Unexpected. Race with a PlacementPolicy change?
tracing::warn!(
"No attached node on {tenant_shard_id} immediately after shard split!"
);
continue;
};
let Some(secondary_node_id) = shard.intent.get_secondary().first() else {
// No secondary location. Nothing for us to do.
continue;
};
let attached_node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
let secondary_node = locked
.nodes
.get(secondary_node_id)
.expect("Pageservers may not be deleted while referenced");
attached.push((*tenant_shard_id, attached_node.clone()));
secondary.push((*tenant_shard_id, secondary_node.clone()));
}
(attached, secondary)
};
if secondary.is_empty() {
// No secondary locations; nothing for us to do
return;
}
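// Ask each child shard's attached location to upload a fresh heatmap, so that
// the new secondary locations learn which layers they should download.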
for result in self
.tenant_for_shards_api(
attached,
|tenant_shard_id, client| async move {
client.tenant_heatmap_upload(tenant_shard_id).await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
if let Err(e) = result {
tracing::warn!("Error calling heatmap upload after shard split: {e}");
return;
}
}
for result in self
.tenant_for_shards_api(
secondary,
|tenant_shard_id, client| async move {
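// A zero wait time is assumed to kick off the download and return immediately,
// rather than blocking until the secondary location is fully warmed up.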
client
.tenant_secondary_download(tenant_shard_id, Some(Duration::ZERO))
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await
{
if let Err(e) = result {
tracing::warn!("Error calling secondary download after shard split: {e}");
return;
}
}
}
@@ -3212,8 +3354,8 @@ impl Service {
.do_tenant_shard_split(tenant_id, shard_split_params)
.await;
- match r {
- Ok(r) => Ok(r),
+ let (response, waiters) = match r {
+ Ok(r) => r,
Err(e) => {
// Split might be part-done, we must do work to abort it.
tracing::warn!("Enqueuing background abort of split on {tenant_id}");
@@ -3226,9 +3368,17 @@ impl Service {
})
// Ignore error sending: that just means we're shutting down: aborts are ephemeral so it's fine to drop it.
.ok();
- Err(e)
+ return Err(e);
}
- }
+ };
// The split is now complete. As an optimization, we will trigger all the child shards to upload
// a heatmap immediately, and all their secondary locations to start downloading: this avoids waiting
// for the background heatmap/download interval before secondaries get warm enough to migrate shards
// in [`Self::optimize_all`]
self.tenant_shard_split_start_secondaries(tenant_id, waiters)
.await;
Ok(response)
}
fn prepare_tenant_shard_split(
@@ -3378,7 +3528,7 @@ impl Service {
&self,
tenant_id: TenantId,
params: ShardSplitParams,
- ) -> Result<TenantShardSplitResponse, ApiError> {
+ ) -> Result<(TenantShardSplitResponse, Vec<ReconcilerWaiter>), ApiError> {
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
// request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
// parent shards exist as expected, but it would be neater to do the above pre-checks within the
@@ -3580,7 +3730,7 @@ impl Service {
));
// Replace all the shards we just split with their children: this phase is infallible.
- let (response, child_locations) =
+ let (response, child_locations, waiters) =
self.tenant_shard_split_commit_inmem(tenant_id, new_shard_count, new_stripe_size);
// Send compute notifications for all the new shards
@@ -3607,7 +3757,7 @@ impl Service {
}
}
- Ok(response)
+ Ok((response, waiters))
}
pub(crate) async fn tenant_shard_migrate(
@@ -4373,25 +4523,68 @@ impl Service {
/// To put it more briefly: whereas the scheduler respects soft constraints in a ScheduleContext at
/// the time of scheduling, this function looks for cases where a better-scoring location is available
/// according to those same soft constraints.
- fn optimize_all(&self) -> usize {
- let mut locked = self.inner.write().unwrap();
- let (nodes, tenants, scheduler) = locked.parts_mut();
- let pageservers = nodes.clone();
- let mut schedule_context = ScheduleContext::default();
- let mut reconciles_spawned = 0;
- let mut tenant_shards: Vec<&TenantShard> = Vec::new();
+ async fn optimize_all(&self) -> usize {
// Limit on how many shards' optimizations each call to this function will execute. Combined
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
// trickle of optimizations in the background, rather than executing a large number in parallel
// when a change occurs.
- const MAX_OPTIMIZATIONS_PER_PASS: usize = 2;
+ const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 2;
// Synchronous prepare: scan shards for possible scheduling optimizations
let candidate_work = self.optimize_all_plan();
let candidate_work_len = candidate_work.len();
// Asynchronous validate: I/O to pageservers to make sure shards are in a good state to apply optimizations
let validated_work = self.optimize_all_validate(candidate_work).await;
let was_work_filtered = validated_work.len() != candidate_work_len;
// Synchronous apply: update the shards' intent states according to validated optimizations
let mut reconciles_spawned = 0;
let mut optimizations_applied = 0;
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, optimization) in validated_work {
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
// Shard was dropped between planning and execution;
continue;
};
if shard.apply_optimization(scheduler, optimization) {
optimizations_applied += 1;
if self.maybe_reconcile_shard(shard, nodes).is_some() {
reconciles_spawned += 1;
}
}
if optimizations_applied >= MAX_OPTIMIZATIONS_EXEC_PER_PASS {
break;
}
}
if was_work_filtered {
// If we filtered any work out during validation, ensure we return a nonzero value to indicate
// to callers that the system is not in a truly quiet state, it's going to do some work as soon
// as these validations start passing.
reconciles_spawned = std::cmp::max(reconciles_spawned, 1);
}
reconciles_spawned
}
fn optimize_all_plan(&self) -> Vec<(TenantShardId, ScheduleOptimization)> {
let mut schedule_context = ScheduleContext::default();
let mut tenant_shards: Vec<&TenantShard> = Vec::new();
// How many candidate optimizations we will generate, before evaluating them for readiness: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, shard) in tenants.iter() {
if tenant_shard_id.is_shard_zero() {
// Reset accumulators on the first shard in a tenant
@@ -4400,7 +4593,7 @@ impl Service {
tenant_shards.clear();
}
- if work.len() >= MAX_OPTIMIZATIONS_PER_PASS {
+ if work.len() >= MAX_OPTIMIZATIONS_PLAN_PER_PASS {
break;
}
@@ -4472,18 +4665,105 @@ impl Service {
}
}
- for (tenant_shard_id, optimization) in work {
- let shard = tenants
- .get_mut(&tenant_shard_id)
- .expect("We held lock from place we got this ID");
- shard.apply_optimization(scheduler, optimization);
+ work
+ }
- if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
- reconciles_spawned += 1;
async fn optimize_all_validate(
&self,
candidate_work: Vec<(TenantShardId, ScheduleOptimization)>,
) -> Vec<(TenantShardId, ScheduleOptimization)> {
// Take a clone of the node map to use outside the lock in async validation phase
let validation_nodes = { self.inner.read().unwrap().nodes.clone() };
let mut want_secondary_status = Vec::new();
// Validate our plans: this is an async phase where we may do I/O to pageservers to
// check that the state of locations is acceptable to run the optimization, such as
// checking that a secondary location is sufficiently warmed-up to cleanly cut over
// in a live migration.
let mut validated_work = Vec::new();
for (tenant_shard_id, optimization) in candidate_work {
match optimization.action {
ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id: _,
new_attached_node_id,
}) => {
match validation_nodes.get(&new_attached_node_id) {
None => {
// Node was dropped between planning and validation
}
Some(node) => {
if !node.is_available() {
tracing::info!("Skipping optimization migration of {tenant_shard_id} to {new_attached_node_id} because node unavailable");
} else {
// Accumulate optimizations that require fetching secondary status, so that we can execute these
// remote API requests concurrently.
want_secondary_status.push((
tenant_shard_id,
node.clone(),
optimization,
));
}
}
}
}
ScheduleOptimizationAction::ReplaceSecondary(_) => {
// No extra checks needed to replace a secondary: this does not interrupt client access
validated_work.push((tenant_shard_id, optimization))
}
};
}
// Call into pageserver API to find out if the destination secondary location is warm enough for a reasonably smooth migration: we
// do this so that we avoid spawning a Reconciler that would have to wait minutes/hours for a destination to warm up: that reconciler
// would hold a precious reconcile semaphore unit the whole time it was waiting for the destination to warm up.
let results = self
.tenant_for_shards_api(
want_secondary_status
.iter()
.map(|i| (i.0, i.1.clone()))
.collect(),
|tenant_shard_id, client| async move {
client.tenant_secondary_status(tenant_shard_id).await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
for ((tenant_shard_id, node, optimization), secondary_status) in
want_secondary_status.into_iter().zip(results.into_iter())
{
match secondary_status {
Err(e) => {
tracing::info!("Skipping migration of {tenant_shard_id} to {node}, error querying secondary: {e}");
}
Ok(progress) => {
// We require secondary locations to have less than 10GiB of downloads pending before we will use
// them in an optimization
const DOWNLOAD_FRESHNESS_THRESHOLD: u64 = 10 * 1024 * 1024 * 1024;
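// Skip the candidate if: no heatmap has been seen yet (bytes_total == 0); or the
// tenant is small (total under the threshold) but its download hasn't finished; or
// more than the threshold's worth of bytes remain to be downloaded.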
if progress.bytes_total == 0
|| progress.bytes_total < DOWNLOAD_FRESHNESS_THRESHOLD
&& progress.bytes_downloaded != progress.bytes_total
|| progress.bytes_total - progress.bytes_downloaded
> DOWNLOAD_FRESHNESS_THRESHOLD
{
tracing::info!("Skipping migration of {tenant_shard_id} to {node} because secondary isn't ready: {progress:?}");
} else {
// Location looks ready: proceed
tracing::info!(
"{tenant_shard_id} secondary on {node} is warm enough for migration: {progress:?}"
);
validated_work.push((tenant_shard_id, optimization))
}
}
}
}
- reconciles_spawned
+ validated_work
}
/// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
@@ -4491,10 +4771,12 @@ impl Service {
/// put the system into a quiescent state where future background reconciliations won't do anything.
pub(crate) async fn reconcile_all_now(&self) -> Result<usize, ReconcileWaitError> {
let reconciles_spawned = self.reconcile_all();
- if reconciles_spawned == 0 {
+ let reconciles_spawned = if reconciles_spawned == 0 {
// Only optimize when we are otherwise idle
- self.optimize_all();
- }
+ self.optimize_all().await
+ } else {
+ reconciles_spawned
+ };
let waiters = {
let mut waiters = Vec::new();


@@ -325,18 +325,28 @@ pub(crate) struct ReplaceSecondary {
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct MigrateAttachment {
- old_attached_node_id: NodeId,
- new_attached_node_id: NodeId,
+ pub(crate) old_attached_node_id: NodeId,
+ pub(crate) new_attached_node_id: NodeId,
}
#[derive(Eq, PartialEq, Debug)]
- pub(crate) enum ScheduleOptimization {
+ pub(crate) enum ScheduleOptimizationAction {
// Replace one of our secondary locations with a different node
ReplaceSecondary(ReplaceSecondary),
// Migrate attachment to an existing secondary location
MigrateAttachment(MigrateAttachment),
}
#[derive(Eq, PartialEq, Debug)]
pub(crate) struct ScheduleOptimization {
// What was the reconcile sequence when we generated this optimization? The optimization
// should only be applied if the shard's sequence is still at this value, in case other changes
// happened between planning the optimization and applying it.
sequence: Sequence,
pub(crate) action: ScheduleOptimizationAction,
}
impl ReconcilerWaiter {
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
tokio::select! {
@@ -675,10 +685,13 @@ impl TenantShard {
"Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
self.intent.get_secondary()
);
- return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
- old_attached_node_id: attached,
- new_attached_node_id: *preferred_node,
- }));
+ return Some(ScheduleOptimization {
+ sequence: self.sequence,
+ action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id: attached,
+ new_attached_node_id: *preferred_node,
+ }),
+ });
}
} else {
tracing::debug!(
@@ -736,28 +749,37 @@ impl TenantShard {
"Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
self.intent.get_secondary()
);
- return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
- old_node_id: *secondary,
- new_node_id: candidate_node,
- }));
+ return Some(ScheduleOptimization {
+ sequence: self.sequence,
+ action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
+ old_node_id: *secondary,
+ new_node_id: candidate_node,
+ }),
+ });
}
}
None
}
/// Return true if the optimization was really applied: it will not be applied if the optimization's
/// sequence is behind this tenant shard's
pub(crate) fn apply_optimization(
&mut self,
scheduler: &mut Scheduler,
optimization: ScheduleOptimization,
- ) {
+ ) -> bool {
if optimization.sequence != self.sequence {
return false;
}
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_schedule_optimization
.inc();
- match optimization {
- ScheduleOptimization::MigrateAttachment(MigrateAttachment {
+ match optimization.action {
+ ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
old_attached_node_id,
new_attached_node_id,
}) => {
@@ -765,7 +787,7 @@ impl TenantShard {
self.intent
.promote_attached(scheduler, new_attached_node_id);
}
- ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
+ ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
old_node_id,
new_node_id,
}) => {
@@ -773,6 +795,8 @@ impl TenantShard {
self.intent.push_secondary(scheduler, new_node_id);
}
}
true
}
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
@@ -1428,10 +1452,13 @@ pub(crate) mod tests {
// would be no other shards from the same tenant, and request to do so.
assert_eq!(
optimization_a,
- Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
- old_attached_node_id: NodeId(1),
- new_attached_node_id: NodeId(2)
- }))
+ Some(ScheduleOptimization {
+ sequence: shard_a.sequence,
+ action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id: NodeId(1),
+ new_attached_node_id: NodeId(2)
+ })
+ })
);
// Note that these optimizing two shards in the same tenant with the same ScheduleContext is
@@ -1442,10 +1469,13 @@ pub(crate) mod tests {
let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
assert_eq!(
optimization_b,
- Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
- old_attached_node_id: NodeId(1),
- new_attached_node_id: NodeId(3)
- }))
+ Some(ScheduleOptimization {
+ sequence: shard_b.sequence,
+ action: ScheduleOptimizationAction::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id: NodeId(1),
+ new_attached_node_id: NodeId(3)
+ })
+ })
);
// Applying these optimizations should result in the end state proposed
@@ -1489,10 +1519,13 @@ pub(crate) mod tests {
// same tenant should generate an optimization to move one away
assert_eq!(
optimization_a,
- Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
- old_node_id: NodeId(3),
- new_node_id: NodeId(4)
- }))
+ Some(ScheduleOptimization {
+ sequence: shard_a.sequence,
+ action: ScheduleOptimizationAction::ReplaceSecondary(ReplaceSecondary {
+ old_node_id: NodeId(3),
+ new_node_id: NodeId(4)
+ })
+ })
);
shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());