storcon: timetime table, creation and deletion (#11058)

This PR extends the storcon with basic safekeeper management of
timelines, mainly timeline creation and deletion. We want to make the
storcon manage safekeepers in the future. Timeline creation is
controlled by the `--timelines-onto-safekeepers` flag.

1. it adds the `timelines` and `safekeeper_timeline_pending_ops` tables
to the storcon db
2. extend code for the timeline creation and deletion
4. it adds per-safekeeper reconciler tasks 

TODO:

* maybe not immediately schedule reconciliations for deletions but have
a prior manual step
* tenant deletions
* add exclude API definitions (probably separate PR)
* how to choose safekeeper to do exclude on vs deletion? this can be a
bit hairy because the safekeeper might go offline in the meantime.
* error/failure case handling
* tests (cc test_explicit_timeline_creation from #11002)
* single safekeeper mode: we often only have one SK (in tests for
example)
* `notify-safekeepers` hook:
https://github.com/neondatabase/neon/issues/11163

TODOs implemented:

* cancellations of enqueued reconciliations on a per-timeline basis,
helpful if there is an ongoing deletion
* implement pending ops overwrite behavior
* load pending operations from db

RFC section for important reading:
[link](https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md#storage_controller-implementation)

Implements the bulk of #9011
Successor of #10440.

---------

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
This commit is contained in:
Arpad Müller
2025-03-11 03:31:22 +01:00
committed by GitHub
parent 3451bdd3d2
commit 4d3c477689
16 changed files with 1248 additions and 26 deletions

1
Cargo.lock generated
View File

@@ -6621,6 +6621,7 @@ dependencies = [
"bytes",
"chrono",
"clap",
"clashmap",
"control_plane",
"cron",
"diesel",

View File

@@ -177,6 +177,8 @@ pub struct NeonStorageControllerConf {
#[serde(default)]
pub use_https_pageserver_api: bool,
pub timelines_onto_safekeepers: bool,
}
impl NeonStorageControllerConf {
@@ -201,6 +203,7 @@ impl Default for NeonStorageControllerConf {
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
use_https_pageserver_api: false,
timelines_onto_safekeepers: false,
}
}
}

View File

@@ -584,6 +584,10 @@ impl StorageController {
self.env.base_data_dir.display()
));
if self.config.timelines_onto_safekeepers {
args.push("--timelines-onto-safekeepers".to_string());
}
background_process::start_process(
COMMAND,
&instance_dir,

View File

@@ -274,6 +274,31 @@ pub struct TimelineCreateRequest {
pub mode: TimelineCreateRequestMode,
}
/// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {
#[serde(flatten)]
pub timeline_info: TimelineInfo,
pub safekeepers: Option<SafekeepersInfo>,
}
/// Safekeepers as returned in timeline creation request to storcon or pushed to
/// cplane in the post migration hook.
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeepersInfo {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub generation: u32,
pub safekeepers: Vec<SafekeeperInfo>,
}
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperInfo {
pub id: NodeId,
pub hostname: String,
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum TimelineCreateRequestMode {

View File

@@ -131,6 +131,14 @@ impl Configuration {
}
}
pub fn new(members: MemberSet) -> Self {
Configuration {
generation: INITIAL_GENERATION,
members,
new_members: None,
}
}
/// Is `sk_id` member of the configuration?
pub fn contains(&self, sk_id: NodeId) -> bool {
self.members.contains(sk_id) || self.new_members.as_ref().is_some_and(|m| m.contains(sk_id))

View File

@@ -18,7 +18,7 @@ pub struct SafekeeperStatus {
pub id: NodeId,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
@@ -283,7 +283,7 @@ pub struct SafekeeperUtilization {
}
/// pull_timeline request body.
#[derive(Debug, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct PullTimelineRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,

View File

@@ -19,6 +19,7 @@ bytes.workspace = true
chrono.workspace = true
clap.workspace = true
cron.workspace = true
clashmap.workspace = true
fail.workspace = true
futures.workspace = true
governor.workspace = true

View File

@@ -0,0 +1,2 @@
DROP TABLE timelines;
DROP TABLE safekeeper_timeline_pending_ops;

View File

@@ -0,0 +1,19 @@
CREATE TABLE timelines (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
start_lsn pg_lsn NOT NULL,
generation INTEGER NOT NULL,
sk_set BIGINT[] NOT NULL,
new_sk_set BIGINT[],
cplane_notified_generation INTEGER NOT NULL,
deleted_at timestamptz,
PRIMARY KEY(tenant_id, timeline_id)
);
CREATE TABLE safekeeper_timeline_pending_ops (
sk_id BIGINT NOT NULL,
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
generation INTEGER NOT NULL,
op_kind VARCHAR NOT NULL,
PRIMARY KEY(tenant_id, timeline_id, sk_id)
);

View File

@@ -144,6 +144,11 @@ struct Cli {
/// Flag to use https for requests to pageserver API.
#[arg(long, default_value = "false")]
use_https_pageserver_api: bool,
// Whether to put timelines onto safekeepers
#[arg(long, default_value = "false")]
timelines_onto_safekeepers: bool,
/// Flag to use https for requests to safekeeper API.
#[arg(long, default_value = "false")]
use_https_safekeeper_api: bool,
@@ -370,6 +375,7 @@ async fn async_main() -> anyhow::Result<()> {
use_https_pageserver_api: args.use_https_pageserver_api,
use_https_safekeeper_api: args.use_https_safekeeper_api,
ssl_ca_cert,
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
};
// Validate that we can connect to the database

View File

@@ -1,12 +1,15 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::io::Write;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use diesel::deserialize::{FromSql, FromSqlRow};
use diesel::expression::AsExpression;
use diesel::pg::Pg;
use diesel::prelude::*;
use diesel::serialize::{IsNull, ToSql};
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig};
@@ -29,7 +32,8 @@ use rustls::crypto::ring;
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use self::split_state::SplitState;
use crate::metrics::{
@@ -117,6 +121,11 @@ pub(crate) enum DatabaseOperation {
GetLeader,
UpdateLeader,
SetPreferredAzs,
InsertTimeline,
GetTimeline,
InsertTimelineReconcile,
RemoveTimelineReconcile,
ListTimelineReconcile,
}
#[must_use]
@@ -1276,6 +1285,166 @@ impl Persistence {
})
.await
}
/// Persist timeline. Returns if the timeline was newly inserted. If it wasn't, we haven't done any writes.
pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<bool> {
use crate::schema::timelines;
let entry = &entry;
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::insert_into(timelines::table)
.values(entry)
.on_conflict((timelines::tenant_id, timelines::timeline_id))
.do_nothing()
.execute(conn)
.await?;
match inserted_updated {
0 => Ok(false),
1 => Ok(true),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
))),
}
})
})
.await
}
/// Load timeline from db. Returns `None` if not present.
pub(crate) async fn get_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Option<TimelinePersistence>> {
use crate::schema::timelines::dsl;
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::GetTimeline, move |conn| {
Box::pin(async move {
let mut from_db: Vec<TimelineFromDb> = dsl::timelines
.filter(
dsl::tenant_id
.eq(&tenant_id.to_string())
.and(dsl::timeline_id.eq(&timeline_id.to_string())),
)
.load(conn)
.await?;
if from_db.is_empty() {
return Ok(None);
}
if from_db.len() != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
from_db.len()
)));
}
Ok(Some(from_db.pop().unwrap().into_persistence()))
})
})
.await?;
Ok(timeline_from_db)
}
/// Persist pending op. Returns if it was newly inserted. If it wasn't, we haven't done any writes.
pub(crate) async fn insert_pending_op(
&self,
entry: TimelinePendingOpPersistence,
) -> DatabaseResult<bool> {
use crate::schema::safekeeper_timeline_pending_ops as skpo;
// This overrides the `filter` fn used in other functions, so contain the mayhem via a function-local use
use diesel::query_dsl::methods::FilterDsl;
let entry = &entry;
self.with_measured_conn(DatabaseOperation::InsertTimelineReconcile, move |conn| {
Box::pin(async move {
// For simplicity it makes sense to keep only the last operation
// per (tenant, timeline, sk) tuple: if we migrated a timeline
// from node and adding it back it is not necessary to remove
// data on it. Hence, generation is not part of primary key and
// we override any rows with lower generations here.
let inserted_updated = diesel::insert_into(skpo::table)
.values(entry)
.on_conflict((skpo::tenant_id, skpo::timeline_id, skpo::sk_id))
.do_update()
.set(entry)
.filter(skpo::generation.lt(entry.generation))
.execute(conn)
.await?;
match inserted_updated {
0 => Ok(false),
1 => Ok(true),
_ => Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
))),
}
})
})
.await
}
/// Remove persisted pending op.
pub(crate) async fn remove_pending_op(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
sk_id: NodeId,
generation: u32,
) -> DatabaseResult<()> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
let tenant_id = &tenant_id;
let timeline_id = &timeline_id;
self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| {
Box::pin(async move {
diesel::delete(dsl::safekeeper_timeline_pending_ops)
.filter(dsl::tenant_id.eq(tenant_id.to_string()))
.filter(dsl::timeline_id.eq(timeline_id.to_string()))
.filter(dsl::sk_id.eq(sk_id.0 as i64))
.filter(dsl::generation.eq(generation as i32))
.execute(conn)
.await?;
Ok(())
})
})
.await
}
/// Load pending operations from db.
pub(crate) async fn list_pending_ops(
&self,
filter_for_sk: Option<NodeId>,
) -> DatabaseResult<Vec<TimelinePendingOpPersistence>> {
use crate::schema::safekeeper_timeline_pending_ops::dsl;
const FILTER_VAL_1: i64 = 1;
const FILTER_VAL_2: i64 = 2;
let filter_opt = filter_for_sk.map(|id| id.0 as i64);
let timeline_from_db = self
.with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| {
Box::pin(async move {
let from_db: Vec<TimelinePendingOpPersistence> =
dsl::safekeeper_timeline_pending_ops
.filter(
dsl::sk_id
.eq(filter_opt.unwrap_or(FILTER_VAL_1))
.and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))),
)
.load(conn)
.await?;
Ok(from_db)
})
})
.await?;
Ok(timeline_from_db)
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -1671,3 +1840,139 @@ struct InsertUpdateSafekeeper<'a> {
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,
}
#[derive(Serialize, Deserialize, FromSqlRow, AsExpression, Eq, PartialEq, Debug, Copy, Clone)]
#[diesel(sql_type = crate::schema::sql_types::PgLsn)]
pub(crate) struct LsnWrapper(pub(crate) Lsn);
impl From<Lsn> for LsnWrapper {
fn from(value: Lsn) -> Self {
LsnWrapper(value)
}
}
impl FromSql<crate::schema::sql_types::PgLsn, Pg> for LsnWrapper {
fn from_sql(
bytes: <Pg as diesel::backend::Backend>::RawValue<'_>,
) -> diesel::deserialize::Result<Self> {
let byte_arr: diesel::deserialize::Result<[u8; 8]> = bytes
.as_bytes()
.try_into()
.map_err(|_| "Can't obtain lsn from sql".into());
Ok(LsnWrapper(Lsn(u64::from_be_bytes(byte_arr?))))
}
}
impl ToSql<crate::schema::sql_types::PgLsn, Pg> for LsnWrapper {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, Pg>,
) -> diesel::serialize::Result {
out.write_all(&u64::to_be_bytes(self.0.0))
.map(|_| IsNull::No)
.map_err(Into::into)
}
}
#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) start_lsn: LsnWrapper,
pub(crate) generation: i32,
pub(crate) sk_set: Vec<i64>,
pub(crate) new_sk_set: Option<Vec<i64>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
}
/// This is separate from [TimelinePersistence] only because postgres allows NULLs
/// in arrays and there is no way to forbid that at schema level. Hence diesel
/// wants `sk_set` to be `Vec<Option<i64>>` instead of `Vec<i64>` for
/// Queryable/Selectable. It does however allow insertions without redundant
/// Option(s), so [TimelinePersistence] doesn't have them.
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelineFromDb {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) start_lsn: LsnWrapper,
pub(crate) generation: i32,
pub(crate) sk_set: Vec<Option<i64>>,
pub(crate) new_sk_set: Option<Vec<Option<i64>>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl TimelineFromDb {
fn into_persistence(self) -> TimelinePersistence {
// We should never encounter null entries in the sets, but we need to filter them out.
// There is no way to forbid this in the schema that diesel recognizes (to our knowledge).
let sk_set = self.sk_set.into_iter().flatten().collect::<Vec<_>>();
let new_sk_set = self
.new_sk_set
.map(|s| s.into_iter().flatten().collect::<Vec<_>>());
TimelinePersistence {
tenant_id: self.tenant_id,
timeline_id: self.timeline_id,
start_lsn: self.start_lsn,
generation: self.generation,
sk_set,
new_sk_set,
cplane_notified_generation: self.cplane_notified_generation,
deleted_at: self.deleted_at,
}
}
}
#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[diesel(table_name = crate::schema::safekeeper_timeline_pending_ops)]
pub(crate) struct TimelinePendingOpPersistence {
pub(crate) sk_id: i64,
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) generation: i32,
pub(crate) op_kind: SafekeeperTimelineOpKind,
}
#[derive(Serialize, Deserialize, FromSqlRow, AsExpression, Eq, PartialEq, Debug, Copy, Clone)]
#[diesel(sql_type = diesel::sql_types::VarChar)]
pub(crate) enum SafekeeperTimelineOpKind {
Pull,
Exclude,
Delete,
}
impl FromSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
fn from_sql(
bytes: <Pg as diesel::backend::Backend>::RawValue<'_>,
) -> diesel::deserialize::Result<Self> {
let bytes = bytes.as_bytes();
match core::str::from_utf8(bytes) {
Ok(s) => match s {
"pull" => Ok(SafekeeperTimelineOpKind::Pull),
"exclude" => Ok(SafekeeperTimelineOpKind::Exclude),
"delete" => Ok(SafekeeperTimelineOpKind::Delete),
_ => Err(format!("can't parse: {s}").into()),
},
Err(e) => Err(format!("invalid UTF-8 for op_kind: {e}").into()),
}
}
}
impl ToSql<diesel::sql_types::VarChar, Pg> for SafekeeperTimelineOpKind {
fn to_sql<'b>(
&'b self,
out: &mut diesel::serialize::Output<'b, '_, Pg>,
) -> diesel::serialize::Result {
let kind_str = match self {
SafekeeperTimelineOpKind::Pull => "pull",
SafekeeperTimelineOpKind::Exclude => "exclude",
SafekeeperTimelineOpKind::Delete => "delete",
};
out.write_all(kind_str.as_bytes())
.map(|_| IsNull::No)
.map_err(Into::into)
}
}

