diff --git a/Cargo.lock b/Cargo.lock index fa40009769..778ff19fec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6621,6 +6621,7 @@ dependencies = [ "bytes", "chrono", "clap", + "clashmap", "control_plane", "cron", "diesel", diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 2e8fb8f07b..ec9eb74e6f 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -177,6 +177,8 @@ pub struct NeonStorageControllerConf { #[serde(default)] pub use_https_pageserver_api: bool, + + pub timelines_onto_safekeepers: bool, } impl NeonStorageControllerConf { @@ -201,6 +203,7 @@ impl Default for NeonStorageControllerConf { heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL, long_reconcile_threshold: None, use_https_pageserver_api: false, + timelines_onto_safekeepers: false, } } } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 1df50e211c..439d7936a7 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -584,6 +584,10 @@ impl StorageController { self.env.base_data_dir.display() )); + if self.config.timelines_onto_safekeepers { + args.push("--timelines-onto-safekeepers".to_string()); + } + background_process::start_process( COMMAND, &instance_dir, diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 749a8acc4e..13a9b5d89e 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -274,6 +274,31 @@ pub struct TimelineCreateRequest { pub mode: TimelineCreateRequestMode, } +/// Storage controller specific extensions to [`TimelineInfo`]. +#[derive(Serialize, Deserialize, Clone)] +pub struct TimelineCreateResponseStorcon { + #[serde(flatten)] + pub timeline_info: TimelineInfo, + + pub safekeepers: Option, +} + +/// Safekeepers as returned in timeline creation request to storcon or pushed to +/// cplane in the post migration hook. +#[derive(Serialize, Deserialize, Clone)] +pub struct SafekeepersInfo { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub generation: u32, + pub safekeepers: Vec, +} + +#[derive(Serialize, Deserialize, Clone)] +pub struct SafekeeperInfo { + pub id: NodeId, + pub hostname: String, +} + #[derive(Serialize, Deserialize, Clone)] #[serde(untagged)] pub enum TimelineCreateRequestMode { diff --git a/libs/safekeeper_api/src/membership.rs b/libs/safekeeper_api/src/membership.rs index bb8934744a..3d4d17096e 100644 --- a/libs/safekeeper_api/src/membership.rs +++ b/libs/safekeeper_api/src/membership.rs @@ -131,6 +131,14 @@ impl Configuration { } } + pub fn new(members: MemberSet) -> Self { + Configuration { + generation: INITIAL_GENERATION, + members, + new_members: None, + } + } + /// Is `sk_id` member of the configuration? pub fn contains(&self, sk_id: NodeId) -> bool { self.members.contains(sk_id) || self.new_members.as_ref().is_some_and(|m| m.contains(sk_id)) diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 2f2aeaa429..10c703395f 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -18,7 +18,7 @@ pub struct SafekeeperStatus { pub id: NodeId, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct TimelineCreateRequest { pub tenant_id: TenantId, pub timeline_id: TimelineId, @@ -283,7 +283,7 @@ pub struct SafekeeperUtilization { } /// pull_timeline request body. -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct PullTimelineRequest { pub tenant_id: TenantId, pub timeline_id: TimelineId, diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 6b657b5ea0..8211bdce62 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -19,6 +19,7 @@ bytes.workspace = true chrono.workspace = true clap.workspace = true cron.workspace = true +clashmap.workspace = true fail.workspace = true futures.workspace = true governor.workspace = true diff --git a/storage_controller/migrations/2025-02-14-160526_safekeeper_timelines/down.sql b/storage_controller/migrations/2025-02-14-160526_safekeeper_timelines/down.sql new file mode 100644 index 0000000000..8f75e8947e --- /dev/null +++ b/storage_controller/migrations/2025-02-14-160526_safekeeper_timelines/down.sql @@ -0,0 +1,2 @@ +DROP TABLE timelines; +DROP TABLE safekeeper_timeline_pending_ops; diff --git a/storage_controller/migrations/2025-02-14-160526_safekeeper_timelines/up.sql b/storage_controller/migrations/2025-02-14-160526_safekeeper_timelines/up.sql new file mode 100644 index 0000000000..82003ab292 --- /dev/null +++ b/storage_controller/migrations/2025-02-14-160526_safekeeper_timelines/up.sql @@ -0,0 +1,19 @@ +CREATE TABLE timelines ( + tenant_id VARCHAR NOT NULL, + timeline_id VARCHAR NOT NULL, + start_lsn pg_lsn NOT NULL, + generation INTEGER NOT NULL, + sk_set BIGINT[] NOT NULL, + new_sk_set BIGINT[], + cplane_notified_generation INTEGER NOT NULL, + deleted_at timestamptz, + PRIMARY KEY(tenant_id, timeline_id) +); +CREATE TABLE safekeeper_timeline_pending_ops ( + sk_id BIGINT NOT NULL, + tenant_id VARCHAR NOT NULL, + timeline_id VARCHAR NOT NULL, + generation INTEGER NOT NULL, + op_kind VARCHAR NOT NULL, + PRIMARY KEY(tenant_id, timeline_id, sk_id) +); diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 1d49cd85ca..46ac1cd7ca 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -144,6 +144,11 @@ struct Cli { /// Flag to use https for requests to pageserver API. #[arg(long, default_value = "false")] use_https_pageserver_api: bool, + + // Whether to put timelines onto safekeepers + #[arg(long, default_value = "false")] + timelines_onto_safekeepers: bool, + /// Flag to use https for requests to safekeeper API. #[arg(long, default_value = "false")] use_https_safekeeper_api: bool, @@ -370,6 +375,7 @@ async fn async_main() -> anyhow::Result<()> { use_https_pageserver_api: args.use_https_pageserver_api, use_https_safekeeper_api: args.use_https_safekeeper_api, ssl_ca_cert, + timelines_onto_safekeepers: args.timelines_onto_safekeepers, }; // Validate that we can connect to the database diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 939b8c6cd8..5146fe472e 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1,12 +1,15 @@ pub(crate) mod split_state; use std::collections::HashMap; +use std::io::Write; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; use diesel::deserialize::{FromSql, FromSqlRow}; +use diesel::expression::AsExpression; use diesel::pg::Pg; use diesel::prelude::*; +use diesel::serialize::{IsNull, ToSql}; use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; use diesel_async::pooled_connection::bb8::Pool; use diesel_async::pooled_connection::{AsyncDieselConnectionManager, ManagerConfig}; @@ -29,7 +32,8 @@ use rustls::crypto::ring; use scoped_futures::ScopedBoxFuture; use serde::{Deserialize, Serialize}; use utils::generation::Generation; -use utils::id::{NodeId, TenantId}; +use utils::id::{NodeId, TenantId, TimelineId}; +use utils::lsn::Lsn; use self::split_state::SplitState; use crate::metrics::{ @@ -117,6 +121,11 @@ pub(crate) enum DatabaseOperation { GetLeader, UpdateLeader, SetPreferredAzs, + InsertTimeline, + GetTimeline, + InsertTimelineReconcile, + RemoveTimelineReconcile, + ListTimelineReconcile, } #[must_use] @@ -1276,6 +1285,166 @@ impl Persistence { }) .await } + + /// Persist timeline. Returns if the timeline was newly inserted. If it wasn't, we haven't done any writes. + pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult { + use crate::schema::timelines; + + let entry = &entry; + self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| { + Box::pin(async move { + let inserted_updated = diesel::insert_into(timelines::table) + .values(entry) + .on_conflict((timelines::tenant_id, timelines::timeline_id)) + .do_nothing() + .execute(conn) + .await?; + + match inserted_updated { + 0 => Ok(false), + 1 => Ok(true), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + inserted_updated + ))), + } + }) + }) + .await + } + + /// Load timeline from db. Returns `None` if not present. + pub(crate) async fn get_timeline( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> DatabaseResult> { + use crate::schema::timelines::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + let timeline_from_db = self + .with_measured_conn(DatabaseOperation::GetTimeline, move |conn| { + Box::pin(async move { + let mut from_db: Vec = dsl::timelines + .filter( + dsl::tenant_id + .eq(&tenant_id.to_string()) + .and(dsl::timeline_id.eq(&timeline_id.to_string())), + ) + .load(conn) + .await?; + if from_db.is_empty() { + return Ok(None); + } + if from_db.len() != 1 { + return Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + from_db.len() + ))); + } + + Ok(Some(from_db.pop().unwrap().into_persistence())) + }) + }) + .await?; + + Ok(timeline_from_db) + } + /// Persist pending op. Returns if it was newly inserted. If it wasn't, we haven't done any writes. + pub(crate) async fn insert_pending_op( + &self, + entry: TimelinePendingOpPersistence, + ) -> DatabaseResult { + use crate::schema::safekeeper_timeline_pending_ops as skpo; + // This overrides the `filter` fn used in other functions, so contain the mayhem via a function-local use + use diesel::query_dsl::methods::FilterDsl; + + let entry = &entry; + self.with_measured_conn(DatabaseOperation::InsertTimelineReconcile, move |conn| { + Box::pin(async move { + // For simplicity it makes sense to keep only the last operation + // per (tenant, timeline, sk) tuple: if we migrated a timeline + // from node and adding it back it is not necessary to remove + // data on it. Hence, generation is not part of primary key and + // we override any rows with lower generations here. + let inserted_updated = diesel::insert_into(skpo::table) + .values(entry) + .on_conflict((skpo::tenant_id, skpo::timeline_id, skpo::sk_id)) + .do_update() + .set(entry) + .filter(skpo::generation.lt(entry.generation)) + .execute(conn) + .await?; + + match inserted_updated { + 0 => Ok(false), + 1 => Ok(true), + _ => Err(DatabaseError::Logical(format!( + "unexpected number of rows ({})", + inserted_updated + ))), + } + }) + }) + .await + } + /// Remove persisted pending op. + pub(crate) async fn remove_pending_op( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + sk_id: NodeId, + generation: u32, + ) -> DatabaseResult<()> { + use crate::schema::safekeeper_timeline_pending_ops::dsl; + + let tenant_id = &tenant_id; + let timeline_id = &timeline_id; + self.with_measured_conn(DatabaseOperation::RemoveTimelineReconcile, move |conn| { + Box::pin(async move { + diesel::delete(dsl::safekeeper_timeline_pending_ops) + .filter(dsl::tenant_id.eq(tenant_id.to_string())) + .filter(dsl::timeline_id.eq(timeline_id.to_string())) + .filter(dsl::sk_id.eq(sk_id.0 as i64)) + .filter(dsl::generation.eq(generation as i32)) + .execute(conn) + .await?; + Ok(()) + }) + }) + .await + } + + /// Load pending operations from db. + pub(crate) async fn list_pending_ops( + &self, + filter_for_sk: Option, + ) -> DatabaseResult> { + use crate::schema::safekeeper_timeline_pending_ops::dsl; + + const FILTER_VAL_1: i64 = 1; + const FILTER_VAL_2: i64 = 2; + let filter_opt = filter_for_sk.map(|id| id.0 as i64); + let timeline_from_db = self + .with_measured_conn(DatabaseOperation::ListTimelineReconcile, move |conn| { + Box::pin(async move { + let from_db: Vec = + dsl::safekeeper_timeline_pending_ops + .filter( + dsl::sk_id + .eq(filter_opt.unwrap_or(FILTER_VAL_1)) + .and(dsl::sk_id.eq(filter_opt.unwrap_or(FILTER_VAL_2))), + ) + .load(conn) + .await?; + Ok(from_db) + }) + }) + .await?; + + Ok(timeline_from_db) + } } pub(crate) fn load_certs() -> anyhow::Result> { @@ -1671,3 +1840,139 @@ struct InsertUpdateSafekeeper<'a> { availability_zone_id: &'a str, scheduling_policy: Option<&'a str>, } + +#[derive(Serialize, Deserialize, FromSqlRow, AsExpression, Eq, PartialEq, Debug, Copy, Clone)] +#[diesel(sql_type = crate::schema::sql_types::PgLsn)] +pub(crate) struct LsnWrapper(pub(crate) Lsn); + +impl From for LsnWrapper { + fn from(value: Lsn) -> Self { + LsnWrapper(value) + } +} + +impl FromSql for LsnWrapper { + fn from_sql( + bytes: ::RawValue<'_>, + ) -> diesel::deserialize::Result { + let byte_arr: diesel::deserialize::Result<[u8; 8]> = bytes + .as_bytes() + .try_into() + .map_err(|_| "Can't obtain lsn from sql".into()); + Ok(LsnWrapper(Lsn(u64::from_be_bytes(byte_arr?)))) + } +} + +impl ToSql for LsnWrapper { + fn to_sql<'b>( + &'b self, + out: &mut diesel::serialize::Output<'b, '_, Pg>, + ) -> diesel::serialize::Result { + out.write_all(&u64::to_be_bytes(self.0.0)) + .map(|_| IsNull::No) + .map_err(Into::into) + } +} + +#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)] +#[diesel(table_name = crate::schema::timelines)] +pub(crate) struct TimelinePersistence { + pub(crate) tenant_id: String, + pub(crate) timeline_id: String, + pub(crate) start_lsn: LsnWrapper, + pub(crate) generation: i32, + pub(crate) sk_set: Vec, + pub(crate) new_sk_set: Option>, + pub(crate) cplane_notified_generation: i32, + pub(crate) deleted_at: Option>, +} + +/// This is separate from [TimelinePersistence] only because postgres allows NULLs +/// in arrays and there is no way to forbid that at schema level. Hence diesel +/// wants `sk_set` to be `Vec>` instead of `Vec` for +/// Queryable/Selectable. It does however allow insertions without redundant +/// Option(s), so [TimelinePersistence] doesn't have them. +#[derive(Queryable, Selectable)] +#[diesel(table_name = crate::schema::timelines)] +pub(crate) struct TimelineFromDb { + pub(crate) tenant_id: String, + pub(crate) timeline_id: String, + pub(crate) start_lsn: LsnWrapper, + pub(crate) generation: i32, + pub(crate) sk_set: Vec>, + pub(crate) new_sk_set: Option>>, + pub(crate) cplane_notified_generation: i32, + pub(crate) deleted_at: Option>, +} + +impl TimelineFromDb { + fn into_persistence(self) -> TimelinePersistence { + // We should never encounter null entries in the sets, but we need to filter them out. + // There is no way to forbid this in the schema that diesel recognizes (to our knowledge). + let sk_set = self.sk_set.into_iter().flatten().collect::>(); + let new_sk_set = self + .new_sk_set + .map(|s| s.into_iter().flatten().collect::>()); + TimelinePersistence { + tenant_id: self.tenant_id, + timeline_id: self.timeline_id, + start_lsn: self.start_lsn, + generation: self.generation, + sk_set, + new_sk_set, + cplane_notified_generation: self.cplane_notified_generation, + deleted_at: self.deleted_at, + } + } +} + +#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)] +#[diesel(table_name = crate::schema::safekeeper_timeline_pending_ops)] +pub(crate) struct TimelinePendingOpPersistence { + pub(crate) sk_id: i64, + pub(crate) tenant_id: String, + pub(crate) timeline_id: String, + pub(crate) generation: i32, + pub(crate) op_kind: SafekeeperTimelineOpKind, +} + +#[derive(Serialize, Deserialize, FromSqlRow, AsExpression, Eq, PartialEq, Debug, Copy, Clone)] +#[diesel(sql_type = diesel::sql_types::VarChar)] +pub(crate) enum SafekeeperTimelineOpKind { + Pull, + Exclude, + Delete, +} + +impl FromSql for SafekeeperTimelineOpKind { + fn from_sql( + bytes: ::RawValue<'_>, + ) -> diesel::deserialize::Result { + let bytes = bytes.as_bytes(); + match core::str::from_utf8(bytes) { + Ok(s) => match s { + "pull" => Ok(SafekeeperTimelineOpKind::Pull), + "exclude" => Ok(SafekeeperTimelineOpKind::Exclude), + "delete" => Ok(SafekeeperTimelineOpKind::Delete), + _ => Err(format!("can't parse: {s}").into()), + }, + Err(e) => Err(format!("invalid UTF-8 for op_kind: {e}").into()), + } + } +} + +impl ToSql for SafekeeperTimelineOpKind { + fn to_sql<'b>( + &'b self, + out: &mut diesel::serialize::Output<'b, '_, Pg>, + ) -> diesel::serialize::Result { + let kind_str = match self { + SafekeeperTimelineOpKind::Pull => "pull", + SafekeeperTimelineOpKind::Exclude => "exclude", + SafekeeperTimelineOpKind::Delete => "delete", + }; + out.write_all(kind_str.as_bytes()) + .map(|_| IsNull::No) + .map_err(Into::into) + } +} diff --git a/storage_controller/src/safekeeper.rs b/storage_controller/src/safekeeper.rs index 16f72ef4bc..2bd28f29af 100644 --- a/storage_controller/src/safekeeper.rs +++ b/storage_controller/src/safekeeper.rs @@ -21,6 +21,7 @@ pub struct Safekeeper { listen_https_port: Option, scheduling_policy: SkSchedulingPolicy, id: NodeId, + /// Heartbeating result. availability: SafekeeperState, // Flag from storcon's config to use https for safekeeper API. @@ -85,6 +86,9 @@ impl Safekeeper { self.scheduling_policy = scheduling_policy; self.skp.scheduling_policy = scheduling_policy.into(); } + pub(crate) fn availability(&self) -> SafekeeperState { + self.availability.clone() + } /// Perform an operation (which is given a [`SafekeeperClient`]) with retries #[allow(clippy::too_many_arguments)] pub(crate) async fn with_client_retries( diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index 662f6d43be..1533b6c086 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -57,7 +57,6 @@ impl SafekeeperClient { } } - #[allow(dead_code)] pub(crate) async fn create_timeline( &self, req: &TimelineCreateRequest, @@ -70,7 +69,6 @@ impl SafekeeperClient { ) } - #[allow(dead_code)] pub(crate) async fn delete_timeline( &self, tenant_id: TenantId, @@ -84,7 +82,6 @@ impl SafekeeperClient { ) } - #[allow(dead_code)] pub(crate) async fn pull_timeline( &self, req: &PullTimelineRequest, diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index ebfe630173..9b36376fcb 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -1,5 +1,11 @@ // @generated automatically by Diesel CLI. +pub mod sql_types { + #[derive(diesel::query_builder::QueryId, diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "pg_lsn", schema = "pg_catalog"))] + pub struct PgLsn; +} + diesel::table! { controllers (address, started_at) { address -> Varchar, @@ -30,6 +36,16 @@ diesel::table! { } } +diesel::table! { + safekeeper_timeline_pending_ops (tenant_id, timeline_id, sk_id) { + sk_id -> Int8, + tenant_id -> Varchar, + timeline_id -> Varchar, + generation -> Int4, + op_kind -> Varchar, + } +} + diesel::table! { safekeepers (id) { id -> Int8, @@ -60,10 +76,28 @@ diesel::table! { } } +diesel::table! { + use diesel::sql_types::*; + use super::sql_types::PgLsn; + + timelines (tenant_id, timeline_id) { + tenant_id -> Varchar, + timeline_id -> Varchar, + start_lsn -> PgLsn, + generation -> Int4, + sk_set -> Array>, + new_sk_set -> Nullable>>, + cplane_notified_generation -> Int4, + deleted_at -> Nullable, + } +} + diesel::allow_tables_to_appear_in_same_query!( controllers, metadata_health, nodes, + safekeeper_timeline_pending_ops, safekeepers, tenant_shards, + timelines, ); diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index b79f223a24..a06748abc6 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1,5 +1,6 @@ pub mod chaos_injector; mod context_iterator; +pub(crate) mod safekeeper_reconciler; use std::borrow::Cow; use std::cmp::Ordering; @@ -34,11 +35,12 @@ use pageserver_api::controller_api::{ }; use pageserver_api::models::{ self, LocationConfig, LocationConfigListResponse, LocationConfigMode, PageserverUtilization, - SecondaryProgress, ShardParameters, TenantConfig, TenantConfigPatchRequest, - TenantConfigRequest, TenantLocationConfigRequest, TenantLocationConfigResponse, - TenantShardLocation, TenantShardSplitRequest, TenantShardSplitResponse, TenantSorting, - TenantTimeTravelRequest, TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineInfo, - TopTenantShardItem, TopTenantShardsRequest, + SafekeeperInfo, SafekeepersInfo, SecondaryProgress, ShardParameters, TenantConfig, + TenantConfigPatchRequest, TenantConfigRequest, TenantLocationConfigRequest, + TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest, + TenantShardSplitResponse, TenantSorting, TenantTimeTravelRequest, + TimelineArchivalConfigRequest, TimelineCreateRequest, TimelineCreateResponseStorcon, + TimelineInfo, TopTenantShardItem, TopTenantShardsRequest, }; use pageserver_api::shard::{ ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId, @@ -49,14 +51,18 @@ use pageserver_api::upcall_api::{ }; use pageserver_client::{BlockUnblock, mgmt_api}; use reqwest::{Certificate, StatusCode}; +use safekeeper_api::membership::{MemberSet, SafekeeperId}; use safekeeper_api::models::SafekeeperUtilization; +use safekeeper_reconciler::{SafekeeperReconcilers, ScheduleRequest}; use tokio::sync::TryAcquireError; use tokio::sync::mpsc::error::TrySendError; +use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use utils::completion::Barrier; use utils::generation::Generation; use utils::id::{NodeId, TenantId, TimelineId}; +use utils::logging::SecretString; use utils::sync::gate::Gate; use utils::{failpoint_support, pausable_failpoint}; @@ -77,8 +83,8 @@ use crate::peer_client::GlobalObservedState; use crate::persistence::split_state::SplitState; use crate::persistence::{ AbortShardSplitStatus, ControllerPersistence, DatabaseError, DatabaseResult, - MetadataHealthPersistence, Persistence, ShardGenerationState, TenantFilter, - TenantShardPersistence, + MetadataHealthPersistence, Persistence, SafekeeperTimelineOpKind, ShardGenerationState, + TenantFilter, TenantShardPersistence, TimelinePendingOpPersistence, TimelinePersistence, }; use crate::reconciler::{ ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder, ReconcilerPriority, @@ -202,6 +208,8 @@ struct ServiceState { safekeepers: Arc>, + safekeeper_reconcilers: SafekeeperReconcilers, + scheduler: Scheduler, /// Ongoing background operation on the cluster if any is running. @@ -274,6 +282,7 @@ impl ServiceState { scheduler: Scheduler, delayed_reconcile_rx: tokio::sync::mpsc::Receiver, initial_leadership_status: LeadershipStatus, + reconcilers_cancel: CancellationToken, ) -> Self { metrics::update_leadership_status(initial_leadership_status); @@ -282,6 +291,7 @@ impl ServiceState { tenants, nodes: Arc::new(nodes), safekeepers: Arc::new(safekeepers), + safekeeper_reconcilers: SafekeeperReconcilers::new(reconcilers_cancel), scheduler, ongoing_operation: None, delayed_reconcile_rx, @@ -401,9 +411,12 @@ pub struct Config { pub long_reconcile_threshold: Duration, pub use_https_pageserver_api: bool, + pub use_https_safekeeper_api: bool, pub ssl_ca_cert: Option, + + pub timelines_onto_safekeepers: bool, } impl From for ApiError { @@ -742,7 +755,27 @@ impl Service { std::process::exit(1); } - self.inner.write().unwrap().become_leader(); + let safekeepers = self.inner.read().unwrap().safekeepers.clone(); + let sk_schedule_requests = + match safekeeper_reconciler::load_schedule_requests(self, &safekeepers).await { + Ok(v) => v, + Err(e) => { + tracing::warn!( + "Failed to load safekeeper pending ops at startup: {e}." // Don't abort for now: " Aborting start-up..." + ); + // std::process::exit(1); + Vec::new() + } + }; + + { + let mut locked = self.inner.write().unwrap(); + locked.become_leader(); + + locked + .safekeeper_reconcilers + .schedule_request_vec(self, sk_schedule_requests); + } // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that // generation_pageserver in the database. @@ -1059,6 +1092,7 @@ impl Service { } } } + /// Heartbeat all storage nodes once in a while. #[instrument(skip_all)] async fn spawn_heartbeat_driver(&self) { self.startup_complete.clone().wait().await; @@ -1607,6 +1641,7 @@ impl Service { scheduler, delayed_reconcile_rx, initial_leadership_status, + reconcilers_cancel.clone(), ))), config: config.clone(), persistence, @@ -3448,7 +3483,7 @@ impl Service { Ok(()) } - pub(crate) async fn tenant_timeline_create( + pub(crate) async fn tenant_timeline_create_pageservers( &self, tenant_id: TenantId, mut create_req: TimelineCreateRequest, @@ -3459,14 +3494,6 @@ impl Service { create_req.new_timeline_id, ); - let _tenant_lock = trace_shared_lock( - &self.tenant_op_locks, - tenant_id, - TenantOperations::TimelineCreate, - ) - .await; - failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock"); - self.tenant_remote_mutation(tenant_id, move |mut targets| async move { if targets.0.is_empty() { return Err(ApiError::NotFound( @@ -3593,6 +3620,323 @@ impl Service { .await? } + /// Timeline creation on safekeepers + /// + /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers, + /// where `left` contains the list of safekeepers that didn't have a successful response. + /// Assumes tenant lock is held while calling this function. + async fn tenant_timeline_create_safekeepers_quorum( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + pg_version: u32, + timeline_persistence: &TimelinePersistence, + ) -> Result, ApiError> { + // If quorum is reached, return if we are outside of a specified timeout + let jwt = self + .config + .safekeeper_jwt_token + .clone() + .map(SecretString::from); + let mut joinset = JoinSet::new(); + + let safekeepers = { + let locked = self.inner.read().unwrap(); + locked.safekeepers.clone() + }; + + let mut members = Vec::new(); + for sk_id in timeline_persistence.sk_set.iter() { + let sk_id = NodeId(*sk_id as u64); + let Some(safekeeper) = safekeepers.get(&sk_id) else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "couldn't find entry for safekeeper with id {sk_id}" + )))?; + }; + members.push(SafekeeperId { + id: sk_id, + host: safekeeper.skp.host.clone(), + pg_port: safekeeper.skp.port as u16, + }); + } + let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?; + let mconf = safekeeper_api::membership::Configuration::new(mset); + + let req = safekeeper_api::models::TimelineCreateRequest { + commit_lsn: None, + mconf, + pg_version, + start_lsn: timeline_persistence.start_lsn.0, + system_id: None, + tenant_id, + timeline_id, + wal_seg_size: None, + }; + const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); + for sk in timeline_persistence.sk_set.iter() { + let sk_id = NodeId(*sk as u64); + let safekeepers = safekeepers.clone(); + let jwt = jwt.clone(); + let ssl_ca_cert = self.config.ssl_ca_cert.clone(); + let req = req.clone(); + joinset.spawn(async move { + // Unwrap is fine as we already would have returned error above + let sk_p = safekeepers.get(&sk_id).unwrap(); + let res = sk_p + .with_client_retries( + |client| { + let req = req.clone(); + async move { client.create_timeline(&req).await } + }, + &jwt, + &ssl_ca_cert, + 3, + 3, + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT, + &CancellationToken::new(), + ) + .await; + (sk_id, sk_p.skp.host.clone(), res) + }); + } + // After we have built the joinset, we now wait for the tasks to complete, + // but with a specified timeout to make sure we return swiftly, either with + // a failure or success. + let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT; + + // Wait until all tasks finish or timeout is hit, whichever occurs + // first. + let mut reconcile_results = Vec::new(); + loop { + if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await + { + let Some(res) = res else { break }; + match res { + Ok(res) => { + tracing::info!( + "response from safekeeper id:{} at {}: {:?}", + res.0, + res.1, + res.2 + ); + reconcile_results.push(res); + } + Err(join_err) => { + tracing::info!("join_err for task in joinset: {join_err}"); + } + } + } else { + tracing::info!( + "timeout for creation call after {} responses", + reconcile_results.len() + ); + break; + } + } + + // Now check now if quorum was reached in reconcile_results. + let total_result_count = reconcile_results.len(); + let remaining = reconcile_results + .into_iter() + .filter_map(|res| res.2.is_err().then_some(res.0)) + .collect::>(); + tracing::info!( + "Got {} non-successful responses from initial creation request of total {total_result_count} responses", + remaining.len() + ); + if remaining.len() >= 2 { + // Failure + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "not enough successful reconciliations to reach quorum, please retry: {} errored", + remaining.len() + ))); + } + + Ok(remaining) + } + + /// Create timeline in controller database and on safekeepers. + /// `timeline_info` is result of timeline creation on pageserver. + /// + /// All actions must be idempotent as the call is retried until success. It + /// tries to create timeline in the db and on at least majority of + /// safekeepers + queue creation for safekeepers which missed it in the db + /// for infinite retries; after that, call returns Ok. + /// + /// The idea is that once this is reached as long as we have alive majority + /// of safekeepers it is expected to get eventually operational as storcon + /// will be able to seed timeline on nodes which missed creation by making + /// pull_timeline from peers. On the other hand we don't want to fail + /// timeline creation if one safekeeper is down. + async fn tenant_timeline_create_safekeepers( + self: &Arc, + tenant_id: TenantId, + timeline_info: &TimelineInfo, + create_mode: models::TimelineCreateRequestMode, + ) -> Result { + let timeline_id = timeline_info.timeline_id; + let pg_version = timeline_info.pg_version; + // Initially start_lsn is determined by last_record_lsn in pageserver + // response as it does initdb. However, later we persist it and in sk + // creation calls replace with the value from the timeline row if it + // previously existed as on retries in theory endpoint might have + // already written some data and advanced last_record_lsn, while we want + // safekeepers to have consistent start_lsn. + let start_lsn = match create_mode { + models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn, + models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn, + models::TimelineCreateRequestMode::ImportPgdata { .. } => { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "import pgdata doesn't specify the start lsn, aborting creation on safekeepers" + )))?; + } + }; + // Choose initial set of safekeepers respecting affinity + let sks = self.safekeepers_for_new_timeline().await?; + let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::>(); + // Add timeline to db + let mut timeline_persist = TimelinePersistence { + tenant_id: tenant_id.to_string(), + timeline_id: timeline_id.to_string(), + start_lsn: start_lsn.into(), + generation: 0, + sk_set: sks_persistence.clone(), + new_sk_set: None, + cplane_notified_generation: 0, + deleted_at: None, + }; + let inserted = self + .persistence + .insert_timeline(timeline_persist.clone()) + .await?; + if !inserted { + if let Some(existent_persist) = self + .persistence + .get_timeline(tenant_id, timeline_id) + .await? + { + // Replace with what we have in the db, to get stuff like the generation right. + // We do still repeat the http calls to the safekeepers. After all, we could have + // crashed right after the wrote to the DB. + timeline_persist = existent_persist; + } else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "insertion said timeline already in db, but looking it up, it was gone" + ))); + } + } + // Create the timeline on a quorum of safekeepers + let remaining = self + .tenant_timeline_create_safekeepers_quorum( + tenant_id, + timeline_id, + pg_version, + &timeline_persist, + ) + .await?; + + // For the remaining safekeepers, take care of their reconciliation asynchronously + for &remaining_id in remaining.iter() { + let pending_op = TimelinePendingOpPersistence { + tenant_id: tenant_id.to_string(), + timeline_id: timeline_id.to_string(), + generation: timeline_persist.generation, + op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull, + sk_id: remaining_id.0 as i64, + }; + tracing::info!("writing pending op for sk id {remaining_id}"); + self.persistence.insert_pending_op(pending_op).await?; + } + if !remaining.is_empty() { + let mut locked = self.inner.write().unwrap(); + for remaining_id in remaining { + let Some(sk) = locked.safekeepers.get(&remaining_id) else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Couldn't find safekeeper with id {remaining_id}" + ))); + }; + let Ok(host_list) = sks + .iter() + .map(|sk| { + Ok(( + sk.id, + locked + .safekeepers + .get(&sk.id) + .ok_or_else(|| { + ApiError::InternalServerError(anyhow::anyhow!( + "Couldn't find safekeeper with id {remaining_id} to pull from" + )) + })? + .base_url(), + )) + }) + .collect::>() + else { + continue; + }; + let req = ScheduleRequest { + safekeeper: Box::new(sk.clone()), + host_list, + tenant_id, + timeline_id, + generation: timeline_persist.generation as u32, + kind: crate::persistence::SafekeeperTimelineOpKind::Pull, + }; + locked.safekeeper_reconcilers.schedule_request(self, req); + } + } + + Ok(SafekeepersInfo { + generation: timeline_persist.generation as u32, + safekeepers: sks, + tenant_id, + timeline_id, + }) + } + + pub(crate) async fn tenant_timeline_create( + self: &Arc, + tenant_id: TenantId, + create_req: TimelineCreateRequest, + ) -> Result { + let safekeepers = self.config.timelines_onto_safekeepers; + tracing::info!( + %safekeepers, + "Creating timeline {}/{}", + tenant_id, + create_req.new_timeline_id, + ); + + let _tenant_lock = trace_shared_lock( + &self.tenant_op_locks, + tenant_id, + TenantOperations::TimelineCreate, + ) + .await; + failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock"); + let create_mode = create_req.mode.clone(); + + let timeline_info = self + .tenant_timeline_create_pageservers(tenant_id, create_req) + .await?; + + let safekeepers = if safekeepers { + let res = self + .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode) + .instrument(tracing::info_span!("timeline_create_safekeepers", %tenant_id, timeline_id=%timeline_info.timeline_id)) + .await?; + Some(res) + } else { + None + }; + + Ok(TimelineCreateResponseStorcon { + timeline_info, + safekeepers, + }) + } + pub(crate) async fn tenant_timeline_archival_config( &self, tenant_id: TenantId, @@ -4138,7 +4482,7 @@ impl Service { } pub(crate) async fn tenant_timeline_delete( - &self, + self: &Arc, tenant_id: TenantId, timeline_id: TimelineId, ) -> Result { @@ -4150,7 +4494,7 @@ impl Service { ) .await; - self.tenant_remote_mutation(tenant_id, move |mut targets| async move { + let status_code = self.tenant_remote_mutation(tenant_id, move |mut targets| async move { if targets.0.is_empty() { return Err(ApiError::NotFound( anyhow::anyhow!("Tenant not found").into(), @@ -4226,7 +4570,67 @@ impl Service { ) .await?; Ok(shard_zero_status) - }).await? + }).await?; + + self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id) + .await?; + + status_code + } + /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler. + async fn tenant_timeline_delete_safekeepers( + self: &Arc, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result<(), ApiError> { + let tl = self + .persistence + .get_timeline(tenant_id, timeline_id) + .await?; + let Some(tl) = tl else { + tracing::info!( + "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed" + ); + return Ok(()); + }; + let all_sks = tl + .new_sk_set + .iter() + .flat_map(|sks| { + sks.iter() + .map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude)) + }) + .chain( + tl.sk_set + .iter() + .map(|v| (*v, SafekeeperTimelineOpKind::Delete)), + ) + .collect::>(); + + // Schedule reconciliations + { + let mut locked = self.inner.write().unwrap(); + for (sk_id, kind) in all_sks { + let sk_id = NodeId(sk_id as u64); + let Some(sk) = locked.safekeepers.get(&sk_id) else { + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Couldn't find safekeeper with id {sk_id}" + ))); + }; + + let req = ScheduleRequest { + safekeeper: Box::new(sk.clone()), + // we don't use this for this kind, put a dummy value + host_list: Vec::new(), + tenant_id, + timeline_id, + generation: tl.generation as u32, + kind, + }; + locked.safekeeper_reconcilers.schedule_request(self, req); + } + } + Ok(()) } /// When you know the TenantId but not a specific shard, and would like to get the node holding shard 0. @@ -8262,6 +8666,68 @@ impl Service { global_observed } + /// Choose safekeepers for the new timeline: 3 in different azs. + pub(crate) async fn safekeepers_for_new_timeline( + &self, + ) -> Result, ApiError> { + let mut all_safekeepers = { + let locked = self.inner.read().unwrap(); + locked + .safekeepers + .iter() + .filter_map(|sk| { + if sk.1.scheduling_policy() != SkSchedulingPolicy::Active { + // If we don't want to schedule stuff onto the safekeeper, respect that. + return None; + } + let utilization_opt = if let SafekeeperState::Available { + last_seen_at: _, + utilization, + } = sk.1.availability() + { + Some(utilization) + } else { + // non-available safekeepers still get a chance for new timelines, + // but put them last in the list. + None + }; + let info = SafekeeperInfo { + hostname: sk.1.skp.host.clone(), + id: NodeId(sk.1.skp.id as u64), + }; + Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone())) + }) + .collect::>() + }; + all_safekeepers.sort_by_key(|sk| { + ( + sk.0.as_ref() + .map(|ut| ut.timeline_count) + .unwrap_or(u64::MAX), + // Use the id to decide on equal scores for reliability + sk.1.id.0, + ) + }); + let mut sks = Vec::new(); + let mut azs = HashSet::new(); + for (_sk_util, sk_info, az_id) in all_safekeepers.iter() { + if !azs.insert(az_id) { + continue; + } + sks.push(sk_info.clone()); + if sks.len() == 3 { + break; + } + } + if sks.len() == 3 { + Ok(sks) + } else { + Err(ApiError::InternalServerError(anyhow::anyhow!( + "couldn't find three safekeepers in different AZs for new timeline" + ))) + } + } + pub(crate) async fn safekeepers_list( &self, ) -> Result, DatabaseError> { @@ -8350,6 +8816,13 @@ impl Service { .ok_or(DatabaseError::Logical("Not found".to_string()))?; sk.set_scheduling_policy(scheduling_policy); + match scheduling_policy { + SkSchedulingPolicy::Active => (), + SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => { + locked.safekeeper_reconcilers.cancel_safekeeper(node_id); + } + } + locked.safekeepers = Arc::new(safekeepers); } Ok(()) diff --git a/storage_controller/src/service/safekeeper_reconciler.rs b/storage_controller/src/service/safekeeper_reconciler.rs new file mode 100644 index 0000000000..4fa465c307 --- /dev/null +++ b/storage_controller/src/service/safekeeper_reconciler.rs @@ -0,0 +1,340 @@ +use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; + +use clashmap::{ClashMap, Entry}; +use safekeeper_api::models::PullTimelineRequest; +use safekeeper_client::mgmt_api; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio_util::sync::CancellationToken; +use tracing::Instrument; +use utils::{ + id::{NodeId, TenantId, TimelineId}, + logging::SecretString, +}; + +use crate::{ + persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper, + safekeeper_client::SafekeeperClient, +}; + +use super::Service; + +pub(crate) struct SafekeeperReconcilers { + cancel: CancellationToken, + reconcilers: HashMap, +} + +impl SafekeeperReconcilers { + pub fn new(cancel: CancellationToken) -> Self { + SafekeeperReconcilers { + cancel, + reconcilers: HashMap::new(), + } + } + pub(crate) fn schedule_request_vec( + &mut self, + service: &Arc, + reqs: Vec, + ) { + for req in reqs { + self.schedule_request(service, req); + } + } + pub(crate) fn schedule_request(&mut self, service: &Arc, req: ScheduleRequest) { + let node_id = req.safekeeper.get_id(); + let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| { + SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone()) + }); + reconciler_handle.schedule_reconcile(req); + } + pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) { + if let Some(handle) = self.reconcilers.remove(&node_id) { + handle.cancel.cancel(); + } + } +} + +/// Initial load of the pending operations from the db +pub(crate) async fn load_schedule_requests( + service: &Arc, + safekeepers: &HashMap, +) -> anyhow::Result> { + let pending_ops = service.persistence.list_pending_ops(None).await?; + let mut res = Vec::with_capacity(pending_ops.len()); + for op_persist in pending_ops { + let node_id = NodeId(op_persist.sk_id as u64); + let Some(sk) = safekeepers.get(&node_id) else { + // This shouldn't happen, at least the safekeeper should exist as decomissioned. + tracing::warn!( + tenant_id = op_persist.tenant_id, + timeline_id = op_persist.timeline_id, + "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers" + ); + continue; + }; + let sk = Box::new(sk.clone()); + let tenant_id = TenantId::from_str(&op_persist.tenant_id)?; + let timeline_id = TimelineId::from_str(&op_persist.timeline_id)?; + let host_list = match op_persist.op_kind { + SafekeeperTimelineOpKind::Delete => Vec::new(), + SafekeeperTimelineOpKind::Exclude => Vec::new(), + SafekeeperTimelineOpKind::Pull => { + // TODO this code is super hacky, it doesn't take migrations into account + let timeline_persist = service + .persistence + .get_timeline(tenant_id, timeline_id) + .await?; + let Some(timeline_persist) = timeline_persist else { + // This shouldn't happen, the timeline should still exist + tracing::warn!( + tenant_id = op_persist.tenant_id, + timeline_id = op_persist.timeline_id, + "couldn't find timeline for corresponding pull op" + ); + continue; + }; + timeline_persist + .sk_set + .iter() + .filter_map(|sk_id| { + let other_node_id = NodeId(*sk_id as u64); + if node_id == other_node_id { + // We obviously don't want to pull from ourselves + return None; + } + let Some(sk) = safekeepers.get(&other_node_id) else { + tracing::warn!( + "couldnt find safekeeper with pending op id {other_node_id}, not pulling from it" + ); + return None; + }; + Some((other_node_id, sk.base_url())) + }) + .collect::>() + } + }; + let req = ScheduleRequest { + safekeeper: sk, + host_list, + tenant_id, + timeline_id, + generation: op_persist.generation as u32, + kind: op_persist.op_kind, + }; + res.push(req); + } + Ok(res) +} + +pub(crate) struct ScheduleRequest { + pub(crate) safekeeper: Box, + pub(crate) host_list: Vec<(NodeId, String)>, + pub(crate) tenant_id: TenantId, + pub(crate) timeline_id: TimelineId, + pub(crate) generation: u32, + pub(crate) kind: SafekeeperTimelineOpKind, +} + +struct ReconcilerHandle { + tx: UnboundedSender<(ScheduleRequest, Arc)>, + ongoing_tokens: Arc>>, + cancel: CancellationToken, +} + +impl ReconcilerHandle { + /// Obtain a new token slot, cancelling any existing reconciliations for that timeline + fn new_token_slot( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Arc { + let entry = self.ongoing_tokens.entry((tenant_id, timeline_id)); + if let Entry::Occupied(entry) = &entry { + let cancel: &CancellationToken = entry.get(); + cancel.cancel(); + } + entry.insert(Arc::new(self.cancel.child_token())).clone() + } + fn schedule_reconcile(&self, req: ScheduleRequest) { + let cancel = self.new_token_slot(req.tenant_id, req.timeline_id); + let hostname = req.safekeeper.skp.host.clone(); + if let Err(err) = self.tx.send((req, cancel)) { + tracing::info!("scheduling request onto {hostname} returned error: {err}"); + } + } +} + +pub(crate) struct SafekeeperReconciler { + service: Arc, + rx: UnboundedReceiver<(ScheduleRequest, Arc)>, + cancel: CancellationToken, +} + +impl SafekeeperReconciler { + fn spawn(cancel: CancellationToken, service: Arc) -> ReconcilerHandle { + // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking. + let (tx, rx) = mpsc::unbounded_channel(); + let mut reconciler = SafekeeperReconciler { + service, + rx, + cancel: cancel.clone(), + }; + let handle = ReconcilerHandle { + tx, + ongoing_tokens: Arc::new(ClashMap::new()), + cancel, + }; + tokio::spawn(async move { reconciler.run().await }); + handle + } + async fn run(&mut self) { + loop { + // TODO add parallelism with semaphore here + let req = tokio::select! { + req = self.rx.recv() => req, + _ = self.cancel.cancelled() => break, + }; + let Some((req, req_cancel)) = req else { break }; + if req_cancel.is_cancelled() { + continue; + } + + let kind = req.kind; + let tenant_id = req.tenant_id; + let timeline_id = req.timeline_id; + self.reconcile_one(req, req_cancel) + .instrument(tracing::info_span!( + "reconcile_one", + ?kind, + %tenant_id, + %timeline_id + )) + .await; + } + } + async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc) { + let req_host = req.safekeeper.skp.host.clone(); + match req.kind { + SafekeeperTimelineOpKind::Pull => { + let our_id = req.safekeeper.get_id(); + let http_hosts = req + .host_list + .iter() + .filter(|(node_id, _hostname)| *node_id != our_id) + .map(|(_, hostname)| hostname.clone()) + .collect::>(); + let pull_req = PullTimelineRequest { + http_hosts, + tenant_id: req.tenant_id, + timeline_id: req.timeline_id, + }; + self.reconcile_inner( + req, + async |client| client.pull_timeline(&pull_req).await, + |resp| { + tracing::info!( + "pulled timeline from {} onto {req_host}", + resp.safekeeper_host, + ); + }, + req_cancel, + ) + .await; + } + SafekeeperTimelineOpKind::Exclude => { + // TODO actually exclude instead of delete here + let tenant_id = req.tenant_id; + let timeline_id = req.timeline_id; + self.reconcile_inner( + req, + async |client| client.delete_timeline(tenant_id, timeline_id).await, + |_resp| { + tracing::info!("deleted timeline from {req_host}"); + }, + req_cancel, + ) + .await; + } + SafekeeperTimelineOpKind::Delete => { + let tenant_id = req.tenant_id; + let timeline_id = req.timeline_id; + self.reconcile_inner( + req, + async |client| client.delete_timeline(tenant_id, timeline_id).await, + |_resp| { + tracing::info!("deleted timeline from {req_host}"); + }, + req_cancel, + ) + .await; + } + } + } + async fn reconcile_inner( + &self, + req: ScheduleRequest, + closure: impl Fn(SafekeeperClient) -> F, + log_success: impl FnOnce(T) -> U, + req_cancel: Arc, + ) where + F: Future>, + { + let jwt = self + .service + .config + .safekeeper_jwt_token + .clone() + .map(SecretString::from); + let ssl_ca_cert = self.service.config.ssl_ca_cert.clone(); + loop { + let res = req + .safekeeper + .with_client_retries( + |client| { + let closure = &closure; + async move { closure(client).await } + }, + &jwt, + &ssl_ca_cert, + 3, + 10, + Duration::from_secs(10), + &req_cancel, + ) + .await; + match res { + Ok(resp) => { + log_success(resp); + let res = self + .service + .persistence + .remove_pending_op( + req.tenant_id, + req.timeline_id, + req.safekeeper.get_id(), + req.generation, + ) + .await; + if let Err(err) = res { + tracing::info!( + "couldn't remove reconciliation request onto {} from persistence: {err:?}", + req.safekeeper.skp.host + ); + } + return; + } + Err(mgmt_api::Error::Cancelled) => { + // On cancellation, the code that issued it will take care of removing db entries (if needed) + return; + } + Err(e) => { + tracing::info!( + "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}", + req.safekeeper.skp.host + ); + const SLEEP_TIME: Duration = Duration::from_secs(1); + tokio::time::sleep(SLEEP_TIME).await; + } + } + } + } +}