storage controller: robustness improvements (#7027)

## Problem


Closes: https://github.com/neondatabase/neon/issues/6847
Closes: https://github.com/neondatabase/neon/issues/7006

## Summary of changes

- Pageserver API calls are wrapped in timeout/retry logic: this prevents
a reconciler getting hung on a pageserver API hang, and prevents
reconcilers having to totally retry if one API call returns a retryable
error (e.g. 503).
- Add a cancellation token to `Node`, so that when we mark a node
offline we will cancel any API calls in progress to that node, and avoid
issuing any more API calls to that offline node.
- If the dirty locations of a shard are all on offline nodes, then don't
spawn a reconciler
- In re-attach, if we have no observed state object for a tenant then
construct one with conf: None (which means "unknown"). Then in
Reconciler, implement a TODO for scanning such locations before running,
so that we will avoid spuriously incrementing a generation in the case
of a node that was offline while we started (this is the case that
tripped up #7006)
- Refactoring: make Node contents private (and thereby guarantee that
updates to availability mode reliably update the cancellation token.)
- Refactoring: don't pass the whole map of nodes into Reconciler (and
thereby remove a bunch of .expect() calls)

Some of this was discovered/tested with a new failure injection test
that will come in a separate PR, once it is stable enough for CI.
This commit is contained in:
John Spray
2024-03-07 17:10:03 +00:00
committed by GitHub
parent 871977f14c
commit d5a6a2a16d
8 changed files with 749 additions and 389 deletions

View File

@@ -1,6 +1,16 @@
use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy};
use std::{str::FromStr, time::Duration};
use hyper::StatusCode;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, TenantLocateResponseShard,
},
shard::TenantShardId,
};
use pageserver_client::mgmt_api;
use serde::Serialize;
use utils::id::NodeId;
use tokio_util::sync::CancellationToken;
use utils::{backoff, id::NodeId};
use crate::persistence::NodePersistence;
@@ -12,16 +22,29 @@ use crate::persistence::NodePersistence;
/// implementation of serialization on this type is only for debug dumps.
#[derive(Clone, Serialize)]
pub(crate) struct Node {
pub(crate) id: NodeId,
id: NodeId,
pub(crate) availability: NodeAvailability,
pub(crate) scheduling: NodeSchedulingPolicy,
availability: NodeAvailability,
scheduling: NodeSchedulingPolicy,
pub(crate) listen_http_addr: String,
pub(crate) listen_http_port: u16,
listen_http_addr: String,
listen_http_port: u16,
pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
#[serde(skip)]
cancel: CancellationToken,
}
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
ToActive,
ToOffline,
Unchanged,
}
impl Node {
@@ -29,6 +52,71 @@ impl Node {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
pub(crate) fn get_id(&self) -> NodeId {
self.id
}
pub(crate) fn set_scheduling(&mut self, scheduling: NodeSchedulingPolicy) {
self.scheduling = scheduling
}
/// Does this registration request match `self`? This is used when deciding whether a registration
/// request should be allowed to update an existing record with the same node ID.
pub(crate) fn registration_match(&self, register_req: &NodeRegisterRequest) -> bool {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
}
/// For a shard located on this node, populate a response object
/// with this node's address information.
pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
TenantLocateResponseShard {
shard_id,
node_id: self.id,
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
}
}
pub(crate) fn set_availability(
&mut self,
availability: NodeAvailability,
) -> AvailabilityTransition {
use NodeAvailability::*;
let transition = match (self.availability, availability) {
(Offline, Active) => {
// Give the node a new cancellation token, effectively resetting it to un-cancelled. Any
// users of previously-cloned copies of the node will still see the old cancellation
// state. For example, Reconcilers in flight will have to complete and be spawned
// again to realize that the node has become available.
self.cancel = CancellationToken::new();
AvailabilityTransition::ToActive
}
(Active, Offline) => {
// Fire the node's cancellation token to cancel any in-flight API requests to it
self.cancel.cancel();
AvailabilityTransition::ToOffline
}
_ => AvailabilityTransition::Unchanged,
};
self.availability = availability;
transition
}
/// Whether we may send API requests to this node.
pub(crate) fn is_available(&self) -> bool {
// When we clone a node, [`Self::availability`] is a snapshot, but [`Self::cancel`] holds
// a reference to the original Node's cancellation status. Checking both of these results
// in a "pessimistic" check where we will consider a Node instance unavailable if it was unavailable
// when we cloned it, or if the original Node instance's cancellation token was fired.
matches!(self.availability, NodeAvailability::Active) && !self.cancel.is_cancelled()
}
/// Is this node elegible to have work scheduled onto it?
pub(crate) fn may_schedule(&self) -> bool {
match self.availability {
@@ -44,6 +132,26 @@ impl Node {
}
}
pub(crate) fn new(
id: NodeId,
listen_http_addr: String,
listen_http_port: u16,
listen_pg_addr: String,
listen_pg_port: u16,
) -> Self {
Self {
id,
listen_http_addr,
listen_http_port,
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Filling,
// TODO: we shouldn't really call this Active until we've heartbeated it.
availability: NodeAvailability::Active,
cancel: CancellationToken::new(),
}
}
pub(crate) fn to_persistent(&self) -> NodePersistence {
NodePersistence {
node_id: self.id.0 as i64,
@@ -54,4 +162,96 @@ impl Node {
listen_pg_port: self.listen_pg_port as i32,
}
}
pub(crate) fn from_persistent(np: NodePersistence) -> Self {
Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&np.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: np.listen_http_addr,
listen_http_port: np.listen_http_port as u16,
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
cancel: CancellationToken::new(),
}
}
/// Wrapper for issuing requests to pageserver management API: takes care of generic
/// retry/backoff for retryable HTTP status codes.
///
/// This will return None to indicate cancellation. Cancellation may happen from
/// the cancellation token passed in, or from Self's cancellation token (i.e. node
/// going offline).
pub(crate) async fn with_client_retries<T, O, F>(
&self,
mut op: O,
jwt: &Option<String>,
warn_threshold: u32,
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> Option<mgmt_api::Result<T>>
where
O: FnMut(mgmt_api::Client) -> F,
F: std::future::Future<Output = mgmt_api::Result<T>>,
{
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
Cancelled => true,
}
}
backoff::retry(
|| {
let http_client = reqwest::ClientBuilder::new()
.timeout(timeout)
.build()
.expect("Failed to construct HTTP client");
let client =
mgmt_api::Client::from_client(http_client, self.base_url(), jwt.as_deref());
let node_cancel_fut = self.cancel.cancelled();
let op_fut = op(client);
async {
tokio::select! {
r = op_fut=> {r},
_ = node_cancel_fut => {
Err(mgmt_api::Error::Cancelled)
}}
}
},
is_fatal,
warn_threshold,
max_retries,
&format!(
"Call to node {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
),
cancel,
)
.await
}
}
impl std::fmt::Display for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.id, self.listen_http_addr)
}
}
impl std::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.id, self.listen_http_addr)
}
}