View File

@@ -21,6 +21,7 @@ pub struct Safekeeper {
listen_https_port: Option<u16>,
scheduling_policy: SkSchedulingPolicy,
id: NodeId,
/// Heartbeating result.
availability: SafekeeperState,
// Flag from storcon's config to use https for safekeeper API.
@@ -85,6 +86,9 @@ impl Safekeeper {
self.scheduling_policy = scheduling_policy;
self.skp.scheduling_policy = scheduling_policy.into();
}
pub(crate) fn availability(&self) -> SafekeeperState {
self.availability.clone()
}
/// Perform an operation (which is given a [`SafekeeperClient`]) with retries
#[allow(clippy::too_many_arguments)]
pub(crate) async fn with_client_retries<T, O, F>(

View File

@@ -57,7 +57,6 @@ impl SafekeeperClient {
}
}
#[allow(dead_code)]
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
@@ -70,7 +69,6 @@ impl SafekeeperClient {
)
}
#[allow(dead_code)]
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
@@ -84,7 +82,6 @@ impl SafekeeperClient {
)
}
#[allow(dead_code)]
pub(crate) async fn pull_timeline(
&self,
req: &PullTimelineRequest,

View File

@@ -1,5 +1,11 @@
// @generated automatically by Diesel CLI.
pub mod sql_types {
#[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "pg_lsn", schema = "pg_catalog"))]
pub struct PgLsn;
}
diesel::table! {
controllers (address, started_at) {
address -> Varchar,
@@ -30,6 +36,16 @@ diesel::table! {
}
}
diesel::table! {
safekeeper_timeline_pending_ops (tenant_id, timeline_id, sk_id) {
sk_id -> Int8,
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
op_kind -> Varchar,
}
}
diesel::table! {
safekeepers (id) {
id -> Int8,
@@ -60,10 +76,28 @@ diesel::table! {
}
}
diesel::table! {
use diesel::sql_types::*;
use super::sql_types::PgLsn;
timelines (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
start_lsn -> PgLsn,
generation -> Int4,
sk_set -> Array<Nullable<Int8>>,
new_sk_set -> Nullable<Array<Nullable<Int8>>>,
cplane_notified_generation -> Int4,
deleted_at -> Nullable<Timestamptz>,
}
}
diesel::allow_tables_to_appear_in_same_query!(
controllers,
metadata_health,
nodes,
safekeeper_timeline_pending_ops,
safekeepers,
tenant_shards,
timelines,
);

View File

@@ -1,5 +1,6 @@
pub mod chaos_injector;
mod context_iterator;
pub(crate) mod safekeeper_reconciler;
use std::borrow::Cow;
use std::cmp::Ordering;
@@ -34,11 +35,12 @@ use pageserver_api::controller_api::{
};
use pageserver_api::models::{
self, LocationConfig, LocationConfigListResponse, LocationConfigMode, PageserverUtilization,
SecondaryProgress, ShardParameters, TenantConfig, TenantConfigPatchRequest,
TenantConfigRequest, TenantLocationConfigRequest, TenantLocationConfigResponse,
TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse, TenantSorting,
TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo,
TopTenantShardItem, TopTenantShardsRequest,
SafekeeperInfo, SafekeepersInfo, SecondaryProgress, ShardParameters, TenantConfig,
TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest,
TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest,
TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon,
TimelineInfo, TopTenantShardItem, TopTenantShardsRequest,
};
use pageserver_api::shard::{
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
@@ -49,14 +51,18 @@ use pageserver_api::upcall_api::{
};
use pageserver_client::{BlockUnblock, mgmt_api};
use reqwest::{Certificate, StatusCode};
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use safekeeper_api::models::SafekeeperUtilization;
use safekeeper_reconciler::{SafekeeperReconcilers, ScheduleRequest};
use tokio::sync::TryAcquireError;
use tokio::sync::mpsc::error::TrySendError;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use utils::completion::Barrier;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::sync::gate::Gate;
use utils::{failpoint_support, pausable_failpoint};
@@ -77,8 +83,8 @@ use crate::peer_client::GlobalObservedState;
use crate::persistence::split_state::SplitState;
use crate::persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseError, DatabaseResult,
MetadataHealthPersistence, Persistence, ShardGenerationState, TenantFilter,
TenantShardPersistence,
MetadataHealthPersistence, Persistence, SafekeeperTimelineOpKind, ShardGenerationState,
TenantFilter, TenantShardPersistence, TimelinePendingOpPersistence, TimelinePersistence,
};
use crate::reconciler::{
ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder, ReconcilerPriority,
@@ -202,6 +208,8 @@ struct ServiceState {
safekeepers: Arc<HashMap<NodeId, Safekeeper>>,
safekeeper_reconcilers: SafekeeperReconcilers,
scheduler: Scheduler,
/// Ongoing background operation on the cluster if any is running.
@@ -274,6 +282,7 @@ impl ServiceState {
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
initial_leadership_status: LeadershipStatus,
reconcilers_cancel: CancellationToken,
) -> Self {
metrics::update_leadership_status(initial_leadership_status);
@@ -282,6 +291,7 @@ impl ServiceState {
tenants,
nodes: Arc::new(nodes),
safekeepers: Arc::new(safekeepers),
safekeeper_reconcilers: SafekeeperReconcilers::new(reconcilers_cancel),
scheduler,
ongoing_operation: None,
delayed_reconcile_rx,
@@ -401,9 +411,12 @@ pub struct Config {
pub long_reconcile_threshold: Duration,
pub use_https_pageserver_api: bool,
pub use_https_safekeeper_api: bool,
pub ssl_ca_cert: Option<Certificate>,
pub timelines_onto_safekeepers: bool,
}
impl From<DatabaseError> for ApiError {
@@ -742,7 +755,27 @@ impl Service {
std::process::exit(1);
}
self.inner.write().unwrap().become_leader();
let safekeepers = self.inner.read().unwrap().safekeepers.clone();
let sk_schedule_requests =
match safekeeper_reconciler::load_schedule_requests(self, &safekeepers).await {
Ok(v) => v,
Err(e) => {
tracing::warn!(
"Failed to load safekeeper pending ops at startup: {e}." // Don't abort for now: " Aborting start-up..."
);
// std::process::exit(1);
Vec::new()
}
};
{
let mut locked = self.inner.write().unwrap();
locked.become_leader();
locked
.safekeeper_reconcilers
.schedule_request_vec(self, sk_schedule_requests);
}
// TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
// generation_pageserver in the database.
@@ -1059,6 +1092,7 @@ impl Service {
}
}
}
/// Heartbeat all storage nodes once in a while.
#[instrument(skip_all)]
async fn spawn_heartbeat_driver(&self) {
self.startup_complete.clone().wait().await;
@@ -1607,6 +1641,7 @@ impl Service {
scheduler,
delayed_reconcile_rx,
initial_leadership_status,
reconcilers_cancel.clone(),
))),
config: config.clone(),
persistence,
@@ -3448,7 +3483,7 @@ impl Service {
Ok(())
}
pub(crate) async fn tenant_timeline_create(
pub(crate) async fn tenant_timeline_create_pageservers(
&self,
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
@@ -3459,14 +3494,6 @@ impl Service {
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
@@ -3593,6 +3620,323 @@ impl Service {
.await?
}
/// Timeline creation on safekeepers
///
/// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
/// where `left` contains the list of safekeepers that didn't have a successful response.
/// Assumes tenant lock is held while calling this function.
async fn tenant_timeline_create_safekeepers_quorum(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: u32,
timeline_persistence: &TimelinePersistence,
) -> Result<Vec<NodeId>, ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
let jwt = self
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let mut joinset = JoinSet::new();
let safekeepers = {
let locked = self.inner.read().unwrap();
locked.safekeepers.clone()
};
let mut members = Vec::new();
for sk_id in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk_id as u64);
let Some(safekeeper) = safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find entry for safekeeper with id {sk_id}"
)))?;
};
members.push(SafekeeperId {
id: sk_id,
host: safekeeper.skp.host.clone(),
pg_port: safekeeper.skp.port as u16,
});
}
let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let mconf = safekeeper_api::membership::Configuration::new(mset);
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version,
start_lsn: timeline_persistence.start_lsn.0,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
for sk in timeline_persistence.sk_set.iter() {
let sk_id = NodeId(*sk as u64);
let safekeepers = safekeepers.clone();
let jwt = jwt.clone();
let ssl_ca_cert = self.config.ssl_ca_cert.clone();
let req = req.clone();
joinset.spawn(async move {
// Unwrap is fine as we already would have returned error above
let sk_p = safekeepers.get(&sk_id).unwrap();
let res = sk_p
.with_client_retries(
|client| {
let req = req.clone();
async move { client.create_timeline(&req).await }
},
&jwt,
&ssl_ca_cert,
3,
3,
SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
&CancellationToken::new(),
)
.await;
(sk_id, sk_p.skp.host.clone(), res)
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Wait until all tasks finish or timeout is hit, whichever occurs
// first.
let mut reconcile_results = Vec::new();
loop {
if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
{
let Some(res) = res else { break };
match res {
Ok(res) => {
tracing::info!(
"response from safekeeper id:{} at {}: {:?}",
res.0,
res.1,
res.2
);
reconcile_results.push(res);
}
Err(join_err) => {
tracing::info!("join_err for task in joinset: {join_err}");
}
}
} else {
tracing::info!(
"timeout for creation call after {} responses",
reconcile_results.len()
);
break;
}
}
// Now check now if quorum was reached in reconcile_results.
let total_result_count = reconcile_results.len();
let remaining = reconcile_results
.into_iter()
.filter_map(|res| res.2.is_err().then_some(res.0))
.collect::<Vec<_>>();
tracing::info!(
"Got {} non-successful responses from initial creation request of total {total_result_count} responses",
remaining.len()
);
if remaining.len() >= 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {} errored",
remaining.len()
)));
}
Ok(remaining)
}
/// Create timeline in controller database and on safekeepers.
/// `timeline_info` is result of timeline creation on pageserver.
///
/// All actions must be idempotent as the call is retried until success. It
/// tries to create timeline in the db and on at least majority of
/// safekeepers + queue creation for safekeepers which missed it in the db
/// for infinite retries; after that, call returns Ok.
///
/// The idea is that once this is reached as long as we have alive majority
/// of safekeepers it is expected to get eventually operational as storcon
/// will be able to seed timeline on nodes which missed creation by making
/// pull_timeline from peers. On the other hand we don't want to fail
/// timeline creation if one safekeeper is down.
async fn tenant_timeline_create_safekeepers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<SafekeepersInfo, ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version;
// Initially start_lsn is determined by last_record_lsn in pageserver
// response as it does initdb. However, later we persist it and in sk
// creation calls replace with the value from the timeline row if it
// previously existed as on retries in theory endpoint might have
// already written some data and advanced last_record_lsn, while we want
// safekeepers to have consistent start_lsn.
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
// Add timeline to db
let mut timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
start_lsn: start_lsn.into(),
generation: 0,
sk_set: sks_persistence.clone(),
new_sk_set: None,
cplane_notified_generation: 0,
deleted_at: None,
};
let inserted = self
.persistence
.insert_timeline(timeline_persist.clone())
.await?;
if !inserted {
if let Some(existent_persist) = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?
{
// Replace with what we have in the db, to get stuff like the generation right.
// We do still repeat the http calls to the safekeepers. After all, we could have
// crashed right after the wrote to the DB.
timeline_persist = existent_persist;
} else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"insertion said timeline already in db, but looking it up, it was gone"
)));
}
}
// Create the timeline on a quorum of safekeepers
let remaining = self
.tenant_timeline_create_safekeepers_quorum(
tenant_id,
timeline_id,
pg_version,
&timeline_persist,
)
.await?;
// For the remaining safekeepers, take care of their reconciliation asynchronously
for &remaining_id in remaining.iter() {
let pending_op = TimelinePendingOpPersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: timeline_persist.generation,
op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
sk_id: remaining_id.0 as i64,
};
tracing::info!("writing pending op for sk id {remaining_id}");
self.persistence.insert_pending_op(pending_op).await?;
}
if !remaining.is_empty() {
let mut locked = self.inner.write().unwrap();
for remaining_id in remaining {
let Some(sk) = locked.safekeepers.get(&remaining_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {remaining_id}"
)));
};
let Ok(host_list) = sks
.iter()
.map(|sk| {
Ok((
sk.id,
locked
.safekeepers
.get(&sk.id)
.ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {remaining_id} to pull from"
))
})?
.base_url(),
))
})
.collect::<Result<_, ApiError>>()
else {
continue;
};
let req = ScheduleRequest {
safekeeper: Box::new(sk.clone()),
host_list,
tenant_id,
timeline_id,
generation: timeline_persist.generation as u32,
kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
}
Ok(SafekeepersInfo {
generation: timeline_persist.generation as u32,
safekeepers: sks,
tenant_id,
timeline_id,
})
}
pub(crate) async fn tenant_timeline_create(
self: &Arc<Self>,
tenant_id: TenantId,
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = self.config.timelines_onto_safekeepers;
tracing::info!(
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone();
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
let safekeepers = if safekeepers {
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id))
.await?;
Some(res)
} else {
None
};
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers,
})
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,
@@ -4138,7 +4482,7 @@ impl Service {
}
pub(crate) async fn tenant_timeline_delete(
&self,
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<StatusCode, ApiError> {
@@ -4150,7 +4494,7 @@ impl Service {
)
.await;
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
let status_code = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
@@ -4226,7 +4570,67 @@ impl Service {
)
.await?;
Ok(shard_zero_status)
}).await?
}).await?;
self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id)
.await?;
status_code
}
/// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
async fn tenant_timeline_delete_safekeepers(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
let tl = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
tracing::info!(
"timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
);
return Ok(());
};
let all_sks = tl
.new_sk_set
.iter()
.flat_map(|sks| {
sks.iter()
.map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
})
.chain(
tl.sk_set
.iter()
.map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
)
.collect::<HashMap<_, _>>();
// Schedule reconciliations
{
let mut locked = self.inner.write().unwrap();
for (sk_id, kind) in all_sks {
let sk_id = NodeId(sk_id as u64);
let Some(sk) = locked.safekeepers.get(&sk_id) else {
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Couldn't find safekeeper with id {sk_id}"
)));
};
let req = ScheduleRequest {
safekeeper: Box::new(sk.clone()),
// we don't use this for this kind, put a dummy value
host_list: Vec::new(),
tenant_id,
timeline_id,
generation: tl.generation as u32,
kind,
};
locked.safekeeper_reconcilers.schedule_request(self, req);
}
}
Ok(())
}
/// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0.
@@ -8262,6 +8666,68 @@ impl Service {
global_observed
}
/// Choose safekeepers for the new timeline: 3 in different azs.
pub(crate) async fn safekeepers_for_new_timeline(
&self,
) -> Result<Vec<SafekeeperInfo>, ApiError> {
let mut all_safekeepers = {
let locked = self.inner.read().unwrap();
locked
.safekeepers
.iter()
.filter_map(|sk| {
if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
// If we don't want to schedule stuff onto the safekeeper, respect that.
return None;
}
let utilization_opt = if let SafekeeperState::Available {
last_seen_at: _,
utilization,
} = sk.1.availability()
{
Some(utilization)
} else {
// non-available safekeepers still get a chance for new timelines,
// but put them last in the list.
None
};
let info = SafekeeperInfo {
hostname: sk.1.skp.host.clone(),
id: NodeId(sk.1.skp.id as u64),
};
Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
})
.collect::<Vec<_>>()
};
all_safekeepers.sort_by_key(|sk| {
(
sk.0.as_ref()
.map(|ut| ut.timeline_count)
.unwrap_or(u64::MAX),
// Use the id to decide on equal scores for reliability
sk.1.id.0,
)
});
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
if !azs.insert(az_id) {
continue;
}
sks.push(sk_info.clone());
if sks.len() == 3 {
break;
}
}
if sks.len() == 3 {
Ok(sks)
} else {
Err(ApiError::InternalServerError(anyhow::anyhow!(
"couldn't find three safekeepers in different AZs for new timeline"
)))
}
}
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
@@ -8350,6 +8816,13 @@ impl Service {
.ok_or(DatabaseError::Logical("Not found".to_string()))?;
sk.set_scheduling_policy(scheduling_policy);
match scheduling_policy {
SkSchedulingPolicy::Active => (),
SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
}
}
locked.safekeepers = Arc::new(safekeepers);
}
Ok(())

