storcon: move safekeeper related stuff out of service.rs (#11288)

There is no functional change here. We move safekeeper-related code from
`service.rs` to `service/safekeeper_service.rs`, so that it is all contained
in a single file. This also helps prevent `service.rs` from growing even
further.

Part of #9011.
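For context, the move uses the standard Rust pattern of splitting one type's `impl` across several files: the submodule declares a second `impl Service` block and, being a child module of `service`, it can still access the private fields of `Service`. A minimal sketch of the pattern (file layout simplified; the method name is just a placeholder, not the real storcon API):

// service.rs (sketch)
mod safekeeper_service; // implemented in service/safekeeper_service.rs

pub struct Service {
    // private fields elided
}

impl Service {
    // pageserver- and tenant-related methods stay in this file
}

// service/safekeeper_service.rs (sketch)
use super::Service;

impl Service {
    // Safekeeper-related methods move into this second impl block.
    // As a child module of `service`, this file can still use the
    // private fields of `Service` directly.
    pub(super) fn example_safekeeper_method(&self) {
        // ...
    }
}

Callers elsewhere in the crate are unaffected: the methods remain inherent methods on `Service`, only their source file changes.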
Author: Arpad Müller
Date: 2025-03-18 10:00:53 +01:00
Committed by: GitHub
Parent: 57d51e949d
Commit: 2cf6ae76fc
2 changed files with 528 additions and 510 deletions

service.rs

@@ -1,6 +1,7 @@
pub mod chaos_injector;
mod context_iterator;
pub(crate) mod safekeeper_reconciler;
mod safekeeper_service;
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -27,16 +28,15 @@ use itertools::Itertools;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
SafekeeperDescribeResponse, ShardSchedulingPolicy, ShardsPreferredAzsRequest,
ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode,
PageserverUtilization, SafekeeperInfo, SafekeepersInfo, SecondaryProgress, ShardParameters,
TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
PageserverUtilization, SecondaryProgress, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
@@ -51,18 +51,15 @@ use pageserver_api::upcall_api::{
};
use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::{Certificate, StatusCode};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_reconciler::{SafekeeperReconcilers, ScheduleRequest};
use safekeeper_reconciler::SafekeeperReconcilers;
use tokio::sync::TryAcquireError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::sync::gate::Gate;
use utils::{failpoint_support, pausable_failpoint};
@@ -83,8 +80,8 @@ use crate::peer_client::GlobalObservedState;
use crate::persistence::split_state::SplitState;
use crate::persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseError, DatabaseResult,
MetadataHealthPersistence, Persistence, SafekeeperTimelineOpKind, ShardGenerationState,
TenantFilter, TenantShardPersistence, TimelinePendingOpPersistence, TimelinePersistence,
MetadataHealthPersistence, Persistence, ShardGenerationState, TenantFilter,
TenantShardPersistence,
};
use crate::reconciler::{
ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder, ReconcilerPriority,
@@ -3646,281 +3643,6 @@ impl Service {
.await?
}
/// Timeline creation on safekeepers
///
/// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
/// where `left` contains the list of safekeepers that didn't have a successful response.
/// Assumes tenant lock is held while calling this function.
async fn tenant_timeline_create_safekeepers_quorum(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: u32,
timeline_persistence: &TimelinePersistence,
) -> Result<Vec<NodeId>, ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
let jwt = self
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let mut joinset = JoinSet::new();
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
let mut members = Vec::new();
for sk_id in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk_id as u64);
let Some(safekeeper) = safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find entry for safekeeper with id {sk_id}"
)))?;
};
members.push(SafekeeperId {
id: sk_id,
host: safekeeper.skp.host.clone(),
pg_port: safekeeper.skp.port as u16,
});
}
let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let mconf = safekeeper_api::membership::Configuration::new(mset);
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version,
start_lsn: timeline_persistence.start_lsn.0,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();
let jwt = jwt.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
let req = req.clone();
joinset.spawn(async move {
// Unwrap is fine, as we would already have returned an error above
let sk_p = safekeepers.get(&sk_id).unwrap();
let res = sk_p
.with_client_retries(
|client| {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
&jwt,
&ssl_ca_cert,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
&CancellationToken::new(),
)
.await;
(sk_id, sk_p.skp.host.clone(), res)
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Wait until all tasks finish or timeout is hit, whichever occurs
// first.
let mut reconcile_results = Vec::new();
loop {
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
{
let Some(res) = res else { break };
match res {
Ok(res) => {
tracing::info!(
"response from safekeeper id:{} at {}: {:?}",
res.0,
res.1,
res.2
);
reconcile_results.push(res);
}
Err(join_err) => {
tracing::info!("join_err for task in joinset: {join_err}");
}
}
} else {
tracing::info!(
"timeout for creation call after {} responses",
reconcile_results.len()
);
break;
}
}
// Now check if quorum was reached in reconcile_results.
let total_result_count = reconcile_results.len();
let remaining = reconcile_results
.into_iter()
.filter_map(|res| res.2.is_err().then_some(res.0))
.collect::<Vec<_>>();
tracing::info!(
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
if remaining.len() >= 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {} errored",
remaining.len()
)));
}
Ok(remaining)
}
/// Create timeline in controller database and on safekeepers.
/// `timeline_info` is the result of timeline creation on the pageserver.
///
/// All actions must be idempotent as the call is retried until success. It
/// tries to create the timeline in the db and on at least a majority of
/// safekeepers, and queues creation in the db (retried indefinitely) for the
/// safekeepers which missed it; after that, the call returns Ok.
///
/// The idea is that once this point is reached, then as long as a majority of
/// safekeepers is alive, the timeline is expected to become operational
/// eventually, since storcon can seed it on the nodes which missed creation by
/// making them pull_timeline from peers. On the other hand, we don't want to
/// fail timeline creation just because one safekeeper is down.
async fn tenant_timeline_create_safekeepers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
// Initially, start_lsn is determined by the last_record_lsn in the
// pageserver response, since the pageserver does initdb. However, we
// persist it, and on retries the sk creation calls use the value from the
// existing timeline row instead: in theory the endpoint might already have
// written some data and advanced last_record_lsn, while we want the
// safekeepers to get a consistent start_lsn.
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
// Add timeline to db
let mut timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
start_lsn: start_lsn.into(),
generation: 0,
sk_set: sks_persistence.clone(),
new_sk_set: None,
cplane_notified_generation: 0,
deleted_at: None,
};
let inserted = self
.persistence
.insert_timeline(timeline_persist.clone())
.await?;
if !inserted {
if let Some(existent_persist) = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?
{
// Replace with what we have in the db, to get stuff like the generation right.
// We do still repeat the http calls to the safekeepers. After all, we could have
// crashed right after the write to the DB.
timeline_persist = existent_persist;
} else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"insertion said timeline already in db, but looking it up, it was gone"
)));
}
}
// Create the timeline on a quorum of safekeepers
let remaining = self
.tenant_timeline_create_safekeepers_quorum(
tenant_id,
timeline_id,
pg_version,
&timeline_persist,
)
.await?;
// For the remaining safekeepers, take care of their reconciliation asynchronously
for &remaining_id in remaining.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: timeline_persist.generation,
op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
sk_id: remaining_id.0 as i64,
};
tracing::info!("writing pending op for sk id {remaining_id}");
self.persistence.insert_pending_op(pending_op).await?;
}
if !remaining.is_empty() {
let mut locked = self.inner.write().unwrap();
for remaining_id in remaining {
let Some(sk) = locked.safekeepers.get(&remaining_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {remaining_id}"
)));
};
let Ok(host_list) = sks
.iter()
.map(|sk| {
Ok((
sk.id,
locked
.safekeepers
.get(&sk.id)
.ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {remaining_id} to pull from"
))
})?
.base_url(),
))
})
.collect::<Result<_, ApiError>>()
else {
continue;
};
let req = ScheduleRequest {
safekeeper: Box::new(sk.clone()),
host_list,
tenant_id,
timeline_id,
generation: timeline_persist.generation as u32,
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
}
Ok(SafekeepersInfo {
generation: timeline_persist.generation as u32,
safekeepers: sks,
tenant_id,
timeline_id,
})
}
pub(crate) async fn tenant_timeline_create(
self: &Arc<Self>,
tenant_id: TenantId,
@@ -4614,62 +4336,6 @@ impl Service {
status_code
}
/// Perform timeline deletion on safekeepers. Returns success once the deletion has been persisted and handed off to the reconciler.
async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
let tl = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
tracing::info!(
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
);
return Ok(());
};
let all_sks = tl
.new_sk_set
.iter()
.flat_map(|sks| {
sks.iter()
.map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
})
.chain(
tl.sk_set
.iter()
.map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
)
.collect::<HashMap<_, _>>();
// Schedule reconciliations
{
let mut locked = self.inner.write().unwrap();
for (sk_id, kind) in all_sks {
let sk_id = NodeId(sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {sk_id}"
)));
};
let req = ScheduleRequest {
safekeeper: Box::new(sk.clone()),
// the host list is not used for this op kind, so pass a dummy value
host_list: Vec::new(),
tenant_id,
timeline_id,
generation: tl.generation as u32,
kind,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
}
Ok(())
}
/// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0.
pub(crate) async fn tenant_shard0_node(
&self,
@@ -8716,172 +8382,6 @@ impl Service {
global_observed
}
/// Choose safekeepers for the new timeline: 3 in different AZs.
pub(crate) async fn safekeepers_for_new_timeline(
&self,
) -> Result<Vec<SafekeeperInfo>, ApiError> {
// Number of safekeepers in different AZs we are looking for
let wanted_count = 3;
let mut all_safekeepers = {
let locked = self.inner.read().unwrap();
locked
.safekeepers
.iter()
.filter_map(|sk| {
if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
// If we don't want to schedule stuff onto the safekeeper, respect that.
return None;
}
let utilization_opt = if let SafekeeperState::Available {
last_seen_at: _,
utilization,
} = sk.1.availability()
{
Some(utilization)
} else {
// non-available safekeepers still get a chance for new timelines,
// but put them last in the list.
None
};
let info = SafekeeperInfo {
hostname: sk.1.skp.host.clone(),
id: NodeId(sk.1.skp.id as u64),
};
Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
})
.collect::<Vec<_>>()
};
all_safekeepers.sort_by_key(|sk| {
(
sk.0.as_ref()
.map(|ut| ut.timeline_count)
.unwrap_or(u64::MAX),
// Use the id to break ties on equal scores, for reliability
sk.1.id.0,
)
});
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
if !azs.insert(az_id) {
continue;
}
sks.push(sk_info.clone());
if sks.len() == wanted_count {
break;
}
}
if sks.len() == wanted_count {
Ok(sks)
} else {
Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
sks.len(),
all_safekeepers.len(),
)))
}
}
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
let locked = self.inner.read().unwrap();
let mut list = locked
.safekeepers
.iter()
.map(|sk| sk.1.describe_response())
.collect::<Result<Vec<_>, _>>()?;
list.sort_by_key(|v| v.id);
Ok(list)
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let locked = self.inner.read().unwrap();
let sk = locked
.safekeepers
.get(&NodeId(id as u64))
.ok_or(diesel::result::Error::NotFound)?;
sk.describe_response()
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), ApiError> {
let node_id = NodeId(record.id as u64);
let use_https = self.config.use_https_safekeeper_api;
if use_https && record.https_port.is_none() {
return Err(ApiError::PreconditionFailed(
format!(
"cannot upsert safekeeper {node_id}: \
https is enabled, but https port is not specified"
)
.into(),
));
}
self.persistence.safekeeper_upsert(record.clone()).await?;
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => entry
.get_mut()
.update_from_record(record)
.expect("all preconditions should be checked before upsert to database"),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(
Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
use_https,
)
.expect("all preconditions should be checked before upsert to database"),
);
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
}
pub(crate) async fn set_safekeeper_scheduling_policy(
&self,
id: i64,
scheduling_policy: SkSchedulingPolicy,
) -> Result<(), DatabaseError> {
self.persistence
.set_safekeeper_scheduling_policy(id, scheduling_policy)
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.set_scheduling_policy(scheduling_policy);
match scheduling_policy {
SkSchedulingPolicy::Active => (),
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
}
pub(crate) async fn update_shards_preferred_azs(
&self,
req: ShardsPreferredAzsRequest,

service/safekeeper_service.rs

@@ -0,0 +1,518 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use super::safekeeper_reconciler::ScheduleRequest;
use crate::heartbeater::SafekeeperState;
use crate::persistence::{
DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
};
use crate::safekeeper::Safekeeper;
use http_utils::error::ApiError;
use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use super::Service;
impl Service {
/// Timeline creation on safekeepers
///
/// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
/// where `left` contains the list of safekeepers that didn't have a successful response.
/// Assumes tenant lock is held while calling this function.
pub(super) async fn tenant_timeline_create_safekeepers_quorum(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: u32,
timeline_persistence: &TimelinePersistence,
) -> Result<Vec<NodeId>, ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
let jwt = self
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let mut joinset = JoinSet::new();
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
let mut members = Vec::new();
for sk_id in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk_id as u64);
let Some(safekeeper) = safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find entry for safekeeper with id {sk_id}"
)))?;
};
members.push(SafekeeperId {
id: sk_id,
host: safekeeper.skp.host.clone(),
pg_port: safekeeper.skp.port as u16,
});
}
let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let mconf = safekeeper_api::membership::Configuration::new(mset);
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version,
start_lsn: timeline_persistence.start_lsn.0,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();
let jwt = jwt.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
let req = req.clone();
joinset.spawn(async move {
// Unwrap is fine, as we would already have returned an error above
let sk_p = safekeepers.get(&sk_id).unwrap();
let res = sk_p
.with_client_retries(
|client| {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
&jwt,
&ssl_ca_cert,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
&CancellationToken::new(),
)
.await;
(sk_id, sk_p.skp.host.clone(), res)
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Wait until all tasks finish or timeout is hit, whichever occurs
// first.
let mut reconcile_results = Vec::new();
loop {
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
{
let Some(res) = res else { break };
match res {
Ok(res) => {
tracing::info!(
"response from safekeeper id:{} at {}: {:?}",
res.0,
res.1,
res.2
);
reconcile_results.push(res);
}
Err(join_err) => {
tracing::info!("join_err for task in joinset: {join_err}");
}
}
} else {
tracing::info!(
"timeout for creation call after {} responses",
reconcile_results.len()
);
break;
}
}
// Now check if quorum was reached in reconcile_results.
let total_result_count = reconcile_results.len();
let remaining = reconcile_results
.into_iter()
.filter_map(|res| res.2.is_err().then_some(res.0))
.collect::<Vec<_>>();
tracing::info!(
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
if remaining.len() >= 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {} errored",
remaining.len()
)));
}
Ok(remaining)
}
/// Create timeline in controller database and on safekeepers.
/// `timeline_info` is the result of timeline creation on the pageserver.
///
/// All actions must be idempotent as the call is retried until success. It
/// tries to create the timeline in the db and on at least a majority of
/// safekeepers, and queues creation in the db (retried indefinitely) for the
/// safekeepers which missed it; after that, the call returns Ok.
///
/// The idea is that once this point is reached, then as long as a majority of
/// safekeepers is alive, the timeline is expected to become operational
/// eventually, since storcon can seed it on the nodes which missed creation by
/// making them pull_timeline from peers. On the other hand, we don't want to
/// fail timeline creation just because one safekeeper is down.
pub(super) async fn tenant_timeline_create_safekeepers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version * 10000;
// Initially, start_lsn is determined by the last_record_lsn in the
// pageserver response, since the pageserver does initdb. However, we
// persist it, and on retries the sk creation calls use the value from the
// existing timeline row instead: in theory the endpoint might already have
// written some data and advanced last_record_lsn, while we want the
// safekeepers to get a consistent start_lsn.
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
// Add timeline to db
let mut timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
start_lsn: start_lsn.into(),
generation: 0,
sk_set: sks_persistence.clone(),
new_sk_set: None,
cplane_notified_generation: 0,
deleted_at: None,
};
let inserted = self
.persistence
.insert_timeline(timeline_persist.clone())
.await?;
if !inserted {
if let Some(existent_persist) = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?
{
// Replace with what we have in the db, to get stuff like the generation right.
// We do still repeat the http calls to the safekeepers. After all, we could have
// crashed right after the write to the DB.
timeline_persist = existent_persist;
} else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"insertion said timeline already in db, but looking it up, it was gone"
)));
}
}
// Create the timeline on a quorum of safekeepers
let remaining = self
.tenant_timeline_create_safekeepers_quorum(
tenant_id,
timeline_id,
pg_version,
&timeline_persist,
)
.await?;
// For the remaining safekeepers, take care of their reconciliation asynchronously
for &remaining_id in remaining.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: timeline_persist.generation,
op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
sk_id: remaining_id.0 as i64,
};
tracing::info!("writing pending op for sk id {remaining_id}");
self.persistence.insert_pending_op(pending_op).await?;
}
if !remaining.is_empty() {
let mut locked = self.inner.write().unwrap();
for remaining_id in remaining {
let Some(sk) = locked.safekeepers.get(&remaining_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {remaining_id}"
)));
};
let Ok(host_list) = sks
.iter()
.map(|sk| {
Ok((
sk.id,
locked
.safekeepers
.get(&sk.id)
.ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {remaining_id} to pull from"
))
})?
.base_url(),
))
})
.collect::<Result<_, ApiError>>()
else {
continue;
};
let req = ScheduleRequest {
safekeeper: Box::new(sk.clone()),
host_list,
tenant_id,
timeline_id,
generation: timeline_persist.generation as u32,
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
}
Ok(SafekeepersInfo {
generation: timeline_persist.generation as u32,
safekeepers: sks,
tenant_id,
timeline_id,
})
}
/// Perform timeline deletion on safekeepers. Returns success once the deletion has been persisted and handed off to the reconciler.
pub(super) async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
let tl = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
tracing::info!(
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
);
return Ok(());
};
let all_sks = tl
.new_sk_set
.iter()
.flat_map(|sks| {
sks.iter()
.map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
})
.chain(
tl.sk_set
.iter()
.map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
)
.collect::<HashMap<_, _>>();
// Schedule reconciliations
{
let mut locked = self.inner.write().unwrap();
for (sk_id, kind) in all_sks {
let sk_id = NodeId(sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {sk_id}"
)));
};
let req = ScheduleRequest {
safekeeper: Box::new(sk.clone()),
// the host list is not used for this op kind, so pass a dummy value
host_list: Vec::new(),
tenant_id,
timeline_id,
generation: tl.generation as u32,
kind,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
}
Ok(())
}
/// Choose safekeepers for the new timeline: 3 in different AZs.
pub(crate) async fn safekeepers_for_new_timeline(
&self,
) -> Result<Vec<SafekeeperInfo>, ApiError> {
// Number of safekeepers in different AZs we are looking for
let wanted_count = 3;
let mut all_safekeepers = {
let locked = self.inner.read().unwrap();
locked
.safekeepers
.iter()
.filter_map(|sk| {
if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
// If we don't want to schedule stuff onto the safekeeper, respect that.
return None;
}
let utilization_opt = if let SafekeeperState::Available {
last_seen_at: _,
utilization,
} = sk.1.availability()
{
Some(utilization)
} else {
// non-available safekeepers still get a chance for new timelines,
// but put them last in the list.
None
};
let info = SafekeeperInfo {
hostname: sk.1.skp.host.clone(),
id: NodeId(sk.1.skp.id as u64),
};
Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
})
.collect::<Vec<_>>()
};
all_safekeepers.sort_by_key(|sk| {
(
sk.0.as_ref()
.map(|ut| ut.timeline_count)
.unwrap_or(u64::MAX),
// Use the id to break ties on equal scores, for reliability
sk.1.id.0,
)
});
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
if !azs.insert(az_id) {
continue;
}
sks.push(sk_info.clone());
if sks.len() == wanted_count {
break;
}
}
if sks.len() == wanted_count {
Ok(sks)
} else {
Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
sks.len(),
all_safekeepers.len(),
)))
}
}
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
let locked = self.inner.read().unwrap();
let mut list = locked
.safekeepers
.iter()
.map(|sk| sk.1.describe_response())
.collect::<Result<Vec<_>, _>>()?;
list.sort_by_key(|v| v.id);
Ok(list)
}
pub(crate) async fn get_safekeeper(
&self,
id: i64,
) -> Result<SafekeeperDescribeResponse, DatabaseError> {
let locked = self.inner.read().unwrap();
let sk = locked
.safekeepers
.get(&NodeId(id as u64))
.ok_or(diesel::result::Error::NotFound)?;
sk.describe_response()
}
pub(crate) async fn upsert_safekeeper(
&self,
record: crate::persistence::SafekeeperUpsert,
) -> Result<(), ApiError> {
let node_id = NodeId(record.id as u64);
let use_https = self.config.use_https_safekeeper_api;
if use_https && record.https_port.is_none() {
return Err(ApiError::PreconditionFailed(
format!(
"cannot upsert safekeeper {node_id}: \
https is enabled, but https port is not specified"
)
.into(),
));
}
self.persistence.safekeeper_upsert(record.clone()).await?;
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
std::collections::hash_map::Entry::Occupied(mut entry) => entry
.get_mut()
.update_from_record(record)
.expect("all preconditions should be checked before upsert to database"),
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(
Safekeeper::from_persistence(
crate::persistence::SafekeeperPersistence::from_upsert(
record,
SkSchedulingPolicy::Pause,
),
CancellationToken::new(),
use_https,
)
.expect("all preconditions should be checked before upsert to database"),
);
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
}
pub(crate) async fn set_safekeeper_scheduling_policy(
&self,
id: i64,
scheduling_policy: SkSchedulingPolicy,
) -> Result<(), DatabaseError> {
self.persistence
.set_safekeeper_scheduling_policy(id, scheduling_policy)
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers
.get_mut(&node_id)
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.set_scheduling_policy(scheduling_policy);
match scheduling_policy {
SkSchedulingPolicy::Active => (),
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())
}
}