This commit is contained in:
Arpad Müller
2025-01-17 16:49:19 +01:00
parent e35a726b32
commit 78c4a82331
4 changed files with 121 additions and 27 deletions

View File

@@ -1073,13 +1073,17 @@ impl Persistence {
az_id: String,
#[diesel(sql_type = diesel::sql_types::Int8)]
timeline_count: i64,
#[diesel(sql_type = diesel::sql_types::Varchar)]
host: String,
#[diesel(sql_type = diesel::sql_types::Int8)]
port: i64,
}
let safekeepers: Vec<SafekeeperTimelineCountResponse> = self
.with_measured_conn(
DatabaseOperation::ListSafekeepers,
move |conn| -> DatabaseResult<_> {
let query = diesel::sql_query("\
SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count \
SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \
FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \
JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\
");
@@ -1458,3 +1462,36 @@ pub(crate) struct TimelinePersistence {
pub(crate) cplane_notified_generation: i32,
pub(crate) status: String,
}
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub(crate) enum TimelineStatus {
Creating,
Created,
Deleting,
Deleted,
}
impl FromStr for TimelineStatus {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"creating" => TimelineStatus::Creating,
"created" => TimelineStatus::Created,
"deleting" => TimelineStatus::Deleting,
"deleted" => TimelineStatus::Deleted,
_ => return Err(anyhow::anyhow!("unexpected timeline status: {s}")),
})
}
}
impl From<TimelineStatus> for String {
fn from(value: TimelineStatus) -> Self {
match value {
TimelineStatus::Creating => "creating",
TimelineStatus::Created => "created",
TimelineStatus::Deleting => "deleting",
TimelineStatus::Deleted => "deleted",
}
.to_string()
}
}

View File

@@ -63,7 +63,10 @@ impl SafekeeperClient {
}
}
pub(crate) async fn create_timeline(&self, req: TimelineCreateRequest) -> Result<TimelineStatus> {
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,

View File

@@ -2,6 +2,7 @@ pub mod chaos_injector;
mod context_iterator;
use hyper::Uri;
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use std::{
borrow::Cow,
cmp::Ordering,
@@ -26,7 +27,7 @@ use crate::{
peer_client::GlobalObservedState,
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
ShardGenerationState, TenantFilter, TimelinePersistence,
ShardGenerationState, TenantFilter, TimelinePersistence, TimelineStatus,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper_client::SafekeeperClient,
@@ -85,7 +86,6 @@ use utils::{
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
sync::gate::Gate,
};
@@ -3292,9 +3292,10 @@ impl Service {
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let timeline_id = create_req.new_timeline_id.clone();
let timeline_id = create_req.new_timeline_id;
let create_mode = create_req.mode.clone();
let on_ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
let timeline_info = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
@@ -3410,9 +3411,26 @@ impl Service {
}
Ok(timeline_info)
});
}).await??;
let on_sk_fut = async {
if safekeepers {
let pg_version = timeline_info.pg_version;
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => {
timeline_info.last_record_lsn
}
models::TimelineCreateRequestMode::Branch { .. } => {
timeline_info.last_record_lsn
}
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
// Can't do return Err because of async block, must do ? plus unreachable!()
Err(ApiError::InternalServerError(anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
unreachable!()
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
@@ -3424,24 +3442,11 @@ impl Service {
sk_set: sks_persistence.clone(),
new_sk_set: sks_persistence.clone(),
cplane_notified_generation: 0,
status: "creating".to_string(),
status: String::from(TimelineStatus::Creating),
};
self.persistence.insert_timeline(timeline_persist).await?;
// reconcile: create timeline on safekeepers
// If quorum is reached, return if we are outside of a specified timeout
let pg_version = 0; // TODO: pg_version
let start_lsn = Lsn::INVALID; // TODO: start_lsn
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf: safekeeper_api::membership::Configuration::empty(),
pg_version,
start_lsn,
system_id: None,
tenant_id: tenant_id.clone(),
timeline_id: timeline_id.clone(),
wal_seg_size: None,
};
let sk_persistences = self
.persistence
.list_safekeepers()
@@ -3451,6 +3456,8 @@ impl Service {
.collect::<HashMap<_, _>>();
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();
let mut members = Vec::new();
for sk in sks.iter() {
let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
// Can't do return Err because of async block, must do ? plus unreachable!()
@@ -3459,6 +3466,28 @@ impl Service {
)))?;
unreachable!()
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
host: sk_p.host.clone(),
pg_port: sk_p.port as u16,
});
}
let mut mconf = safekeeper_api::membership::Configuration::empty();
mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version,
start_lsn,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
for sk in sks.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap();
let sk_clone = *sk;
let base_url = sk_p.base_url();
let jwt = jwt.clone();
@@ -3474,11 +3503,17 @@ impl Service {
})
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline =
tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Return within the timeout, success if a quorum was reached, then do backgroud reconciliation.
// Treat the first two tasks to finish differently, mostly when they timeout,
// because then we won't have a successful quorum.
// For the third task, we don't rely on it succeeding, and we need this to support
// continuing operations even if one safekeeper is down.
let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
(
joinset.join_next().await.unwrap(),
@@ -3517,6 +3552,25 @@ impl Service {
// TODO: maybe log?
}
// check now if quorum was reached in reconcile_results
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
if successful.len() < 2 {
// Failure
} else if successful.len() == 3 {
// Success, state of timeline is
} else if successful.len() == 2 {
} else {
unreachable!(
"unexpected amount of successful reconciliations {}",
successful.len()
);
}
// TODO
// TODO update database state from "creating" to "created" or something
@@ -3527,9 +3581,8 @@ impl Service {
Ok((None, None))
}
};
let (on_ps_res, on_sk_res) = tokio::join!(on_ps_fut, on_sk_fut);
let on_sk_res = on_sk_fut.await;
let (safekeepers_generation, safekeepers) = on_sk_res?;
let timeline_info = on_ps_res??;
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers_generation,
@@ -7801,11 +7854,11 @@ impl Service {
}
}
if sks.len() == 3 {
return Ok(sks);
Ok(sks)
} else {
return Err(ApiError::InternalServerError(anyhow!(
Err(ApiError::InternalServerError(anyhow!(
"couldn't find three safekeepers in different AZs for new timeline"
)));
)))
}
}