diff --git a/Cargo.lock b/Cargo.lock index 1f090a27e4..18645faf2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6313,6 +6313,8 @@ dependencies = [ "rand 0.8.5", "reqwest", "routerify", + "safekeeper_api", + "safekeeper_client", "scopeguard", "serde", "serde_json", diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index db601f6b5f..29b965e740 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -280,7 +280,7 @@ pub struct TimelineCreateRequest { pub new_timeline_id: TimelineId, #[serde(flatten)] pub mode: TimelineCreateRequestMode, - /// Whether to also create timeline on the safekeepers + /// Whether to also create timeline on the safekeepers (specific to storcon API) pub safekeepers: Option, } diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 30418b0efd..35e3824c78 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -19,7 +19,7 @@ pub struct SafekeeperStatus { pub id: NodeId, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct TimelineCreateRequest { pub tenant_id: TenantId, pub timeline_id: TimelineId, diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index caaa22d0a5..0b6f537037 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -32,6 +32,8 @@ postgres_connection.workspace = true rand.workspace = true reqwest = { workspace = true, features = ["stream"] } routerify.workspace = true +safekeeper_api.workspace = true +safekeeper_client.workspace = true serde.workspace = true serde_json.workspace = true thiserror.workspace = true diff --git a/storage_controller/src/lib.rs b/storage_controller/src/lib.rs index f5823935e1..ca88c8e359 100644 --- a/storage_controller/src/lib.rs +++ b/storage_controller/src/lib.rs @@ -17,6 +17,7 @@ mod pageserver_client; mod peer_client; pub mod persistence; mod reconciler; +mod safekeeper_client; mod 
scheduler; mod schema; pub mod service; diff --git a/storage_controller/src/metrics.rs b/storage_controller/src/metrics.rs index 4164e3dc2b..6d67e0d130 100644 --- a/storage_controller/src/metrics.rs +++ b/storage_controller/src/metrics.rs @@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup { pub(crate) storage_controller_pageserver_request_error: measured::CounterVec, + /// Count of HTTP requests to the safekeeper that resulted in an error, + /// broken down by the safekeeper node id, request name and method + pub(crate) storage_controller_safekeeper_request_error: + measured::CounterVec, + /// Latency of HTTP requests to the pageserver, broken down by pageserver /// node id, request name and method. This include both successful and unsuccessful /// requests. @@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup { pub(crate) storage_controller_pageserver_request_latency: measured::HistogramVec, + /// Latency of HTTP requests to the safekeeper, broken down by safekeeper + /// node id, request name and method. This include both successful and unsuccessful + /// requests. 
+ #[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))] + pub(crate) storage_controller_safekeeper_request_latency: + measured::HistogramVec, + /// Count of pass-through HTTP requests to the pageserver that resulted in an error, /// broken down by the pageserver node id, request name and method pub(crate) storage_controller_passthrough_request_error: diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index d4e980a581..31ef00f32b 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -1393,6 +1393,10 @@ impl SafekeeperPersistence { scheduling_policy, }) } + /// Returns `http://{host}:{http_port}` for this safekeeper. + pub(crate) fn base_url(&self) -> String { + format!("http://{}:{}", self.host, self.http_port) + } } /// What we expect from the upsert http api diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs new file mode 100644 index 0000000000..b48fe46e19 --- /dev/null +++ b/storage_controller/src/safekeeper_client.rs @@ -0,0 +1,80 @@ +use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus}; +use safekeeper_client::mgmt_api::{Client, Result}; +use utils::{id::NodeId, logging::SecretString}; + +/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage +/// controller to collect metrics in a non-intrusive manner. +/// +/// Analogous to [`crate::pageserver_client::PageserverClient`]. +#[derive(Debug, Clone)] +pub(crate) struct SafekeeperClient { + inner: Client, + node_id_label: String, +} + +/// Evaluates `$invoke`, timing it with the safekeeper request latency histogram and bumping the +/// safekeeper request error counter when it yields `Err`; the result is returned unchanged. +macro_rules! 
measured_request { + ($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{ + // The pageserver label group is reused here, so `pageserver_id` holds the safekeeper node id. + let labels = crate::metrics::PageserverRequestLabelGroup { + pageserver_id: $node_id, + path: $name, + method: $method, + }; + + let latency = &crate::metrics::METRICS_REGISTRY + .metrics_group + .storage_controller_safekeeper_request_latency; + let _timer_guard = latency.start_timer(labels.clone()); + + let res = $invoke; + + if res.is_err() { + let error_counters = &crate::metrics::METRICS_REGISTRY + .metrics_group + .storage_controller_safekeeper_request_error; + error_counters.inc(labels) + } + + res + }}; +} + +impl SafekeeperClient { + /// Builds a client for the safekeeper at `mgmt_api_endpoint` using a fresh `reqwest::Client`. + pub(crate) fn new( + node_id: NodeId, + mgmt_api_endpoint: String, + jwt: Option<SecretString>, + ) -> Self { + Self { + inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt), + node_id_label: node_id.0.to_string(), + } + } + + /// Same as [`Self::new`], but sends requests through the supplied `raw_client`. + #[allow(unused)] + pub(crate) fn from_client( + node_id: NodeId, + raw_client: reqwest::Client, + mgmt_api_endpoint: String, + jwt: Option<SecretString>, + ) -> Self { + Self { + inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt), + node_id_label: node_id.0.to_string(), + } + } + + /// Forwards `req` to the wrapped client's `create_timeline`, recording latency and errors. + pub(crate) async fn create_timeline(&self, req: TimelineCreateRequest) -> Result<TimelineStatus> { + measured_request!( + "create_timeline", + crate::metrics::Method::Post, + &self.node_id_label, + self.inner.create_timeline(&req).await + ) + } +} diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 241229aade..f0184e576f 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -29,6 +29,7 @@ use crate::{ ShardGenerationState, TenantFilter, TimelinePersistence, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, + safekeeper_client::SafekeeperClient, scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode}, tenant_shard::{ MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus, @@ -40,7 +41,7 @@ use 
control_plane::storage_controller::{ AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, }; use diesel::result::DatabaseErrorKind; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use itertools::Itertools; use pageserver_api::{ controller_api::{ @@ -75,7 +76,7 @@ use pageserver_api::{ }, }; use pageserver_client::{mgmt_api, BlockUnblock}; -use tokio::sync::mpsc::error::TrySendError; +use tokio::{sync::mpsc::error::TrySendError, task::JoinSet}; use tokio_util::sync::CancellationToken; use utils::{ completion::Barrier, @@ -83,6 +84,8 @@ use utils::{ generation::Generation, http::error::ApiError, id::{NodeId, TenantId, TimelineId}, + logging::SecretString, + lsn::Lsn, pausable_failpoint, sync::gate::Gate, }; @@ -3424,8 +3427,101 @@ impl Service { status: "creating".to_string(), }; self.persistence.insert_timeline(timeline_persist).await?; - // TODO: reconcile: create timeline on safekeepers, return success if quorum is met after timeout (or all sks return before timeout) - // TODO: call /notify-safekeepers on cplane + // reconcile: create timeline on safekeepers + // If quorum is reached, return if we are outside of a specified timeout + let pg_version = 0; // TODO: pg_version + let start_lsn = Lsn::INVALID; // TODO: start_lsn + + let req = safekeeper_api::models::TimelineCreateRequest { + commit_lsn: None, + mconf: safekeeper_api::membership::Configuration::empty(), + pg_version, + start_lsn, + system_id: None, + tenant_id: tenant_id.clone(), + timeline_id: timeline_id.clone(), + wal_seg_size: None, + }; + let sk_persistences = self + .persistence + .list_safekeepers() + .await? + .into_iter() + .map(|p| (p.id, p)) + .collect::>(); + let jwt = self.config.jwt_token.clone().map(SecretString::from); + let mut joinset = JoinSet::new(); + for sk in sks.iter() { + let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else { + // Can't do return Err because of async block, must do ? 
plus unreachable!() + Err(ApiError::InternalServerError(anyhow!( + "couldn't find persisted entry for safekeeper with id {sk}" + )))?; + unreachable!() + }; + let sk_clone = *sk; + let base_url = sk_p.base_url(); + let jwt = jwt.clone(); + let req = req.clone(); + joinset.spawn(async move { + let client = SafekeeperClient::new(sk_clone, base_url, jwt); + // TODO: logging on error, retries + client.create_timeline(req).await.map_err(|e| { + ApiError::InternalServerError( + anyhow::Error::new(e) + .context("error creating timeline on safekeeper"), + ) + }) + }); + } + const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); + let reconcile_deadline = + tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT; + + // Return within the timeout, success if a quorum was reached, then do backgroud reconciliation. + let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async { + ( + joinset.join_next().await.unwrap(), + joinset.join_next().await.unwrap(), + ) + }) + .await; + let mut reconcile_results = Vec::new(); + match timeout_or_quorum { + Ok((Ok(res_1), Ok(res_2))) => { + reconcile_results.push(res_1); + reconcile_results.push(res_2); + } + Ok((Err(_), Ok(_)) | (_, Err(_))) => { + Err(ApiError::InternalServerError(anyhow!( + "task was cancelled while reconciling timeline creation" + )))?; + unreachable!() + } + Err(_) => { + Err(ApiError::InternalServerError(anyhow!( + "couldn't reconcile timeline creation on safekeepers within timeout" + )))?; + unreachable!() + } + } + let timeout_or_last = tokio::time::timeout_at( + reconcile_deadline, + joinset.join_next().map(Option::unwrap), + ) + .await; + if let Ok(Ok(res)) = timeout_or_last { + reconcile_results.push(res); + } else { + // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers + // TODO: maybe log? 
+ } + // check now if quorum was reached in reconcile_results + // TODO + // TODO update database state from "creating" to "created" or something + + // notify cplane about creation + // TODO (this should probably be in a function so that the reconciler can use it too) Ok::<_, ApiError>((Some(0), Some(sks))) } else { Ok((None, None))