This commit is contained in:
Arpad Müller
2025-01-15 14:39:29 +01:00
parent 7d296b3cea
commit 3d81af8975
4 changed files with 175 additions and 7 deletions

View File

@@ -1104,6 +1104,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
},
)
.await?;
@@ -1164,6 +1165,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
@@ -1222,6 +1224,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
ancestor_start_lsn: start_lsn,
pg_version: None,
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)

View File

@@ -280,6 +280,18 @@ pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(flatten)]
pub mode: TimelineCreateRequestMode,
/// Whether to also create timeline on the safekeepers
pub safekeepers: Option<bool>,
}
/// Storage controller specific extensions to [`TimelineInfo`].
///
/// Carries the regular [`TimelineInfo`] plus optional information about the
/// safekeepers the storage controller picked for the timeline.
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {
/// The timeline info itself. Flattened, so its fields are serialized at the
/// same level as the fields below rather than under a `timeline_info` key.
#[serde(flatten)]
pub timeline_info: TimelineInfo,
/// Ids of the safekeepers selected for the timeline. `None` when no
/// safekeeper placement was performed for this request.
pub safekeepers: Option<Vec<NodeId>>,
/// Generation of the safekeeper set in `safekeepers`. `None` when no
/// safekeeper placement was performed for this request.
pub safekeepers_generation: Option<u32>,
}
#[derive(Serialize, Deserialize, Clone)]

View File

@@ -108,6 +108,7 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
ListSafekeepers,
InsertTimeline,
GetLeader,
UpdateLeader,
SetPreferredAzs,
@@ -1049,7 +1050,7 @@ impl Persistence {
pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
let safekeepers: Vec<SafekeeperPersistence> = self
.with_measured_conn(
DatabaseOperation::ListNodes,
DatabaseOperation::ListSafekeepers,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::safekeepers::table.load::<SafekeeperPersistence>(conn)?)
},
@@ -1061,6 +1062,60 @@ impl Persistence {
Ok(safekeepers)
}
/// Lists every safekeeper together with its availability zone and the number
/// of timelines whose `sk_set` references it.
///
/// Each returned tuple is `(safekeeper id, availability zone id, timeline count)`.
/// Safekeepers that are not referenced by any timeline are included with a
/// count of `0`, so that callers choosing a placement for a new timeline can
/// see unused safekeepers as well.
///
/// # Errors
///
/// Returns the underlying database error if the query fails, and
/// [`DatabaseError::Logical`] if the database reports a negative safekeeper id
/// or a negative timeline count.
pub(crate) async fn list_safekeepers_with_timeline_count(
    &self,
) -> DatabaseResult<Vec<(NodeId, String, u64)>> {
    /// Raw row shape produced by the aggregate query below.
    #[derive(QueryableByName, PartialEq, Debug)]
    struct SafekeeperTimelineCountResponse {
        #[diesel(sql_type = diesel::sql_types::Int8)]
        sk_id: i64,
        #[diesel(sql_type = diesel::sql_types::Varchar)]
        az_id: String,
        #[diesel(sql_type = diesel::sql_types::Int8)]
        timeline_count: i64,
    }

    let rows: Vec<SafekeeperTimelineCountResponse> = self
        .with_measured_conn(
            DatabaseOperation::ListSafekeepers,
            move |conn| -> DatabaseResult<_> {
                // Notes on the query:
                // - We start from `safekeepers` and LEFT JOIN the unnested
                //   membership rows, so safekeepers without any timeline still
                //   produce a row.
                // - COUNT is taken over the joined column (not `*`), so the
                //   NULL row produced by the LEFT JOIN for an unused safekeeper
                //   counts as 0 rather than 1.
                // - The join condition uses `sk_id`, which is the only
                //   safekeeper column the subquery exposes.
                // - GROUP BY is required because non-aggregated columns are
                //   selected next to COUNT.
                let query = diesel::sql_query("\
                    SELECT safekeepers.id AS sk_id, safekeepers.availability_zone_id AS az_id, \
                    COUNT(timelines_unnested.sk_id) AS timeline_count \
                    FROM safekeepers \
                    LEFT JOIN (SELECT unnest(sk_set) AS sk_id FROM timelines) AS timelines_unnested \
                    ON (safekeepers.id = timelines_unnested.sk_id) \
                    GROUP BY safekeepers.id, safekeepers.availability_zone_id\
                ");
                let results: Vec<_> = query.load(conn)?;
                Ok(results)
            },
        )
        .await?;

    // The database stores ids and counts as signed 64 bit integers; convert
    // them to the unsigned types used in memory and reject negative values.
    let safekeepers = rows
        .into_iter()
        .map(|row| -> Result<_, DatabaseError> {
            let sk_id = u64::try_from(row.sk_id).map_err(|_| {
                DatabaseError::Logical(format!("invalid safekeeper id: {}", row.sk_id))
            })?;
            let timeline_count = u64::try_from(row.timeline_count).map_err(|_| {
                DatabaseError::Logical(format!(
                    "invalid timeline count {} for sk: {}",
                    row.timeline_count, row.sk_id
                ))
            })?;
            Ok((NodeId(sk_id), row.az_id, timeline_count))
        })
        .collect::<Result<Vec<_>, DatabaseError>>()?;

    tracing::info!(
        "list_safekeepers_with_timeline_count: loaded {} safekeepers",
        safekeepers.len()
    );

    Ok(safekeepers)
}
pub(crate) async fn safekeeper_get(
&self,
id: i64,
@@ -1135,6 +1190,32 @@ impl Persistence {
})
.await
}
/// Persists a new row in the `timelines` table.
///
/// Timelines must be persisted before we schedule them for the first time.
///
/// The insert carries an `ON CONFLICT (tenant_id, timeline_id) DO NOTHING`
/// clause, so an already existing row is left untouched by the database.
/// Exactly one inserted row is still required for success: if the statement
/// reports any other row count (including zero, when the row already
/// existed), a [`DatabaseError::Logical`] is returned.
pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<()> {
    use crate::schema::timelines;

    self.with_measured_conn(
        DatabaseOperation::InsertTimeline,
        move |conn| -> DatabaseResult<()> {
            let affected_rows = diesel::insert_into(timelines::table)
                .values(&entry)
                .on_conflict((timelines::tenant_id, timelines::timeline_id))
                .do_nothing()
                .execute(conn)?;

            match affected_rows {
                1 => Ok(()),
                _ => Err(DatabaseError::Logical(format!(
                    "unexpected number of rows ({})",
                    affected_rows
                ))),
            }
        },
    )
    .await
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -1362,3 +1443,15 @@ struct InsertUpdateSafekeeper<'a> {
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,
}
/// A row of the `timelines` table, as written by the storage controller when
/// it records the safekeeper membership of a timeline.
#[derive(Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
/// Tenant id in its string representation. Together with `timeline_id` it
/// forms the conflict target used when inserting rows.
pub(crate) tenant_id: String,
/// Timeline id in its string representation.
pub(crate) timeline_id: String,
/// Generation of the safekeeper set; newly created timelines start at 0.
pub(crate) generation: i32,
/// Ids of the safekeepers assigned to the timeline, stored as signed 64 bit
/// integers for the database.
pub(crate) sk_set: Vec<i64>,
/// Initialized to the same ids as `sk_set` when a timeline is created.
/// NOTE(review): presumably the target set during a membership change —
/// confirm against the code that updates this column.
pub(crate) new_sk_set: Vec<i64>,
/// Newly created timelines start at 0.
/// NOTE(review): presumably the last generation the control plane was
/// notified about — confirm.
pub(crate) cplane_notified_generation: i32,
/// Free-form status string; timeline creation writes "creating".
/// NOTE(review): the full set of valid values is not visible here.
pub(crate) status: String,
}