View File

@@ -1,6 +1,5 @@
use crate::persistence::Persistence;
use crate::service;
use pageserver_api::controller_api::NodeAvailability;
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
};
@@ -28,15 +27,16 @@ pub(super) struct Reconciler {
pub(crate) shard: ShardIdentity,
pub(crate) generation: Option<Generation>,
pub(crate) intent: TargetState,
/// Nodes not referenced by [`Self::intent`], from which we should try
/// to detach this tenant shard.
pub(crate) detach: Vec<Node>,
pub(crate) config: TenantConfig,
pub(crate) observed: ObservedState,
pub(crate) service_config: service::Config,
/// A snapshot of the pageservers as they were when we were asked
/// to reconcile.
pub(crate) pageservers: Arc<HashMap<NodeId, Node>>,
/// A hook to notify the running postgres instances when we change the location
/// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
/// and guarantee eventual retries.
@@ -67,29 +67,37 @@ pub(super) struct Reconciler {
/// and the TargetState is just the instruction for a particular Reconciler run.
#[derive(Debug)]
pub(crate) struct TargetState {
pub(crate) attached: Option<NodeId>,
pub(crate) secondary: Vec<NodeId>,
pub(crate) attached: Option<Node>,
pub(crate) secondary: Vec<Node>,
}
impl TargetState {
pub(crate) fn from_intent(intent: &IntentState) -> Self {
pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
Self {
attached: *intent.get_attached(),
secondary: intent.get_secondary().clone(),
attached: intent.get_attached().map(|n| {
nodes
.get(&n)
.expect("Intent attached referenced non-existent node")
.clone()
}),
secondary: intent
.get_secondary()
.iter()
.map(|n| {
nodes
.get(n)
.expect("Intent secondary referenced non-existent node")
.clone()
})
.collect(),
}
}
fn all_pageservers(&self) -> Vec<NodeId> {
let mut result = self.secondary.clone();
if let Some(node_id) = &self.attached {
result.push(*node_id);
}
result
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileError {
#[error(transparent)]
Remote(#[from] mgmt_api::Error),
#[error(transparent)]
Notify(#[from] NotifyError),
#[error("Cancelled")]
@@ -101,45 +109,83 @@ pub(crate) enum ReconcileError {
impl Reconciler {
async fn location_config(
&mut self,
node_id: NodeId,
node: &Node,
config: LocationConfig,
flush_ms: Option<Duration>,
lazy: bool,
) -> anyhow::Result<()> {
let node = self
.pageservers
.get(&node_id)
.expect("Pageserver may not be removed while referenced");
) -> Result<(), ReconcileError> {
self.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: None });
// TODO: amend locations that use long-polling: they will hit this timeout.
let timeout = Duration::from_secs(25);
tracing::info!("location_config({node}) calling: {:?}", config);
let tenant_shard_id = self.tenant_shard_id;
let config_ref = &config;
match node
.with_client_retries(
|client| async move {
let config = config_ref.clone();
client
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
.await
},
&self.service_config.jwt_token,
1,
3,
timeout,
&self.cancel,
)
.await
{
Some(Ok(_)) => {}
Some(Err(e)) => return Err(e.into()),
None => return Err(ReconcileError::Cancel),
};
tracing::info!("location_config({node}) complete: {:?}", config);
self.observed
.locations
.insert(node.id, ObservedStateLocation { conf: None });
tracing::info!("location_config({}) calling: {:?}", node_id, config);
let client =
mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
client
.location_config(self.tenant_shard_id, config.clone(), flush_ms, lazy)
.await?;
tracing::info!("location_config({}) complete: {:?}", node_id, config);
self.observed
.locations
.insert(node.id, ObservedStateLocation { conf: Some(config) });
.insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
Ok(())
}
fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
if let Some(node) = self.intent.attached.as_ref() {
if node.get_id() == *node_id {
return Some(node);
}
}
if let Some(node) = self
.intent
.secondary
.iter()
.find(|n| n.get_id() == *node_id)
{
return Some(node);
}
if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
return Some(node);
}
None
}
async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
let destination = if let Some(node_id) = self.intent.attached {
match self.observed.locations.get(&node_id) {
let destination = if let Some(node) = &self.intent.attached {
match self.observed.locations.get(&node.get_id()) {
Some(conf) => {
// We will do a live migration only if the intended destination is not
// currently in an attached state.
match &conf.conf {
Some(conf) if conf.mode == LocationConfigMode::Secondary => {
// Fall through to do a live migration
node_id
node
}
None | Some(_) => {
// Attached or uncertain: don't do a live migration, proceed
@@ -152,7 +198,7 @@ impl Reconciler {
None => {
// Our destination is not attached: maybe live migrate if some other
// node is currently attached. Fall through.
node_id
node
}
}
} else {
@@ -165,15 +211,13 @@ impl Reconciler {
for (node_id, state) in &self.observed.locations {
if let Some(observed_conf) = &state.conf {
if observed_conf.mode == LocationConfigMode::AttachedSingle {
let node = self
.pageservers
.get(node_id)
.expect("Nodes may not be removed while referenced");
// We will only attempt live migration if the origin is not offline: this
// avoids trying to do it while reconciling after responding to an HA failover.
if !matches!(node.availability, NodeAvailability::Offline) {
origin = Some(*node_id);
break;
if let Some(node) = self.get_node(node_id) {
if node.is_available() {
origin = Some(node.clone());
break;
}
}
}
}
@@ -186,7 +230,7 @@ impl Reconciler {
// We have an origin and a destination: proceed to do the live migration
tracing::info!("Live migrating {}->{}", origin, destination);
self.live_migrate(origin, destination).await?;
self.live_migrate(origin, destination.clone()).await?;
Ok(())
}
@@ -194,13 +238,8 @@ impl Reconciler {
async fn get_lsns(
&self,
tenant_shard_id: TenantShardId,
node_id: &NodeId,
node: &Node,
) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
let node = self
.pageservers
.get(node_id)
.expect("Pageserver may not be removed while referenced");
let client =
mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
@@ -211,19 +250,27 @@ impl Reconciler {
.collect())
}
async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) {
let node = self
.pageservers
.get(node_id)
.expect("Pageserver may not be removed while referenced");
let client =
mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
match client.tenant_secondary_download(tenant_shard_id).await {
Ok(()) => {}
Err(_) => {
tracing::info!(" (skipping, destination wasn't in secondary mode)")
async fn secondary_download(
&self,
tenant_shard_id: TenantShardId,
node: &Node,
) -> Result<(), ReconcileError> {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_download(tenant_shard_id).await },
&self.service_config.jwt_token,
1,
1,
Duration::from_secs(60),
&self.cancel,
)
.await
{
None => Err(ReconcileError::Cancel),
Some(Ok(_)) => Ok(()),
Some(Err(e)) => {
tracing::info!(" (skipping destination download: {})", e);
Ok(())
}
}
}
@@ -231,17 +278,14 @@ impl Reconciler {
async fn await_lsn(
&self,
tenant_shard_id: TenantShardId,
pageserver_id: &NodeId,
node: &Node,
baseline: HashMap<TimelineId, Lsn>,
) -> anyhow::Result<()> {
loop {
let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await {
let latest = match self.get_lsns(tenant_shard_id, node).await {
Ok(l) => l,
Err(e) => {
println!(
"🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
pageserver_id
);
tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
std::thread::sleep(Duration::from_millis(500));
continue;
}
@@ -251,7 +295,7 @@ impl Reconciler {
for (timeline_id, baseline_lsn) in &baseline {
match latest.get(timeline_id) {
Some(latest_lsn) => {
println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
if latest_lsn < baseline_lsn {
any_behind = true;
}
@@ -266,7 +310,7 @@ impl Reconciler {
}
if !any_behind {
println!("✅ LSN caught up. Proceeding...");
tracing::info!("✅ LSN caught up. Proceeding...");
break;
} else {
std::thread::sleep(Duration::from_millis(500));
@@ -278,11 +322,11 @@ impl Reconciler {
pub async fn live_migrate(
&mut self,
origin_ps_id: NodeId,
dest_ps_id: NodeId,
) -> anyhow::Result<()> {
origin_ps: Node,
dest_ps: Node,
) -> Result<(), ReconcileError> {
// `maybe_live_migrate` is responsibble for sanity of inputs
assert!(origin_ps_id != dest_ps_id);
assert!(origin_ps.get_id() != dest_ps.get_id());
fn build_location_config(
shard: &ShardIdentity,
@@ -302,10 +346,7 @@ impl Reconciler {
}
}
tracing::info!(
"🔁 Switching origin pageserver {} to stale mode",
origin_ps_id
);
tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);
// FIXME: it is incorrect to use self.generation here, we should use the generation
// from the ObservedState of the origin pageserver (it might be older than self.generation)
@@ -316,26 +357,18 @@ impl Reconciler {
self.generation,
None,
);
self.location_config(
origin_ps_id,
stale_conf,
Some(Duration::from_secs(10)),
false,
)
.await?;
self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
.await?;
let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?);
let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);
// If we are migrating to a destination that has a secondary location, warm it up first
if let Some(destination_conf) = self.observed.locations.get(&dest_ps_id) {
if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
if let Some(destination_conf) = &destination_conf.conf {
if destination_conf.mode == LocationConfigMode::Secondary {
tracing::info!(
"🔁 Downloading latest layers to destination pageserver {}",
dest_ps_id,
);
self.secondary_download(self.tenant_shard_id, &dest_ps_id)
.await;
tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
self.secondary_download(self.tenant_shard_id, &dest_ps)
.await?;
}
}
}
@@ -343,7 +376,7 @@ impl Reconciler {
// Increment generation before attaching to new pageserver
self.generation = Some(
self.persistence
.increment_generation(self.tenant_shard_id, dest_ps_id)
.increment_generation(self.tenant_shard_id, dest_ps.get_id())
.await?,
);
@@ -355,23 +388,23 @@ impl Reconciler {
None,
);
tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id);
self.location_config(dest_ps_id, dest_conf, None, false)
tracing::info!("🔁 Attaching to pageserver {dest_ps}");
self.location_config(&dest_ps, dest_conf, None, false)
.await?;
if let Some(baseline) = baseline_lsns {
tracing::info!("🕑 Waiting for LSN to catch up...");
self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline)
self.await_lsn(self.tenant_shard_id, &dest_ps, baseline)
.await?;
}
tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);
tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");
// During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
// the origin without notifying compute, we will render the tenant unavailable.
while let Err(e) = self.compute_notify().await {
match e {
NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)),
NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
_ => {
tracing::warn!(
"Live migration blocked by compute notification error, retrying: {e}"
@@ -389,22 +422,19 @@ impl Reconciler {
None,
Some(LocationConfigSecondary { warm: true }),
);
self.location_config(origin_ps_id, origin_secondary_conf.clone(), None, false)
self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
.await?;
// TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
// partway through. In fact, all location conf API calls should be in a wrapper that sets
// the observed state to None, then runs, then sets it to what we wrote.
self.observed.locations.insert(
origin_ps_id,
origin_ps.get_id(),
ObservedStateLocation {
conf: Some(origin_secondary_conf),
},
);
println!(
"🔁 Switching to AttachedSingle mode on pageserver {}",
dest_ps_id
);
tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
let dest_final_conf = build_location_config(
&self.shard,
&self.config,
@@ -412,16 +442,61 @@ impl Reconciler {
self.generation,
None,
);
self.location_config(dest_ps_id, dest_final_conf.clone(), None, false)
self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
.await?;
self.observed.locations.insert(
dest_ps_id,
dest_ps.get_id(),
ObservedStateLocation {
conf: Some(dest_final_conf),
},
);
println!("✅ Migration complete");
tracing::info!("✅ Migration complete");
Ok(())
}
async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
// If the attached node has uncertain state, read it from the pageserver before proceeding: this
// is important to avoid spurious generation increments.
//
// We don't need to do this for secondary/detach locations because it's harmless to just PUT their
// location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
// the `Timeline` object in the pageserver.
let Some(attached_node) = self.intent.attached.as_ref() else {
// Nothing to do
return Ok(());
};
if matches!(
self.observed.locations.get(&attached_node.get_id()),
Some(ObservedStateLocation { conf: None })
) {
let tenant_shard_id = self.tenant_shard_id;
let observed_conf = match attached_node
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.service_config.jwt_token,
1,
1,
Duration::from_secs(5),
&self.cancel,
)
.await
{
Some(Ok(observed)) => observed,
Some(Err(e)) => return Err(e.into()),
None => return Err(ReconcileError::Cancel),
};
tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
self.observed.locations.insert(
attached_node.get_id(),
ObservedStateLocation {
conf: observed_conf,
},
);
}
Ok(())
}
@@ -433,14 +508,14 @@ impl Reconciler {
/// general case reconciliation where we walk through the intent by pageserver
/// and call out to the pageserver to apply the desired state.
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
// TODO: if any of self.observed is None, call to remote pageservers
// to learn correct state.
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
self.maybe_refresh_observed().await?;
// Special case: live migration
self.maybe_live_migrate().await?;
// If the attached pageserver is not attached, do so now.
if let Some(node_id) = self.intent.attached {
if let Some(node) = self.intent.attached.as_ref() {
// If we are in an attached policy, then generation must have been set (null generations
// are only present when a tenant is initially loaded with a secondary policy)
debug_assert!(self.generation.is_some());
@@ -451,10 +526,10 @@ impl Reconciler {
};
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
tracing::info!(%node_id, "Observed configuration already correct.")
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
}
observed => {
// In all cases other than a matching observed configuration, we will
@@ -492,16 +567,21 @@ impl Reconciler {
if increment_generation {
let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node_id)
.increment_generation(self.tenant_shard_id, node.get_id())
.await?;
self.generation = Some(generation);
wanted_conf.generation = generation.into();
}
tracing::info!(%node_id, "Observed configuration requires update.");
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
// Because `node` comes from a ref to &self, clone it before calling into a &mut self
// function: this could be avoided by refactoring the state mutated by location_config into
// a separate type to Self.
let node = node.clone();
// Use lazy=true, because we may run many of Self concurrently, and do not want to
// overload the pageserver with logical size calculations.
self.location_config(node_id, wanted_conf, None, true)
.await?;
self.location_config(&node, wanted_conf, None, true).await?;
self.compute_notify().await?;
}
}
@@ -510,33 +590,27 @@ impl Reconciler {
// Configure secondary locations: if these were previously attached this
// implicitly downgrades them from attached to secondary.
let mut changes = Vec::new();
for node_id in &self.intent.secondary {
for node in &self.intent.secondary {
let wanted_conf = secondary_location_conf(&self.shard, &self.config);
match self.observed.locations.get(node_id) {
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
tracing::info!(%node_id, "Observed configuration already correct.")
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
}
_ => {
// In all cases other than a matching observed configuration, we will
// reconcile this location.
tracing::info!(%node_id, "Observed configuration requires update.");
changes.push((*node_id, wanted_conf))
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
changes.push((node.clone(), wanted_conf))
}
}
}
// Detach any extraneous pageservers that are no longer referenced
// by our intent.
let all_pageservers = self.intent.all_pageservers();
for node_id in self.observed.locations.keys() {
if all_pageservers.contains(node_id) {
// We are only detaching pageservers that aren't used at all.
continue;
}
for node in &self.detach {
changes.push((
*node_id,
node.clone(),
LocationConfig {
mode: LocationConfigMode::Detached,
generation: None,
@@ -549,11 +623,11 @@ impl Reconciler {
));
}
for (node_id, conf) in changes {
for (node, conf) in changes {
if self.cancel.is_cancelled() {
return Err(ReconcileError::Cancel);
}
self.location_config(node_id, conf, None, false).await?;
self.location_config(&node, conf, None, false).await?;
}
Ok(())
@@ -562,12 +636,12 @@ impl Reconciler {
pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
// Whenever a particular Reconciler emits a notification, it is always notifying for the intended
// destination.
if let Some(node_id) = self.intent.attached {
if let Some(node) = &self.intent.attached {
let result = self
.compute_hook
.notify(
self.tenant_shard_id,
node_id,
node.get_id(),
self.shard.stripe_size,
&self.cancel,
)
@@ -576,7 +650,7 @@ impl Reconciler {
// It is up to the caller whether they want to drop out on this error, but they don't have to:
// in general we should avoid letting unavailability of the cloud control plane stop us from
// making progress.
tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}");
tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
// Set this flag so that in our ReconcileResult we will set the flag on the shard that it
// needs to retry at some point.
self.compute_notify_failure = true;

View File

@@ -43,7 +43,7 @@ impl Scheduler {
let mut scheduler_nodes = HashMap::new();
for node in nodes {
scheduler_nodes.insert(
node.id,
node.get_id(),
SchedulerNode {
shard_count: 0,
may_schedule: node.may_schedule(),
@@ -68,7 +68,7 @@ impl Scheduler {
let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
for node in nodes {
expect_nodes.insert(
node.id,
node.get_id(),
SchedulerNode {
shard_count: 0,
may_schedule: node.may_schedule(),
@@ -156,7 +156,7 @@ impl Scheduler {
pub(crate) fn node_upsert(&mut self, node: &Node) {
use std::collections::hash_map::Entry::*;
match self.nodes.entry(node.id) {
match self.nodes.entry(node.get_id()) {
Occupied(mut entry) => {
entry.get_mut().may_schedule = node.may_schedule();
}
@@ -255,7 +255,6 @@ impl Scheduler {
pub(crate) mod test_utils {
use crate::node::Node;
use pageserver_api::controller_api::{NodeAvailability, NodeSchedulingPolicy};
use std::collections::HashMap;
use utils::id::NodeId;
/// Test helper: synthesize the requested number of nodes, all in active state.
@@ -264,18 +263,17 @@ pub(crate) mod test_utils {
pub(crate) fn make_test_nodes(n: u64) -> HashMap<NodeId, Node> {
(1..n + 1)
.map(|i| {
(
NodeId(i),
Node {
id: NodeId(i),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: format!("httphost-{i}"),
listen_http_port: 80 + i as u16,
listen_pg_addr: format!("pghost-{i}"),
listen_pg_port: 5432 + i as u16,
},
)
(NodeId(i), {
let node = Node::new(
NodeId(i),
format!("httphost-{i}"),
80 + i as u16,
format!("pghost-{i}"),
5432 + i as u16,
);
assert!(node.is_available());
node
})
})
.collect()
}

View File

@@ -16,9 +16,9 @@ use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::TenantConfigRequest,
};
@@ -39,7 +39,6 @@ use pageserver_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{
backoff,
completion::Barrier,
generation::Generation,
http::error::ApiError,
@@ -50,7 +49,7 @@ use utils::{
use crate::{
compute_hook::{self, ComputeHook},
node::Node,
node::{AvailabilityTransition, Node},
persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
reconciler::attached_location_conf,
scheduler::Scheduler,
@@ -201,7 +200,8 @@ impl Service {
async fn startup_reconcile(self: &Arc<Service>) {
// For all tenant shards, a vector of observed states on nodes (where None means
// indeterminate, same as in [`ObservedStateLocation`])
let mut observed = HashMap::new();
let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
HashMap::new();
let mut nodes_online = HashSet::new();
@@ -236,7 +236,8 @@ impl Service {
nodes_online.insert(node_id);
for (tenant_shard_id, conf_opt) in tenant_shards {
observed.insert(tenant_shard_id, (node_id, conf_opt));
let shard_observations = observed.entry(tenant_shard_id).or_default();
shard_observations.push((node_id, conf_opt));
}
}
@@ -252,27 +253,28 @@ impl Service {
let mut new_nodes = (**nodes).clone();
for (node_id, node) in new_nodes.iter_mut() {
if nodes_online.contains(node_id) {
node.availability = NodeAvailability::Active;
node.set_availability(NodeAvailability::Active);
scheduler.node_upsert(node);
}
}
*nodes = Arc::new(new_nodes);
for (tenant_shard_id, (node_id, observed_loc)) in observed {
let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
tenant_state
.observed
.locations
.insert(node_id, ObservedStateLocation { conf: observed_loc });
for (tenant_shard_id, shard_observations) in observed {
for (node_id, observed_loc) in shard_observations {
let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
tenant_state
.observed
.locations
.insert(node_id, ObservedStateLocation { conf: observed_loc });
}
}
// Populate each tenant's intent state
for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
tenant_state.intent_from_observed();
tenant_state.intent_from_observed(scheduler);
if let Err(e) = tenant_state.schedule(scheduler) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
@@ -359,40 +361,19 @@ impl Service {
for node in nodes.values() {
node_list_futs.push({
async move {
let http_client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(5))
.build()
.expect("Failed to construct HTTP client");
let client = mgmt_api::Client::from_client(
http_client,
node.base_url(),
self.config.jwt_token.as_deref(),
);
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
ApiError(_, _) => true,
}
}
tracing::info!("Scanning shards on node {}...", node.id);
let description = format!("List locations on {}", node.id);
let response = backoff::retry(
|| client.list_location_config(),
is_fatal,
1,
5,
&description,
&self.cancel,
)
.await;
(node.id, response)
tracing::info!("Scanning shards on node {node}...");
let timeout = Duration::from_secs(5);
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.config.jwt_token,
1,
5,
timeout,
&self.cancel,
)
.await;
(node.get_id(), response)
}
});
}
@@ -662,19 +643,9 @@ impl Service {
.list_nodes()
.await?
.into_iter()
.map(|n| Node {
id: NodeId(n.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
.expect("Bad scheduling policy in DB"),
listen_http_addr: n.listen_http_addr,
listen_http_port: n.listen_http_port as u16,
listen_pg_addr: n.listen_pg_addr,
listen_pg_port: n.listen_pg_port as u16,
})
.map(Node::from_persistent)
.collect::<Vec<_>>();
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
tracing::info!("Loading shards from database...");
@@ -701,15 +672,13 @@ impl Service {
}
for node_id in node_ids {
tracing::info!("Creating node {} in scheduler for tests", node_id);
let node = Node {
id: NodeId(node_id as u64),
availability: NodeAvailability::Active,
scheduling: NodeSchedulingPolicy::Active,
listen_http_addr: "".to_string(),
listen_http_port: 123,
listen_pg_addr: "".to_string(),
listen_pg_port: 123,
};
let node = Node::new(
NodeId(node_id as u64),
"".to_string(),
123,
"".to_string(),
123,
);
scheduler.node_upsert(&node);
}
@@ -975,6 +944,12 @@ impl Service {
// Ordering: we must persist generation number updates before making them visible in the in-memory state
let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
tracing::info!(
node_id=%reattach_req.node_id,
"Incremented {} tenant shards' generations",
incremented_generations.len()
);
// Apply the updated generation to our in-memory state
let mut locked = self.inner.write().unwrap();
@@ -987,7 +962,6 @@ impl Service {
id: tenant_shard_id,
gen: new_gen.into().unwrap(),
});
// Apply the new generation number to our in-memory state
let shard_state = locked.tenants.get_mut(&tenant_shard_id);
let Some(shard_state) = shard_state else {
@@ -1023,6 +997,14 @@ impl Service {
if let Some(conf) = observed.conf.as_mut() {
conf.generation = new_gen.into();
}
} else {
// This node has no observed state for the shard: perhaps it was offline
// when the pageserver restarted. Insert a None, so that the Reconciler
// will be prompted to learn the location's state before it makes changes.
shard_state
.observed
.locations
.insert(reattach_req.node_id, ObservedStateLocation { conf: None });
}
// TODO: cancel/restart any running reconciliation for this tenant, it might be trying
@@ -1685,7 +1667,7 @@ impl Service {
.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
node.id
node
))
})?;
}
@@ -1739,10 +1721,7 @@ impl Service {
// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
// than they had hoped for.
tracing::warn!(
"Ignoring tenant secondary download error from pageserver {}: {e}",
node.id,
);
tracing::warn!("Ignoring tenant secondary download error from pageserver {node}: {e}",);
}
Ok(())
@@ -1780,13 +1759,11 @@ impl Service {
// surface immediately as an error to our caller.
let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error deleting shard {tenant_shard_id} on node {}: {e}",
node.id
"Error deleting shard {tenant_shard_id} on node {node}: {e}",
))
})?;
tracing::info!(
"Shard {tenant_shard_id} on node {}, delete returned {}",
node.id,
"Shard {tenant_shard_id} on node {node}, delete returned {}",
status
);
if status == StatusCode::ACCEPTED {
@@ -1885,10 +1862,9 @@ impl Service {
create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline on shard {}/{}, attached to node {}",
"Creating timeline on shard {}/{}, attached to node {node}",
tenant_shard_id,
create_req.new_timeline_id,
node.id
);
let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
@@ -2012,10 +1988,7 @@ impl Service {
jwt: Option<String>,
) -> Result<StatusCode, ApiError> {
tracing::info!(
"Deleting timeline on shard {}/{}, attached to node {}",
tenant_shard_id,
timeline_id,
node.id
"Deleting timeline on shard {tenant_shard_id}/{timeline_id}, attached to node {node}",
);
let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
@@ -2024,8 +1997,7 @@ impl Service {
.await
.map_err(|e| {
ApiError::InternalServerError(anyhow::anyhow!(
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
node.id
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {node}: {e}",
))
})
}
@@ -2126,14 +2098,7 @@ impl Service {
.get(&node_id)
.expect("Pageservers may not be deleted while referenced");
result.push(TenantLocateResponseShard {
shard_id: *tenant_shard_id,
node_id,
listen_http_addr: node.listen_http_addr.clone(),
listen_http_port: node.listen_http_port,
listen_pg_addr: node.listen_pg_addr.clone(),
listen_pg_port: node.listen_pg_port,
});
result.push(node.shard_location(*tenant_shard_id));
match &shard_params {
None => {
@@ -2324,7 +2289,7 @@ impl Service {
// populate the correct generation as part of its transaction, to protect us
// against racing with changes in the state of the parent.
generation: None,
generation_pageserver: Some(target.node.id.0 as i64),
generation_pageserver: Some(target.node.get_id().0 as i64),
placement_policy: serde_json::to_string(&policy).unwrap(),
// TODO: get the config out of the map
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
@@ -2526,10 +2491,10 @@ impl Service {
)));
};
if node.availability != NodeAvailability::Active {
if !node.is_available() {
// Warn but proceed: the caller may intend to manually adjust the placement of
// a shard even if the node is down, e.g. if intervening during an incident.
tracing::warn!("Migrating to an unavailable node ({})", node.id);
tracing::warn!("Migrating to unavailable node {node}");
}
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
@@ -2784,11 +2749,7 @@ impl Service {
if let Some(node) = locked.nodes.get(&register_req.node_id) {
// Note that we do not do a total equality of the struct, because we don't require
// the availability/scheduling states to agree for a POST to be idempotent.
if node.listen_http_addr == register_req.listen_http_addr
&& node.listen_http_port == register_req.listen_http_port
&& node.listen_pg_addr == register_req.listen_pg_addr
&& node.listen_pg_port == register_req.listen_pg_port
{
if node.registration_match(&register_req) {
tracing::info!(
"Node {} re-registered with matching address",
register_req.node_id
@@ -2812,16 +2773,14 @@ impl Service {
// Ordering: we must persist the new node _before_ adding it to in-memory state.
// This ensures that before we use it for anything or expose it via any external
// API, it is guaranteed to be available after a restart.
let new_node = Node {
id: register_req.node_id,
listen_http_addr: register_req.listen_http_addr,
listen_http_port: register_req.listen_http_port,
listen_pg_addr: register_req.listen_pg_addr,
listen_pg_port: register_req.listen_pg_port,
scheduling: NodeSchedulingPolicy::Filling,
// TODO: we shouldn't really call this Active until we've heartbeated it.
availability: NodeAvailability::Active,
};
let new_node = Node::new(
register_req.node_id,
register_req.listen_http_addr,
register_req.listen_http_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
);
// TODO: idempotency if the node already exists in the database
self.persistence.insert_node(&new_node).await?;
@@ -2866,29 +2825,14 @@ impl Service {
));
};
let mut offline_transition = false;
let mut active_transition = false;
if let Some(availability) = &config_req.availability {
match (availability, &node.availability) {
(NodeAvailability::Offline, NodeAvailability::Active) => {
tracing::info!("Node {} transition to offline", config_req.node_id);
offline_transition = true;
}
(NodeAvailability::Active, NodeAvailability::Offline) => {
tracing::info!("Node {} transition to active", config_req.node_id);
active_transition = true;
}
_ => {
tracing::info!("Node {} no change during config", config_req.node_id);
// No change
}
};
node.availability = *availability;
}
let availability_transition = if let Some(availability) = &config_req.availability {
node.set_availability(*availability)
} else {
AvailabilityTransition::Unchanged
};
if let Some(scheduling) = config_req.scheduling {
node.scheduling = scheduling;
node.set_scheduling(scheduling);
// TODO: once we have a background scheduling ticker for fill/drain, kick it
// to wake up and start working.
@@ -2899,74 +2843,80 @@ impl Service {
let new_nodes = Arc::new(new_nodes);
if offline_transition {
let mut tenants_affected: usize = 0;
for (tenant_shard_id, tenant_state) in tenants {
if let Some(observed_loc) =
tenant_state.observed.locations.get_mut(&config_req.node_id)
{
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}
match availability_transition {
AvailabilityTransition::ToOffline => {
tracing::info!("Node {} transition to offline", config_req.node_id);
let mut tenants_affected: usize = 0;
for (tenant_shard_id, tenant_state) in tenants {
if let Some(observed_loc) =
tenant_state.observed.locations.get_mut(&config_req.node_id)
{
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
}
if tenant_state.intent.demote_attached(config_req.node_id) {
tenant_state.sequence = tenant_state.sequence.next();
match tenant_state.schedule(scheduler) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantState a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
}
Ok(()) => {
if tenant_state
.maybe_reconcile(
result_tx.clone(),
&new_nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
.is_some()
{
tenants_affected += 1;
};
if tenant_state.intent.demote_attached(config_req.node_id) {
tenant_state.sequence = tenant_state.sequence.next();
match tenant_state.schedule(scheduler) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
// TODO: give TenantState a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
}
Ok(()) => {
if tenant_state
.maybe_reconcile(
result_tx.clone(),
&new_nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
.is_some()
{
tenants_affected += 1;
};
}
}
}
}
tracing::info!(
"Launched {} reconciler tasks for tenants affected by node {} going offline",
tenants_affected,
config_req.node_id
)
}
tracing::info!(
"Launched {} reconciler tasks for tenants affected by node {} going offline",
tenants_affected,
config_req.node_id
)
}
if active_transition {
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_state in locked.tenants.values_mut() {
if let Some(observed_loc) =
tenant_state.observed.locations.get_mut(&config_req.node_id)
{
if observed_loc.conf.is_none() {
tenant_state.maybe_reconcile(
result_tx.clone(),
&new_nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
AvailabilityTransition::ToActive => {
tracing::info!("Node {} transition to active", config_req.node_id);
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
for tenant_state in locked.tenants.values_mut() {
if let Some(observed_loc) =
tenant_state.observed.locations.get_mut(&config_req.node_id)
{
if observed_loc.conf.is_none() {
tenant_state.maybe_reconcile(
result_tx.clone(),
&new_nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
}
}
}
}
// TODO: in the background, we should balance work back onto this pageserver
// TODO: in the background, we should balance work back onto this pageserver
}
AvailabilityTransition::Unchanged => {
tracing::info!("Node {} no change during config", config_req.node_id);
}
}
locked.nodes = new_nodes;

View File

@@ -1,7 +1,10 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use crate::{metrics, persistence::TenantShardPersistence};
use pageserver_api::controller_api::NodeAvailability;
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -370,7 +373,7 @@ impl TenantState {
/// [`ObservedState`], even if it violates my [`PlacementPolicy`]. Call [`Self::schedule`] next,
/// to get an intent state that complies with placement policy. The overall goal is to do scheduling
/// in a way that makes use of any configured locations that already exist in the outside world.
pub(crate) fn intent_from_observed(&mut self) {
pub(crate) fn intent_from_observed(&mut self, scheduler: &mut Scheduler) {
// Choose an attached location by filtering observed locations, and then sorting to get the highest
// generation
let mut attached_locs = self
@@ -395,7 +398,7 @@ impl TenantState {
attached_locs.sort_by_key(|i| i.1);
if let Some((node_id, _gen)) = attached_locs.into_iter().last() {
self.intent.attached = Some(*node_id);
self.intent.set_attached(scheduler, Some(*node_id));
}
// All remaining observed locations generate secondary intents. This includes None
@@ -406,7 +409,7 @@ impl TenantState {
// will take care of promoting one of these secondaries to be attached.
self.observed.locations.keys().for_each(|node_id| {
if Some(*node_id) != self.intent.attached {
self.intent.secondary.push(*node_id);
self.intent.push_secondary(scheduler, *node_id);
}
});
}
@@ -564,7 +567,9 @@ impl TenantState {
}
}
fn dirty(&self) -> bool {
fn dirty(&self, nodes: &Arc<HashMap<NodeId, Node>>) -> bool {
let mut dirty_nodes = HashSet::new();
if let Some(node_id) = self.intent.attached {
// Maybe panic: it is a severe bug if we try to attach while generation is null.
let generation = self
@@ -575,7 +580,7 @@ impl TenantState {
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
return true;
dirty_nodes.insert(node_id);
}
}
}
@@ -585,7 +590,7 @@ impl TenantState {
match self.observed.locations.get(node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
return true;
dirty_nodes.insert(*node_id);
}
}
}
@@ -593,17 +598,18 @@ impl TenantState {
for node_id in self.observed.locations.keys() {
if self.intent.attached != Some(*node_id) && !self.intent.secondary.contains(node_id) {
// We have observed state that isn't part of our intent: need to clean it up.
return true;
dirty_nodes.insert(*node_id);
}
}
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
if self.pending_compute_notification {
return true;
}
dirty_nodes.retain(|node_id| {
nodes
.get(node_id)
.map(|n| n.is_available())
.unwrap_or(false)
});
false
!dirty_nodes.is_empty()
}
#[allow(clippy::too_many_arguments)]
@@ -625,15 +631,20 @@ impl TenantState {
let node = pageservers
.get(node_id)
.expect("Nodes may not be removed while referenced");
if observed_loc.conf.is_none()
&& !matches!(node.availability, NodeAvailability::Offline)
{
if observed_loc.conf.is_none() && node.is_available() {
dirty_observed = true;
break;
}
}
if !self.dirty() && !dirty_observed {
let active_nodes_dirty = self.dirty(pageservers);
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
let do_reconcile =
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
if !do_reconcile {
tracing::info!("Not dirty, no reconciliation needed.");
return None;
}
@@ -663,6 +674,21 @@ impl TenantState {
}
}
// Build list of nodes from which the reconciler should detach
let mut detach = Vec::new();
for node_id in self.observed.locations.keys() {
if self.intent.get_attached() != &Some(*node_id)
&& !self.intent.secondary.contains(node_id)
{
detach.push(
pageservers
.get(node_id)
.expect("Intent references non-existent pageserver")
.clone(),
)
}
}
// Reconcile in flight for a stale sequence? Our sequence's task will wait for it before
// doing our sequence's work.
let old_handle = self.reconciler.take();
@@ -677,14 +703,15 @@ impl TenantState {
self.sequence = self.sequence.next();
let reconciler_cancel = cancel.child_token();
let reconciler_intent = TargetState::from_intent(pageservers, &self.intent);
let mut reconciler = Reconciler {
tenant_shard_id: self.tenant_shard_id,
shard: self.shard,
generation: self.generation,
intent: TargetState::from_intent(&self.intent),
intent: reconciler_intent,
detach,
config: self.config.clone(),
observed: self.observed.clone(),
pageservers: pageservers.clone(),
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
_gate_guard: gate_guard,
@@ -819,7 +846,10 @@ impl TenantState {
#[cfg(test)]
pub(crate) mod tests {
use pageserver_api::shard::{ShardCount, ShardNumber};
use pageserver_api::{
controller_api::NodeAvailability,
shard::{ShardCount, ShardNumber},
};
use utils::id::TenantId;
use crate::scheduler::test_utils::make_test_nodes;
@@ -878,7 +908,10 @@ pub(crate) mod tests {
assert_eq!(tenant_state.intent.secondary.len(), 2);
// Update the scheduler state to indicate the node is offline
nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;
nodes
.get_mut(&attached_node_id)
.unwrap()
.set_availability(NodeAvailability::Offline);
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());
// Scheduling the node should promote the still-available secondary node to attached
@@ -897,4 +930,54 @@ pub(crate) mod tests {
Ok(())
}
#[test]
fn intent_from_observed() -> anyhow::Result<()> {
let nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Double(1));
tenant_state.observed.locations.insert(
NodeId(3),
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedMulti,
generation: Some(2),
secondary_conf: None,
shard_number: tenant_state.shard.number.0,
shard_count: tenant_state.shard.count.literal(),
shard_stripe_size: tenant_state.shard.stripe_size.0,
tenant_conf: TenantConfig::default(),
}),
},
);
tenant_state.observed.locations.insert(
NodeId(2),
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedStale,
generation: Some(1),
secondary_conf: None,
shard_number: tenant_state.shard.number.0,
shard_count: tenant_state.shard.count.literal(),
shard_stripe_size: tenant_state.shard.stripe_size.0,
tenant_conf: TenantConfig::default(),
}),
},
);
tenant_state.intent_from_observed(&mut scheduler);
// The highest generationed attached location gets used as attached
assert_eq!(tenant_state.intent.attached, Some(NodeId(3)));
// Other locations get used as secondary
assert_eq!(tenant_state.intent.secondary, vec![NodeId(2)]);
scheduler.consistency_check(nodes.values(), [&tenant_state].into_iter())?;
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
}

View File

@@ -7,7 +7,7 @@ use utils::{
pub mod util;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Client {
mgmt_api_endpoint: String,
authorization_header: Option<String>,
@@ -24,6 +24,9 @@ pub enum Error {
#[error("pageserver API: {1}")]
ApiError(StatusCode, String),
#[error("Cancelled")]
Cancelled,
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -287,6 +290,21 @@ impl Client {
.map_err(Error::ReceiveBody)
}
pub async fn get_location_config(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Option<LocationConfig>> {
let path = format!(
"{}/v1/location_config/{tenant_shard_id}",
self.mgmt_api_endpoint
);
self.request(Method::GET, &path, ())
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn timeline_create(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -14,6 +14,7 @@ use hyper::header;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::LocationConfig;
use pageserver_api::models::LocationConfigListResponse;
use pageserver_api::models::ShardParameters;
use pageserver_api::models::TenantDetails;
@@ -1519,6 +1520,29 @@ async fn list_location_config_handler(
json_response(StatusCode::OK, result)
}
async fn get_location_config_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let slot = state.tenant_manager.get(tenant_shard_id);
let Some(slot) = slot else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard not found").into(),
));
};
let result: Option<LocationConfig> = match slot {
TenantSlot::Attached(t) => Some(t.get_location_conf()),
TenantSlot::Secondary(s) => Some(s.get_location_conf()),
TenantSlot::InProgress(_) => None,
};
json_response(StatusCode::OK, result)
}
// Do a time travel recovery on the given tenant/tenant shard. Tenant needs to be detached
// (from all pageservers) as it invalidates consistency assumptions.
async fn tenant_time_travel_remote_storage_handler(
@@ -2223,6 +2247,9 @@ pub fn make_router(
.get("/v1/location_config", |r| {
api_handler(r, list_location_config_handler)
})
.get("/v1/location_config/:tenant_id", |r| {
api_handler(r, get_location_config_handler)
})
.put(
"/v1/tenant/:tenant_shard_id/time_travel_remote_storage",
|r| api_handler(r, tenant_time_travel_remote_storage_handler),

View File

@@ -1358,6 +1358,16 @@ impl TenantManager {
}
}
pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
let locked = self.tenants.read().unwrap();
match &*locked {
TenantsMap::Initializing => None,
TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
map.get(&tenant_shard_id).cloned()
}
}
}
pub(crate) async fn delete_tenant(
&self,
tenant_shard_id: TenantShardId,