Compare commits

...

8 Commits

Author      SHA1        Date                        Message
Vlad Lazar  71ff8f2433  2024-07-30 17:58:18 +01:00  storcon: implement graceful leader cutover
Vlad Lazar  56c43c4fae  2024-07-30 17:58:17 +01:00  storcon: add start-up sequence utilities
Vlad Lazar  4187657082  2024-07-30 17:57:09 +01:00  storcon: refactor building of observed state at start-up
Vlad Lazar  b690ba5838  2024-07-30 17:57:09 +01:00  storcon: decouple initial heartbeat round from location listing
Vlad Lazar  dd7cafdd97  2024-07-30 17:57:08 +01:00  storcon: add storage controller peer client
Vlad Lazar  c501a10612  2024-07-30 17:56:30 +01:00  storcon: gate starting-up as candidate behind a flag
Vlad Lazar  1fdbef9a44  2024-07-30 17:56:28 +01:00  storcon/persistence: add leader table primitives
Vlad Lazar  3ad1221e55  2024-07-30 17:54:02 +01:00  storcon/diesel: add leader table
10 changed files with 433 additions and 44 deletions

Cargo.lock generated
View File

@@ -1744,6 +1744,18 @@ dependencies = [
"const-random",
]
[[package]]
name = "dns-lookup"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
dependencies = [
"cfg-if",
"libc",
"socket2 0.5.5",
"windows-sys 0.48.0",
]
[[package]]
name = "dsl_auto_type"
version = "0.1.1"
@@ -5724,6 +5736,7 @@ dependencies = [
"control_plane",
"diesel",
"diesel_migrations",
"dns-lookup",
"fail",
"futures",
"git-version",

View File

@@ -53,6 +53,7 @@ diesel = { version = "2.1.4", features = [
] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
dns-lookup = { version = "2.0.4" }
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
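
The `dns-lookup` dependency added above is what lets a starting controller work out the address it will advertise in the `leader` table. A minimal sketch of that usage, assuming a hypothetical helper name (the same two calls appear in `get_proposed_leader_info` near the end of this diff):

```rust
use std::net::IpAddr;

// Hypothetical helper: derive the address this controller should advertise,
// by resolving the machine's own hostname (in k8s, the pod's hostname).
fn advertised_address(http_port: i32) -> std::io::Result<(String, i32)> {
    let hostname: String = dns_lookup::get_hostname()?;
    let addrs: Vec<IpAddr> = dns_lookup::lookup_host(&hostname)?;
    let addr = addrs.into_iter().next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::Other, "hostname did not resolve")
    })?;
    Ok((addr.to_string(), http_port))
}
```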

View File

@@ -0,0 +1 @@
DROP TABLE leader;

View File

@@ -0,0 +1,6 @@
CREATE TABLE leader (
hostname VARCHAR NOT NULL,
port INTEGER NOT NULL,
started_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY(hostname, port, started_at)
);

View File

@@ -10,6 +10,7 @@ mod id_lock_map;
pub mod metrics;
mod node;
mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod scheduler;

View File

@@ -81,6 +81,9 @@ struct Cli {
#[arg(long, default_value = "5s")]
db_connect_timeout: humantime::Duration,
#[arg(long, default_value = "false")]
start_as_candidate: bool,
/// `neon_local` sets this to the path of the neon_local repo dir.
/// Only relevant for testing.
// TODO: make `cfg(feature = "testing")`
@@ -273,6 +276,8 @@ async fn async_main() -> anyhow::Result<()> {
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
split_threshold: args.split_threshold,
neon_local_repo_dir: args.neon_local_repo_dir,
start_as_candidate: args.start_as_candidate,
http_service_port: args.listen.port() as i32,
};
// After loading secrets & config, but before starting anything else, apply database migrations

View File

@@ -0,0 +1,104 @@
use crate::tenant_shard::ObservedState;
use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use reqwest::{StatusCode, Url};
use utils::{backoff, http::error::HttpErrorBody};
#[derive(Debug, Clone)]
pub(crate) struct PeerClient {
hostname: String,
port: i32,
jwt: Option<String>,
client: reqwest::Client,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum StorageControllerPeerError {
#[error("failed to deserialize error response with status code {0} at {1}: {2}")]
DeserializationError(StatusCode, Url, reqwest::Error),
#[error("storage controller peer API error ({0}): {1}")]
ApiError(StatusCode, String),
#[error("failed to send HTTP request: {0}")]
SendError(reqwest::Error),
#[error("Cancelled")]
Cancelled,
}
pub(crate) type Result<T> = std::result::Result<T, StorageControllerPeerError>;
pub(crate) trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> impl std::future::Future<Output = Result<Self>> + Send;
}
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
let url = self.url().to_owned();
Err(match self.json::<HttpErrorBody>().await {
Ok(HttpErrorBody { msg }) => StorageControllerPeerError::ApiError(status, msg),
Err(err) => StorageControllerPeerError::DeserializationError(status, url, err),
})
}
}
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(pub(crate) HashMap<TenantShardId, ObservedState>);
impl PeerClient {
pub(crate) fn new(hostname: String, port: i32, jwt: Option<String>) -> Self {
Self {
hostname,
port,
jwt,
client: reqwest::Client::new(),
}
}
async fn request_step_down(&self) -> Result<GlobalObservedState> {
let uri = format!("http://{}:{}/control/v1/step_down", self.hostname, self.port);
let req = self.client.put(uri);
let req = if let Some(jwt) = &self.jwt {
req.header(reqwest::header::AUTHORIZATION, format!("Bearer {jwt}"))
} else {
req
};
let res = req
.send()
.await
.map_err(StorageControllerPeerError::SendError)?;
let response = res.error_from_body().await?;
let status = response.status();
let url = response.url().to_owned();
response
.json()
.await
.map_err(|err| StorageControllerPeerError::DeserializationError(status, url, err))
}
pub(crate) async fn step_down(
&self,
cancel: &CancellationToken,
) -> Result<GlobalObservedState> {
backoff::retry(
|| self.request_step_down(),
|_e| false,
4,
8,
"Send step down request",
cancel,
)
.await
.ok_or_else(|| StorageControllerPeerError::Cancelled)
.and_then(|x| x)
}
}
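
A usage sketch, not part of the diff: this is roughly how a starting candidate drives the client, mirroring `Service::request_step_down` further down in this series (the function name and error handling here are illustrative).

```rust
use tokio_util::sync::CancellationToken;

// Illustrative only: ask the current leader to step down and hand over its
// observed state; on failure the caller falls back to scanning pageservers.
async fn handover_from_leader(hostname: String, port: i32, jwt: Option<String>) {
    let cancel = CancellationToken::new();
    let client = PeerClient::new(hostname, port, jwt);
    match client.step_down(&cancel).await {
        Ok(observed) => {
            // GlobalObservedState maps every TenantShardId to the locations
            // the old leader knew about.
            tracing::info!("received observed state for {} shards", observed.0.len());
        }
        Err(err) => {
            tracing::warn!("step-down request failed, will rebuild observed state: {err}");
        }
    }
}
```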

View File

@@ -95,6 +95,8 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealth,
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
GetLeader,
UpdateLeader,
}
#[must_use]
@@ -785,6 +787,71 @@ impl Persistence {
)
.await
}
/// Get the current entry from the `leader` table if one exists.
/// It is an error for the table to contain more than one entry.
pub(crate) async fn get_leader(&self) -> DatabaseResult<Option<LeaderPersistence>> {
let mut leader: Vec<LeaderPersistence> = self
.with_measured_conn(
DatabaseOperation::GetLeader,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::leader::table.load::<LeaderPersistence>(conn)?)
},
)
.await?;
if leader.len() > 1 {
return Err(DatabaseError::Logical(format!(
"More than one entry present in the leader table: {leader:?}"
)));
}
Ok(leader.pop())
}
/// Update the leader entry with compare-exchange semantics: if `prev` does not
/// match the current leader entry, the update is treated as a failure.
/// When `prev` is not specified, a new entry is inserted unconditionally.
pub(crate) async fn update_leader(
&self,
prev: Option<LeaderPersistence>,
new: LeaderPersistence,
) -> DatabaseResult<()> {
use crate::schema::leader::dsl::*;
let updated = self
.with_measured_conn(
DatabaseOperation::UpdateLeader,
move |conn| -> DatabaseResult<usize> {
let updated = match &prev {
Some(prev) => diesel::update(leader)
.filter(hostname.eq(prev.hostname.clone()))
.filter(port.eq(prev.port))
.filter(started_at.eq(prev.started_at))
.set((
hostname.eq(new.hostname.clone()),
port.eq(new.port),
started_at.eq(new.started_at),
))
.execute(conn)?,
None => diesel::insert_into(leader)
.values(new.clone())
.execute(conn)?,
};
Ok(updated)
},
)
.await?;
if updated == 0 {
return Err(DatabaseError::Logical(
"Leader table update failed".to_string(),
));
}
Ok(())
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -910,3 +977,13 @@ impl From<MetadataHealthPersistence> for MetadataHealthRecord {
}
}
}
#[derive(
Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, Debug, Clone,
)]
#[diesel(table_name = crate::schema::leader)]
pub(crate) struct LeaderPersistence {
pub(crate) hostname: String,
pub(crate) port: i32,
pub(crate) started_at: chrono::DateTime<chrono::Utc>,
}
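
A sketch of how these two primitives compose into a compare-exchange at start-up; this mirrors what `startup_reconcile` does further down, with the function name and the placeholder values being illustrative.

```rust
// Illustrative only: claim leadership with compare-exchange semantics. If a
// competing candidate wins the race, the row no longer matches `prev`, the
// filtered UPDATE touches zero rows and update_leader returns
// DatabaseError::Logical, so this process aborts instead of acting as a
// second leader.
async fn claim_leadership(persistence: &Persistence) -> DatabaseResult<()> {
    let prev = persistence.get_leader().await?;
    let proposed = LeaderPersistence {
        hostname: "10.0.0.12".to_string(), // placeholder: resolved pod address
        port: 1234,                        // placeholder: the HTTP service port
        started_at: chrono::Utc::now(),
    };
    persistence.update_leader(prev, proposed).await
}
```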

View File

@@ -1,5 +1,13 @@
// @generated automatically by Diesel CLI.
diesel::table! {
leader (hostname, port, started_at) {
hostname -> Varchar,
port -> Int4,
started_at -> Timestamptz,
}
}
diesel::table! {
metadata_health (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
@@ -36,4 +44,4 @@ diesel::table! {
}
}
-diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);
+diesel::allow_tables_to_appear_in_same_query!(leader, metadata_health, nodes, tenant_shards,);

View File

@@ -16,7 +16,10 @@ use crate::{
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
metrics::LeadershipStatusGroup,
-persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
+peer_client::{GlobalObservedState, PeerClient},
+persistence::{
+AbortShardSplitStatus, LeaderPersistence, MetadataHealthPersistence, TenantFilter,
+},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
@@ -82,7 +85,6 @@ use crate::{
ReconcilerWaiter, TenantShard,
},
};
-use serde::{Deserialize, Serialize};
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -223,6 +225,7 @@ impl ServiceState {
tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
initial_leadership_status: LeadershipStatus,
) -> Self {
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
@@ -230,15 +233,13 @@ impl ServiceState {
status.set(
LeadershipStatusGroup {
-status: LeadershipStatus::Leader,
+status: initial_leadership_status,
},
1,
);
Self {
-// TODO: Starting up as Leader is a transient state. Once we enable rolling
-// upgrades on the k8s side, we should start up as Candidate.
-leadership_status: LeadershipStatus::Leader,
+leadership_status: initial_leadership_status,
tenants,
nodes: Arc::new(nodes),
scheduler,
@@ -287,6 +288,33 @@ impl ServiceState {
0,
);
}
fn become_leader(&mut self) {
self.leadership_status = LeadershipStatus::Leader;
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_leadership_status;
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Leader,
},
1,
);
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::SteppedDown,
},
0,
);
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Candidate,
},
0,
);
}
}
#[derive(Clone)]
@@ -323,6 +351,10 @@ pub struct Config {
// TODO: make this cfg(feature = "testing")
pub neon_local_repo_dir: Option<PathBuf>,
pub start_as_candidate: bool,
pub http_service_port: i32,
}
impl From<DatabaseError> for ApiError {
@@ -490,9 +522,10 @@ pub(crate) enum ReconcileResultRequest {
Stop,
}
-// TODO: move this into the storcon peer client when that gets added
-#[derive(Serialize, Deserialize, Debug, Default)]
-pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
+struct LeaderStepDownState {
+observed: GlobalObservedState,
+leader: LeaderPersistence,
+}
impl Service {
pub fn get_config(&self) -> &Config {
@@ -504,15 +537,11 @@ impl Service {
#[instrument(skip_all)]
async fn startup_reconcile(
self: &Arc<Service>,
+leader_step_down_state: Option<LeaderStepDownState>,
bg_compute_notify_result_tx: tokio::sync::mpsc::Sender<
Result<(), (TenantShardId, NotifyError)>,
>,
) {
-// For all tenant shards, a vector of observed states on nodes (where None means
-// indeterminate, same as in [`ObservedStateLocation`])
-let mut observed: HashMap<TenantShardId, Vec<(NodeId, Option<LocationConfig>)>> =
-HashMap::new();
// Startup reconciliation does I/O to other services: whether they
// are responsive or not, we should aim to finish within our deadline, because:
// - If we don't, a k8s readiness hook watching /ready will kill us.
@@ -526,26 +555,29 @@ impl Service {
.checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
.expect("Reconcile timeout is a modest constant");
+let (observed, current_leader) = if let Some(state) = leader_step_down_state {
+tracing::info!(
+"Using observed state received from leader at {}:{}",
+state.leader.hostname,
+state.leader.port
+);
+(state.observed, Some(state.leader))
+} else {
+(
+self.build_global_observed_state(node_scan_deadline).await,
+None,
+)
+};
// Accumulate a list of any tenant locations that ought to be detached
let mut cleanup = Vec::new();
-let node_listings = self.scan_node_locations(node_scan_deadline).await;
-// Send initial heartbeat requests to nodes that replied to the location listing above.
-let nodes_online = self.initial_heartbeat_round(node_listings.keys()).await;
-for (node_id, list_response) in node_listings {
-let tenant_shards = list_response.tenant_shards;
-tracing::info!(
-"Received {} shard statuses from pageserver {}, setting it to Active",
-tenant_shards.len(),
-node_id
-);
-for (tenant_shard_id, conf_opt) in tenant_shards {
-let shard_observations = observed.entry(tenant_shard_id).or_default();
-shard_observations.push((node_id, conf_opt));
-}
-}
+// Send initial heartbeat requests to all nodes loaded from the database
+let all_nodes = {
+let locked = self.inner.read().unwrap();
+locked.nodes.clone()
+};
+let nodes_online = self.initial_heartbeat_round(all_nodes.keys()).await;
// List of tenants for which we will attempt to notify compute of their location at startup
let mut compute_notifications = Vec::new();
@@ -568,17 +600,16 @@ impl Service {
}
*nodes = Arc::new(new_nodes);
-for (tenant_shard_id, shard_observations) in observed {
-for (node_id, observed_loc) in shard_observations {
-let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
-cleanup.push((tenant_shard_id, node_id));
-continue;
-};
-tenant_shard
-.observed
-.locations
-.insert(node_id, ObservedStateLocation { conf: observed_loc });
-}
+for (tenant_shard_id, observed_state) in observed.0 {
+let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
+for node_id in observed_state.locations.keys() {
+cleanup.push((tenant_shard_id, *node_id));
+}
+continue;
+};
+tenant_shard.observed = observed_state;
}
// Populate each tenant's intent state
@@ -612,6 +643,22 @@ impl Service {
tenants.len()
};
// Before making any observable changes to the cluster, persist self
// as leader in the database and in memory.
let proposed_leader = self.get_proposed_leader_info();
if let Err(err) = self
.persistence
.update_leader(current_leader, proposed_leader)
.await
{
tracing::error!("Failed to persist self as leader: {err}. Aborting start-up ...");
std::process::exit(1);
}
self.inner.write().unwrap().become_leader();
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
// generation_pageserver in the database.
@@ -777,6 +824,31 @@ impl Service {
node_results
}
async fn build_global_observed_state(&self, deadline: Instant) -> GlobalObservedState {
let node_listings = self.scan_node_locations(deadline).await;
let mut observed = GlobalObservedState::default();
for (node_id, location_confs) in node_listings {
tracing::info!(
"Received {} shard statuses from pageserver {}",
location_confs.tenant_shards.len(),
node_id
);
for (tid, location_conf) in location_confs.tenant_shards {
let entry = observed.0.entry(tid).or_default();
entry.locations.insert(
node_id,
ObservedStateLocation {
conf: location_conf,
},
);
}
}
observed
}
/// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
///
/// This is safe to run in the background, because if we don't have this TenantShardId in our map of
@@ -1255,12 +1327,20 @@ impl Service {
config.max_warming_up_interval,
cancel.clone(),
);
let initial_leadership_status = if config.start_as_candidate {
LeadershipStatus::Candidate
} else {
LeadershipStatus::Leader
};
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
tenants,
scheduler,
delayed_reconcile_rx,
initial_leadership_status,
))),
config: config.clone(),
persistence,
@@ -1329,7 +1409,16 @@ impl Service {
return;
};
-this.startup_reconcile(bg_compute_notify_result_tx).await;
+let leadership_status = this.inner.read().unwrap().get_leadership_status();
+let peer_observed_state = match leadership_status {
+LeadershipStatus::Candidate => this.request_step_down().await,
+LeadershipStatus::Leader => None,
+LeadershipStatus::SteppedDown => unreachable!(),
+};
+this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx)
+.await;
drop(startup_completion);
}
});
@@ -6179,4 +6268,88 @@ impl Service {
global_observed
}
/// Collect the details for the current process wishing to become the storage controller
/// leader.
///
/// If the hostname cannot be discovered or resolved, the process is killed and we rely on k8s to retry.
fn get_proposed_leader_info(&self) -> LeaderPersistence {
let hostname = match dns_lookup::get_hostname() {
Ok(name) => name,
Err(err) => {
tracing::error!("Failed to discover hostname: {err}. Aborting start-up ...");
std::process::exit(1);
}
};
let mut addrs = match dns_lookup::lookup_host(&hostname) {
Ok(addrs) => addrs,
Err(err) => {
tracing::error!("Failed to resolve hostname: {err}. Aborting start-up ...");
std::process::exit(1);
}
};
let addr = addrs
.pop()
.expect("k8s configured hostname always resolves");
let proposed = LeaderPersistence {
hostname: addr.to_string(),
port: self.get_config().http_service_port,
started_at: chrono::Utc::now(),
};
tracing::info!("Proposed leader details are: {proposed:?}");
proposed
}
/// Request step-down from the leader currently registered in the database.
///
/// If such an entry is persisted, the success path returns the observed
/// state handed over by that leader together with the leader's details.
/// Otherwise, None is returned, indicating there is no current leader.
///
/// If the database query fails, the process is killed and we rely on k8s to retry.
/// If the leader does not respond to the step-down request, the error is logged
/// and we proceed without its observed state.
async fn request_step_down(&self) -> Option<LeaderStepDownState> {
let leader = match self.persistence.get_leader().await {
Ok(leader) => leader,
Err(err) => {
tracing::error!(
"Failed to query database for current leader: {err}. Aborting start-up ..."
);
std::process::exit(1);
}
};
match leader {
Some(leader) => {
// TODO: jwt token
let client = PeerClient::new(
leader.hostname.to_owned(),
leader.port,
self.config.jwt_token.clone(),
);
let state = client.step_down(&self.cancel).await;
match state {
Ok(state) => Some(LeaderStepDownState {
observed: state,
leader: leader.clone(),
}),
Err(err) => {
tracing::error!(
"Leader ({}:{}) did not respond to step-down request: {}",
leader.hostname,
leader.port,
err
);
None
}
}
}
None => None,
}
}
}
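
To recap the cutover path this series adds, a free-standing sketch (illustrative only; the real wiring lives in the spawned start-up task shown earlier, and the notification channel here exists only to make the call complete):

```rust
// Illustrative recap of candidate start-up, stitched from the functions in
// this diff; not the actual spawn site.
async fn candidate_cutover(service: std::sync::Arc<Service>) {
    // With --start-as-candidate the initial in-memory status is Candidate.
    let status = service.inner.read().unwrap().get_leadership_status();
    // A candidate asks the leader recorded in the `leader` table to step down;
    // on success it receives that leader's GlobalObservedState.
    let handover = match status {
        LeadershipStatus::Candidate => service.request_step_down().await,
        LeadershipStatus::Leader => None,
        LeadershipStatus::SteppedDown => unreachable!(),
    };
    // startup_reconcile reuses the handed-over observed state (or rebuilds it
    // from pageservers), compare-exchanges this process into the `leader`
    // table, and only then calls become_leader() and proceeds to reconcile.
    let (tx, _rx) = tokio::sync::mpsc::channel(512);
    service.startup_reconcile(handover, tx).await;
}
```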