Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-09 06:22:57 +00:00
storcon: include preferred AZ in compute notifications (#9953)
## Problem

It is unreliable for the control plane to infer the AZ for computes from where the tenant is currently attached: if a tenant happens to be in a degraded state, or a release is ongoing while a compute starts, the tenant's attached AZ can differ from where it will run long-term, and the control plane doesn't check back later to restart the compute.

This can land in parallel with https://github.com/neondatabase/neon/pull/9947

## Summary of changes

- Thread the preferred AZ through to the compute hook code via the reconciler
- Include the preferred AZ in the body of compute hook notifications
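As context for the diff below: with this change the notification body sent to the control plane carries the tenant's preferred AZ alongside the shard layout. The following is a minimal sketch of the wire shape, using stand-in types; the field names mirror `ComputeHookNotifyRequest` in the diff, but the real struct uses `TenantId`/`NodeId`/`ShardStripeSize` rather than these primitives, and the concrete values (AZ name, ids) are made up.

```rust
use serde::Serialize;

// Stand-ins for the real pageserver_api/utils types; illustrative only.
#[derive(Serialize)]
struct NotifyShard {
    node_id: u64,
    shard_number: u8,
}

#[derive(Serialize)]
struct NotifyRequest {
    tenant_id: String,
    // New in this change: the tenant's preferred AZ, or null when none is set.
    preferred_az: Option<String>,
    stripe_size: Option<u32>,
    shards: Vec<NotifyShard>,
}

fn main() {
    let body = NotifyRequest {
        tenant_id: "3a44f33f8b9a9d5a4c1e3f4f5a6b7c8d".to_string(), // made-up id
        preferred_az: Some("us-east-2b".to_string()),              // made-up AZ name
        stripe_size: Some(32768),
        shards: vec![
            NotifyShard { node_id: 1, shard_number: 0 },
            NotifyShard { node_id: 2, shard_number: 1 },
        ],
    };
    // {"tenant_id":"3a44...","preferred_az":"us-east-2b","stripe_size":32768,
    //  "shards":[{"node_id":1,"shard_number":0},{"node_id":2,"shard_number":1}]}
    println!("{}", serde_json::to_string(&body).unwrap());
}
```

With these defaults an absent preferred AZ serializes as `null`, matching the `Option<String>` field in the diff.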
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::error::Error as _;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
@@ -6,6 +7,7 @@ use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
 use control_plane::local_env::LocalEnv;
 use futures::StreamExt;
 use hyper::StatusCode;
+use pageserver_api::controller_api::AvailabilityZone;
 use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
 use postgres_connection::parse_host_port;
 use serde::{Deserialize, Serialize};
@@ -28,6 +30,9 @@ struct UnshardedComputeHookTenant {
     // Which node is this tenant attached to
     node_id: NodeId,
 
+    // The tenant's preferred AZ, so that we may pass this on to the control plane
+    preferred_az: Option<AvailabilityZone>,
+
     // Must hold this lock to send a notification.
     send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
 }
@@ -36,6 +41,9 @@ struct ShardedComputeHookTenant {
     shard_count: ShardCount,
     shards: Vec<(ShardNumber, NodeId)>,
 
+    // The tenant's preferred AZ, so that we may pass this on to the control plane
+    preferred_az: Option<AvailabilityZone>,
+
     // Must hold this lock to send a notification. The contents represent
     // the last successfully sent notification, and are used to coalesce multiple
     // updates by only sending when there is a change since our last successful send.
@@ -64,17 +72,24 @@ enum ComputeHookTenant {
 
 impl ComputeHookTenant {
     /// Construct with at least one shard's information
-    fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
+    fn new(
+        tenant_shard_id: TenantShardId,
+        stripe_size: ShardStripeSize,
+        preferred_az: Option<AvailabilityZone>,
+        node_id: NodeId,
+    ) -> Self {
         if tenant_shard_id.shard_count.count() > 1 {
             Self::Sharded(ShardedComputeHookTenant {
                 shards: vec![(tenant_shard_id.shard_number, node_id)],
                 stripe_size,
                 shard_count: tenant_shard_id.shard_count,
+                preferred_az,
                 send_lock: Arc::default(),
             })
         } else {
             Self::Unsharded(UnshardedComputeHookTenant {
                 node_id,
+                preferred_az,
                 send_lock: Arc::default(),
             })
         }
@@ -120,15 +135,20 @@ impl ComputeHookTenant {
 
     /// Set one shard's location. If stripe size or shard count have changed, Self is reset
     /// and drops existing content.
-    fn update(
-        &mut self,
-        tenant_shard_id: TenantShardId,
-        stripe_size: ShardStripeSize,
-        node_id: NodeId,
-    ) {
+    fn update(&mut self, shard_update: ShardUpdate) {
+        let tenant_shard_id = shard_update.tenant_shard_id;
+        let node_id = shard_update.node_id;
+        let stripe_size = shard_update.stripe_size;
+        let preferred_az = shard_update.preferred_az;
+
         match self {
             Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
-                unsharded_tenant.node_id = node_id
+                unsharded_tenant.node_id = node_id;
+                if unsharded_tenant.preferred_az.as_ref()
+                    != preferred_az.as_ref().map(|az| az.as_ref())
+                {
+                    unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
+                }
             }
             Self::Sharded(sharded_tenant)
                 if sharded_tenant.stripe_size == stripe_size
@@ -146,10 +166,21 @@ impl ComputeHookTenant {
                         .push((tenant_shard_id.shard_number, node_id));
                     sharded_tenant.shards.sort_by_key(|s| s.0)
                 }
+
+                if sharded_tenant.preferred_az.as_ref()
+                    != preferred_az.as_ref().map(|az| az.as_ref())
+                {
+                    sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
+                }
             }
             _ => {
                 // Shard count changed: reset struct.
-                *self = Self::new(tenant_shard_id, stripe_size, node_id);
+                *self = Self::new(
+                    tenant_shard_id,
+                    stripe_size,
+                    preferred_az.map(|az| az.into_owned()),
+                    node_id,
+                );
             }
         }
     }
@@ -165,6 +196,7 @@ struct ComputeHookNotifyRequestShard {
 #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
 struct ComputeHookNotifyRequest {
     tenant_id: TenantId,
+    preferred_az: Option<String>,
     stripe_size: Option<ShardStripeSize>,
     shards: Vec<ComputeHookNotifyRequestShard>,
 }
@@ -238,6 +270,10 @@ impl ComputeHookTenant {
                     node_id: unsharded_tenant.node_id,
                 }],
                 stripe_size: None,
+                preferred_az: unsharded_tenant
+                    .preferred_az
+                    .as_ref()
+                    .map(|az| az.0.clone()),
             }),
             Self::Sharded(sharded_tenant)
                 if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
@@ -253,6 +289,7 @@ impl ComputeHookTenant {
                         })
                         .collect(),
                     stripe_size: Some(sharded_tenant.stripe_size),
+                    preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
                 })
             }
             Self::Sharded(sharded_tenant) => {
@@ -313,6 +350,17 @@ pub(super) struct ComputeHook {
     client: reqwest::Client,
 }
 
+/// Callers may give us a list of these when asking us to send a bulk batch
+/// of notifications in the background. This is a 'notification' in the sense of
+/// other code notifying us of a shard's status, rather than being the final notification
+/// that we send upwards to the control plane for the whole tenant.
+pub(crate) struct ShardUpdate<'a> {
+    pub(crate) tenant_shard_id: TenantShardId,
+    pub(crate) node_id: NodeId,
+    pub(crate) stripe_size: ShardStripeSize,
+    pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
+}
+
 impl ComputeHook {
     pub(super) fn new(config: Config) -> Self {
         let authorization_header = config
@@ -363,6 +411,7 @@ impl ComputeHook {
             tenant_id,
             shards,
             stripe_size,
+            preferred_az: _preferred_az,
         } = reconfigure_request;
 
         let compute_pageservers = shards
@@ -503,24 +552,30 @@ impl ComputeHook {
     }
 
     /// Synchronous phase: update the per-tenant state for the next intended notification
-    fn notify_prepare(
-        &self,
-        tenant_shard_id: TenantShardId,
-        node_id: NodeId,
-        stripe_size: ShardStripeSize,
-    ) -> MaybeSendResult {
+    fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
         let mut state_locked = self.state.lock().unwrap();
 
         use std::collections::hash_map::Entry;
+        let tenant_shard_id = shard_update.tenant_shard_id;
+
         let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
-            Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
-                tenant_shard_id,
-                stripe_size,
-                node_id,
-            )),
+            Entry::Vacant(e) => {
+                let ShardUpdate {
+                    tenant_shard_id,
+                    node_id,
+                    stripe_size,
+                    preferred_az,
+                } = shard_update;
+                e.insert(ComputeHookTenant::new(
+                    tenant_shard_id,
+                    stripe_size,
+                    preferred_az.map(|az| az.into_owned()),
+                    node_id,
+                ))
+            }
             Entry::Occupied(e) => {
                 let tenant = e.into_mut();
-                tenant.update(tenant_shard_id, stripe_size, node_id);
+                tenant.update(shard_update);
                 tenant
             }
         };
@@ -608,13 +663,14 @@ impl ComputeHook {
     /// if something failed.
     pub(super) fn notify_background(
         self: &Arc<Self>,
-        notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
+        notifications: Vec<ShardUpdate>,
         result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
         cancel: &CancellationToken,
     ) {
         let mut maybe_sends = Vec::new();
-        for (tenant_shard_id, node_id, stripe_size) in notifications {
-            let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
+        for shard_update in notifications {
+            let tenant_shard_id = shard_update.tenant_shard_id;
+            let maybe_send_result = self.notify_prepare(shard_update);
             maybe_sends.push((tenant_shard_id, maybe_send_result))
         }
 
@@ -678,15 +734,14 @@ impl ComputeHook {
     /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
     /// ensuring that they eventually call again to ensure that the compute is eventually notified of
     /// the proper pageserver nodes for a tenant.
-    #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
-    pub(super) async fn notify(
+    #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
+    pub(super) async fn notify<'a>(
         &self,
-        tenant_shard_id: TenantShardId,
-        node_id: NodeId,
-        stripe_size: ShardStripeSize,
+        shard_update: ShardUpdate<'a>,
         cancel: &CancellationToken,
     ) -> Result<(), NotifyError> {
-        let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
+        let tenant_shard_id = shard_update.tenant_shard_id;
+        let maybe_send_result = self.notify_prepare(shard_update);
         self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
             .await
     }
@@ -739,6 +794,7 @@ pub(crate) mod tests {
                 shard_number: ShardNumber(0),
             },
             ShardStripeSize(12345),
+            None,
             NodeId(1),
         );
 
@@ -765,30 +821,32 @@ pub(crate) mod tests {
         // Writing the first shard of a multi-sharded situation (i.e. in a split)
        // resets the tenant state and puts it in an non-notifying state (need to
        // see all shards)
-        tenant_state.update(
-            TenantShardId {
+        tenant_state.update(ShardUpdate {
+            tenant_shard_id: TenantShardId {
                 tenant_id,
                 shard_count: ShardCount::new(2),
                 shard_number: ShardNumber(1),
             },
-            ShardStripeSize(32768),
-            NodeId(1),
-        );
+            stripe_size: ShardStripeSize(32768),
+            preferred_az: None,
+            node_id: NodeId(1),
+        });
         assert!(matches!(
             tenant_state.maybe_send(tenant_id, None),
             MaybeSendResult::Noop
         ));
 
         // Writing the second shard makes it ready to notify
-        tenant_state.update(
-            TenantShardId {
+        tenant_state.update(ShardUpdate {
+            tenant_shard_id: TenantShardId {
                 tenant_id,
                 shard_count: ShardCount::new(2),
                 shard_number: ShardNumber(0),
             },
-            ShardStripeSize(32768),
-            NodeId(1),
-        );
+            stripe_size: ShardStripeSize(32768),
+            preferred_az: None,
+            node_id: NodeId(1),
+        });
 
         let send_result = tenant_state.maybe_send(tenant_id, None);
         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {

@@ -1,13 +1,14 @@
 use crate::pageserver_client::PageserverClient;
 use crate::persistence::Persistence;
-use crate::service;
-use pageserver_api::controller_api::PlacementPolicy;
+use crate::{compute_hook, service};
+use pageserver_api::controller_api::{AvailabilityZone, PlacementPolicy};
 use pageserver_api::models::{
     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
 };
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
 use pageserver_client::mgmt_api;
 use reqwest::StatusCode;
+use std::borrow::Cow;
 use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -45,6 +46,7 @@ pub(super) struct Reconciler {
     pub(crate) reconciler_config: ReconcilerConfig,
 
     pub(crate) config: TenantConfig,
+    pub(crate) preferred_az: Option<AvailabilityZone>,
 
     /// Observed state from the point of view of the reconciler.
     /// This gets updated as the reconciliation makes progress.
@@ -834,9 +836,12 @@ impl Reconciler {
         let result = self
             .compute_hook
             .notify(
-                self.tenant_shard_id,
-                node.get_id(),
-                self.shard.stripe_size,
+                compute_hook::ShardUpdate {
+                    tenant_shard_id: self.tenant_shard_id,
+                    node_id: node.get_id(),
+                    stripe_size: self.shard.stripe_size,
+                    preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
+                },
                 &self.cancel,
             )
             .await;

@@ -18,7 +18,7 @@ use crate::{
     background_node_operations::{
         Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
     },
-    compute_hook::NotifyError,
+    compute_hook::{self, NotifyError},
     drain_utils::{self, TenantShardDrain, TenantShardIterator},
     id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
     leadership::Leadership,
@@ -656,11 +656,14 @@ impl Service {
                     // emit a compute notification for this. In the case where our observed state does not
                     // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
                     if let Some(attached_at) = tenant_shard.stably_attached() {
-                        compute_notifications.push((
-                            *tenant_shard_id,
-                            attached_at,
-                            tenant_shard.shard.stripe_size,
-                        ));
+                        compute_notifications.push(compute_hook::ShardUpdate {
+                            tenant_shard_id: *tenant_shard_id,
+                            node_id: attached_at,
+                            stripe_size: tenant_shard.shard.stripe_size,
+                            preferred_az: tenant_shard
+                                .preferred_az()
+                                .map(|az| Cow::Owned(az.clone())),
+                        });
                     }
                 }
             }
@@ -4786,7 +4789,15 @@ impl Service {
         for (child_id, child_ps, stripe_size) in child_locations {
             if let Err(e) = self
                 .compute_hook
-                .notify(child_id, child_ps, stripe_size, &self.cancel)
+                .notify(
+                    compute_hook::ShardUpdate {
+                        tenant_shard_id: child_id,
+                        node_id: child_ps,
+                        stripe_size,
+                        preferred_az: preferred_az_id.as_ref().map(Cow::Borrowed),
+                    },
+                    &self.cancel,
+                )
                 .await
             {
                 tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",

@@ -1198,6 +1198,7 @@ impl TenantShard {
             detach,
             reconciler_config,
             config: self.config.clone(),
+            preferred_az: self.preferred_az_id.clone(),
             observed: self.observed.clone(),
             original_observed: self.observed.clone(),
             compute_hook: compute_hook.clone(),

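A note on the `Cow` in `ShardUpdate::preferred_az`: this is a reading of the call sites above rather than something stated in the commit message. The reconciler and the shard-split path notify one shard at a time and can borrow the AZ they already hold, while `Service` builds a `Vec<ShardUpdate>` for background notification and therefore hands over owned clones. A minimal standalone sketch of the two construction styles, using hypothetical `AvailabilityZone` and `ShardUpdateAz` stand-ins:

```rust
use std::borrow::Cow;

// Stand-in for pageserver_api::controller_api::AvailabilityZone (illustrative only).
#[derive(Clone, Debug)]
struct AvailabilityZone(String);

// Mirrors only the preferred_az part of the ShardUpdate struct from the diff above.
struct ShardUpdateAz<'a> {
    preferred_az: Option<Cow<'a, AvailabilityZone>>,
}

fn main() {
    let az = AvailabilityZone("us-east-2b".to_string());

    // Reconciler / shard-split style: borrow the AZ the caller already holds
    // for the duration of a single notify() call.
    let borrowed = ShardUpdateAz {
        preferred_az: Some(Cow::Borrowed(&az)),
    };

    // Service batch style: clone into an owned value so the update can be queued
    // for background sending and outlive the borrow.
    let owned = ShardUpdateAz {
        preferred_az: Some(Cow::Owned(az.clone())),
    };

    for update in [&borrowed, &owned] {
        // Both variants deref to the same underlying AZ value.
        if let Some(az) = update.preferred_az.as_deref() {
            println!("preferred_az = {}", az.0);
        }
    }
}
```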