This commit is contained in:
Arpad Müller
2025-01-16 21:49:19 +01:00
parent 3d81af8975
commit e35a726b32
9 changed files with 196 additions and 6 deletions

View File

@@ -17,6 +17,7 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;

View File

@@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
@@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:

View File

@@ -1393,6 +1393,9 @@ impl SafekeeperPersistence {
scheduling_policy,
})
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.host, self.http_port)
}
}
/// What we expect from the upsert http api

View File

@@ -0,0 +1,74 @@
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{id::NodeId, logging::SecretString};
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = crate::metrics::PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}
res
}};
}
impl SafekeeperClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
#[allow(unused)]
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) async fn create_timeline(&self, req: TimelineCreateRequest) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(&req).await
)
}
}

View File

@@ -29,6 +29,7 @@ use crate::{
ShardGenerationState, TenantFilter, TimelinePersistence,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper_client::SafekeeperClient,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
@@ -40,7 +41,7 @@ use control_plane::storage_controller::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
@@ -75,7 +76,7 @@ use pageserver_api::{
},
};
use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::sync::mpsc::error::TrySendError;
use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
@@ -83,6 +84,8 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
sync::gate::Gate,
};
@@ -3424,8 +3427,101 @@ impl Service {
status: "creating".to_string(),
};
self.persistence.insert_timeline(timeline_persist).await?;
// TODO: reconcile: create timeline on safekeepers, return success if quorum is met after timeout (or all sks return before timeout)
// TODO: call /notify-safekeepers on cplane
// reconcile: create timeline on safekeepers
// If quorum is reached, return if we are outside of a specified timeout
let pg_version = 0; // TODO: pg_version
let start_lsn = Lsn::INVALID; // TODO: start_lsn
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf: safekeeper_api::membership::Configuration::empty(),
pg_version,
start_lsn,
system_id: None,
tenant_id: tenant_id.clone(),
timeline_id: timeline_id.clone(),
wal_seg_size: None,
};
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();
for sk in sks.iter() {
let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
// Can't do return Err because of async block, must do ? plus unreachable!()
Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)))?;
unreachable!()
};
let sk_clone = *sk;
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let req = req.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
// TODO: logging on error, retries
client.create_timeline(req).await.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e)
.context("error creating timeline on safekeeper"),
)
})
});
}
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline =
tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Return within the timeout, success if a quorum was reached, then do backgroud reconciliation.
let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
(
joinset.join_next().await.unwrap(),
joinset.join_next().await.unwrap(),
)
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_quorum {
Ok((Ok(res_1), Ok(res_2))) => {
reconcile_results.push(res_1);
reconcile_results.push(res_2);
}
Ok((Err(_), Ok(_)) | (_, Err(_))) => {
Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline creation"
)))?;
unreachable!()
}
Err(_) => {
Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline creation on safekeepers within timeout"
)))?;
unreachable!()
}
}
let timeout_or_last = tokio::time::timeout_at(
reconcile_deadline,
joinset.join_next().map(Option::unwrap),
)
.await;
if let Ok(Ok(res)) = timeout_or_last {
reconcile_results.push(res);
} else {
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
// TODO: maybe log?
}
// check now if quorum was reached in reconcile_results
// TODO
// TODO update database state from "creating" to "created" or something
// notify cplane about creation
// TODO (this should probably be in a function so that the reconciler can use it too)
Ok::<_, ApiError>((Some(0), Some(sks)))
} else {
Ok((None, None))