View File

@@ -0,0 +1,340 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
use safekeeper_client::mgmt_api;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
use crate::{
persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
safekeeper_client::SafekeeperClient,
};
use super::Service;
pub(crate) struct SafekeeperReconcilers {
cancel: CancellationToken,
reconcilers: HashMap<NodeId, ReconcilerHandle>,
}
impl SafekeeperReconcilers {
pub fn new(cancel: CancellationToken) -> Self {
SafekeeperReconcilers {
cancel,
reconcilers: HashMap::new(),
}
}
pub(crate) fn schedule_request_vec(
&mut self,
service: &Arc<Service>,
reqs: Vec<ScheduleRequest>,
) {
for req in reqs {
self.schedule_request(service, req);
}
}
pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
let node_id = req.safekeeper.get_id();
let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
});
reconciler_handle.schedule_reconcile(req);
}
pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
if let Some(handle) = self.reconcilers.remove(&node_id) {
handle.cancel.cancel();
}
}
}
/// Initial load of the pending operations from the db
pub(crate) async fn load_schedule_requests(
service: &Arc<Service>,
safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
let pending_ops = service.persistence.list_pending_ops(None).await?;
let mut res = Vec::with_capacity(pending_ops.len());
for op_persist in pending_ops {
let node_id = NodeId(op_persist.sk_id as u64);
let Some(sk) = safekeepers.get(&node_id) else {
// This shouldn't happen, at least the safekeeper should exist as decomissioned.
tracing::warn!(
tenant_id = op_persist.tenant_id,
timeline_id = op_persist.timeline_id,
"couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
);
continue;
};
let sk = Box::new(sk.clone());
let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
let timeline_id = TimelineId::from_str(&op_persist.timeline_id)?;
let host_list = match op_persist.op_kind {
SafekeeperTimelineOpKind::Delete => Vec::new(),
SafekeeperTimelineOpKind::Exclude => Vec::new(),
SafekeeperTimelineOpKind::Pull => {
// TODO this code is super hacky, it doesn't take migrations into account
let timeline_persist = service
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(timeline_persist) = timeline_persist else {
// This shouldn't happen, the timeline should still exist
tracing::warn!(
tenant_id = op_persist.tenant_id,
timeline_id = op_persist.timeline_id,
"couldn't find timeline for corresponding pull op"
);
continue;
};
timeline_persist
.sk_set
.iter()
.filter_map(|sk_id| {
let other_node_id = NodeId(*sk_id as u64);
if node_id == other_node_id {
// We obviously don't want to pull from ourselves
return None;
}
let Some(sk) = safekeepers.get(&other_node_id) else {
tracing::warn!(
"couldnt find safekeeper with pending op id {other_node_id}, not pulling from it"
);
return None;
};
Some((other_node_id, sk.base_url()))
})
.collect::<Vec<_>>()
}
};
let req = ScheduleRequest {
safekeeper: sk,
host_list,
tenant_id,
timeline_id,
generation: op_persist.generation as u32,
kind: op_persist.op_kind,
};
res.push(req);
}
Ok(res)
}
pub(crate) struct ScheduleRequest {
pub(crate) safekeeper: Box<Safekeeper>,
pub(crate) host_list: Vec<(NodeId, String)>,
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
pub(crate) generation: u32,
pub(crate) kind: SafekeeperTimelineOpKind,
}
struct ReconcilerHandle {
tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
ongoing_tokens: Arc<ClashMap<(TenantId, TimelineId), Arc<CancellationToken>>>,
cancel: CancellationToken,
}
impl ReconcilerHandle {
/// Obtain a new token slot, cancelling any existing reconciliations for that timeline
fn new_token_slot(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Arc<CancellationToken> {
let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
if let Entry::Occupied(entry) = &entry {
let cancel: &CancellationToken = entry.get();
cancel.cancel();
}
entry.insert(Arc::new(self.cancel.child_token())).clone()
}
fn schedule_reconcile(&self, req: ScheduleRequest) {
let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
let hostname = req.safekeeper.skp.host.clone();
if let Err(err) = self.tx.send((req, cancel)) {
tracing::info!("scheduling request onto {hostname} returned error: {err}");
}
}
}
pub(crate) struct SafekeeperReconciler {
service: Arc<Service>,
rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
cancel: CancellationToken,
}
impl SafekeeperReconciler {
fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
// We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
let (tx, rx) = mpsc::unbounded_channel();
let mut reconciler = SafekeeperReconciler {
service,
rx,
cancel: cancel.clone(),
};
let handle = ReconcilerHandle {
tx,
ongoing_tokens: Arc::new(ClashMap::new()),
cancel,
};
tokio::spawn(async move { reconciler.run().await });
handle
}
async fn run(&mut self) {
loop {
// TODO add parallelism with semaphore here
let req = tokio::select! {
req = self.rx.recv() => req,
_ = self.cancel.cancelled() => break,
};
let Some((req, req_cancel)) = req else { break };
if req_cancel.is_cancelled() {
continue;
}
let kind = req.kind;
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
self.reconcile_one(req, req_cancel)
.instrument(tracing::info_span!(
"reconcile_one",
?kind,
%tenant_id,
%timeline_id
))
.await;
}
}
async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
let req_host = req.safekeeper.skp.host.clone();
match req.kind {
SafekeeperTimelineOpKind::Pull => {
let our_id = req.safekeeper.get_id();
let http_hosts = req
.host_list
.iter()
.filter(|(node_id, _hostname)| *node_id != our_id)
.map(|(_, hostname)| hostname.clone())
.collect::<Vec<_>>();
let pull_req = PullTimelineRequest {
http_hosts,
tenant_id: req.tenant_id,
timeline_id: req.timeline_id,
};
self.reconcile_inner(
req,
async |client| client.pull_timeline(&pull_req).await,
|resp| {
tracing::info!(
"pulled timeline from {} onto {req_host}",
resp.safekeeper_host,
);
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Exclude => {
// TODO actually exclude instead of delete here
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
}
SafekeeperTimelineOpKind::Delete => {
let tenant_id = req.tenant_id;
let timeline_id = req.timeline_id;
self.reconcile_inner(
req,
async |client| client.delete_timeline(tenant_id, timeline_id).await,
|_resp| {
tracing::info!("deleted timeline from {req_host}");
},
req_cancel,
)
.await;
}
}
}
async fn reconcile_inner<T, F, U>(
&self,
req: ScheduleRequest,
closure: impl Fn(SafekeeperClient) -> F,
log_success: impl FnOnce(T) -> U,
req_cancel: Arc<CancellationToken>,
) where
F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
{
let jwt = self
.service
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from);
let ssl_ca_cert = self.service.config.ssl_ca_cert.clone();
loop {
let res = req
.safekeeper
.with_client_retries(
|client| {
let closure = &closure;
async move { closure(client).await }
},
&jwt,
&ssl_ca_cert,
3,
10,
Duration::from_secs(10),
&req_cancel,
)
.await;
match res {
Ok(resp) => {
log_success(resp);
let res = self
.service
.persistence
.remove_pending_op(
req.tenant_id,
req.timeline_id,
req.safekeeper.get_id(),
req.generation,
)
.await;
if let Err(err) = res {
tracing::info!(
"couldn't remove reconciliation request onto {} from persistence: {err:?}",
req.safekeeper.skp.host
);
}
return;
}
Err(mgmt_api::Error::Cancelled) => {
// On cancellation, the code that issued it will take care of removing db entries (if needed)
return;
}
Err(e) => {
tracing::info!(
"Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
req.safekeeper.skp.host
);
const SLEEP_TIME: Duration = Duration::from_secs(1);
tokio::time::sleep(SLEEP_TIME).await;
}
}
}
}
}