View File

@@ -26,7 +26,7 @@ use crate::{
peer_client::GlobalObservedState,
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
ShardGenerationState, TenantFilter,
ShardGenerationState, TenantFilter, TimelinePersistence,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
@@ -35,7 +35,7 @@ use crate::{
ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
use anyhow::{anyhow, Context};
use control_plane::storage_controller::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
@@ -54,7 +54,7 @@ use pageserver_api::{
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
TimelineArchivalConfigRequest, TopTenantShardsRequest,
TimelineArchivalConfigRequest, TimelineCreateResponseStorcon, TopTenantShardsRequest,
},
};
use reqwest::StatusCode;
@@ -3273,8 +3273,10 @@ impl Service {
&self,
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = create_req.safekeepers.unwrap_or_default();
tracing::info!(
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
@@ -3287,8 +3289,9 @@ impl Service {
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let timeline_id = create_req.new_timeline_id.clone();
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
let on_ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
@@ -3404,8 +3407,38 @@ impl Service {
}
Ok(timeline_info)
});
let on_sk_fut = async {
if safekeepers {
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
// Add timeline to db
let timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: 0,
sk_set: sks_persistence.clone(),
new_sk_set: sks_persistence.clone(),
cplane_notified_generation: 0,
status: "creating".to_string(),
};
self.persistence.insert_timeline(timeline_persist).await?;
// TODO: reconcile: create timeline on safekeepers, return success if quorum is met after timeout (or all sks return before timeout)
// TODO: call /notify-safekeepers on cplane
Ok::<_, ApiError>((Some(0), Some(sks)))
} else {
Ok((None, None))
}
};
let (on_ps_res, on_sk_res) = tokio::join!(on_ps_fut, on_sk_fut);
let (safekeepers_generation, safekeepers) = on_sk_res?;
let timeline_info = on_ps_res??;
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers_generation,
safekeepers,
})
.await?
}
pub(crate) async fn tenant_timeline_archival_config(
@@ -7653,6 +7686,33 @@ impl Service {
global_observed
}
/// Picks the safekeepers that should host a newly created timeline.
///
/// Candidates are loaded from the database together with their timeline
/// counts and considered in ascending order of that count. At most one
/// safekeeper is taken per availability zone, and selection stops once three
/// have been found.
///
/// # Errors
///
/// Fails if the candidate list cannot be loaded, or with
/// [`ApiError::InternalServerError`] if fewer than three safekeepers in
/// distinct availability zones are available.
pub(crate) async fn safekeepers_for_new_timeline(&self) -> Result<Vec<NodeId>, ApiError> {
    let mut candidates = self
        .persistence
        .list_safekeepers_with_timeline_count()
        .await?;
    candidates.sort_by_key(|(_, _, timeline_count)| *timeline_count);

    // TODO: assign to safekeepers with smallest number of timelines
    // Walk the candidates in sorted order, keeping only the first safekeeper
    // seen for each availability zone, until three have been collected.
    let mut seen_azs = HashSet::new();
    let chosen: Vec<NodeId> = candidates
        .iter()
        .filter(|(_, az_id, _)| seen_azs.insert(az_id))
        .map(|(sk_id, _, _)| *sk_id)
        .take(3)
        .collect();

    if chosen.len() != 3 {
        return Err(ApiError::InternalServerError(anyhow!(
            "couldn't find three safekeepers in different AZs for new timeline"
        )));
    }
    Ok(chosen)
}
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {