Compare commits


19 Commits

Author SHA1 Message Date
Arpad Müller c660d787e4 Fixes due to merge 2025-01-27 20:05:11 +01:00
Arpad Müller ad56b9f76b Merge remote-tracking branch 'origin/main' into arpad/sk_timelines_schema 2025-01-27 19:57:34 +01:00
Arpad Müller 1745fe5c65 Add SafekeeperGeneration 2025-01-27 12:57:44 +01:00
Arpad Müller 986db002cd Address simple review comments 2025-01-25 03:33:21 +01:00
Arpad Müller 7ec08ee805 deleted_at column instead of new_sk_set 2025-01-23 00:24:58 +01:00
Arpad Müller e56aa822e1 clippy 2025-01-22 20:35:32 +01:00
Arpad Müller f0777cf7ac Mark timelines for deletion during tenant deletion 2025-01-22 20:08:36 +01:00
Arpad Müller a63153f4bc Optional get_timeline result 2025-01-22 20:08:36 +01:00
Arpad Müller 00380cedd7 Add support for timeline deletion 2025-01-22 20:08:36 +01:00
Arpad Müller bf0d53aa2d Implement the creation part of the reconciler 2025-01-22 20:08:36 +01:00
Arpad Müller ebe9ba0cdf Add status_kind 2025-01-22 20:08:36 +01:00
Arpad Müller 1ffe95c837 remove line 2025-01-22 20:08:36 +01:00
Arpad Müller b5c29806f0 Draft for a reconciler 2025-01-22 20:08:36 +01:00
Arpad Müller f0fe5fae6b persist 2025-01-22 20:08:36 +01:00
Arpad Müller e805058364 Move to different function, clears up things a little 2025-01-22 20:08:36 +01:00
Arpad Müller 78c4a82331 wip 2025-01-22 20:08:36 +01:00
Arpad Müller e35a726b32 wip 2025-01-22 20:08:34 +01:00
Arpad Müller 3d81af8975 wip 2025-01-22 20:07:10 +01:00
Arpad Müller 7d296b3cea Add schema for timelines table 2025-01-22 20:07:10 +01:00
16 changed files with 1152 additions and 26 deletions

Cargo.lock generated

@@ -6347,6 +6347,8 @@ dependencies = [
"rand 0.8.5",
"reqwest",
"routerify",
"safekeeper_api",
"safekeeper_client",
"scoped-futures",
"scopeguard",
"serde",


@@ -1104,6 +1104,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
},
)
.await?;
@@ -1164,6 +1165,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
@@ -1222,6 +1224,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
ancestor_start_lsn: start_lsn,
pg_version: None,
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)


@@ -280,6 +280,18 @@ pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(flatten)]
pub mode: TimelineCreateRequestMode,
/// Whether to also create the timeline on the safekeepers (specific to the storcon API)
pub safekeepers: Option<bool>,
}
/// Storage-controller-specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {
#[serde(flatten)]
pub timeline_info: TimelineInfo,
pub safekeepers: Option<Vec<NodeId>>,
pub safekeepers_generation: Option<u32>,
}
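// For illustration only (not from this diff): a self-contained sketch of how
// `#[serde(flatten)]` shapes the wire format, with stand-in types instead of
// the real TimelineId / TimelineCreateRequestMode. The flattened mode fields
// appear at the top level of the JSON, next to the new `safekeepers` flag.
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum ModeSketch {
    Bootstrap { pg_version: u32 },
}

#[derive(Serialize, Deserialize)]
struct CreateRequestSketch {
    new_timeline_id: String,
    #[serde(flatten)]
    mode: ModeSketch,
    safekeepers: Option<bool>,
}

fn main() {
    let req = CreateRequestSketch {
        new_timeline_id: "de200bd42b49cc1814412c7e592dd6e9".to_string(), // made-up id
        mode: ModeSketch::Bootstrap { pg_version: 16 },                  // made-up version
        safekeepers: Some(true),
    };
    // Prints {"new_timeline_id":"de200bd42b49cc1814412c7e592dd6e9","pg_version":16,"safekeepers":true}
    println!("{}", serde_json::to_string(&req).unwrap());
}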
#[derive(Serialize, Deserialize, Clone)]


@@ -19,7 +19,7 @@ pub struct SafekeeperStatus {
pub id: NodeId,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,


@@ -144,6 +144,30 @@ impl Debug for Generation {
}
}
/// Like tenant generations, but for safekeepers
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SafekeeperGeneration(u32);
impl SafekeeperGeneration {
pub const fn new(v: u32) -> Self {
Self(v)
}
#[track_caller]
pub fn previous(&self) -> Option<Self> {
Some(Self(self.0.checked_sub(1)?))
}
#[track_caller]
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn into_inner(self) -> u32 {
self.0
}
}
#[cfg(test)]
mod test {
use super::*;
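// A usage sketch of the semantics above (a hypothetical test, not one from
// this diff): generations advance by one, and generation 0 has no predecessor.
#[test]
fn safekeeper_generation_sketch() {
    let g0 = SafekeeperGeneration::new(0);
    assert_eq!(g0.previous(), None); // nothing precedes generation 0
    let g1 = g0.next();
    assert_eq!(g1.into_inner(), 1);
    assert_eq!(g1.previous(), Some(g0));
    assert!(g1 > g0); // the Ord derive gives numeric ordering
}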


@@ -32,6 +32,8 @@ postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true


@@ -0,0 +1 @@
DROP TABLE timelines;


@@ -0,0 +1,12 @@
CREATE TABLE timelines (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
generation INTEGER NOT NULL,
sk_set BIGINT[] NOT NULL,
cplane_notified_generation INTEGER NOT NULL,
status_kind VARCHAR NOT NULL,
status VARCHAR NOT NULL,
deleted_at TIMESTAMPTZ,
PRIMARY KEY(tenant_id, timeline_id)
);
CREATE INDEX timelines_idx ON timelines(status_kind, deleted_at, tenant_id, timeline_id);


@@ -17,6 +17,7 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;


@@ -10,6 +10,7 @@ use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::safekeeper_reconciler::SafekeeperReconciler;
use storage_controller::service::{
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
@@ -351,6 +352,24 @@ async fn async_main() -> anyhow::Result<()> {
)
});
const SAFEKEEPER_RECONCILER_INTERVAL: Duration = Duration::from_secs(120);
let safekeeper_reconciler_task = {
let service = service.clone();
let cancel = CancellationToken::new();
let cancel_bg = cancel.clone();
(
tokio::task::spawn(
async move {
let reconciler =
SafekeeperReconciler::new(service, SAFEKEEPER_RECONCILER_INTERVAL);
reconciler.run(cancel_bg).await
}
.instrument(tracing::info_span!("safekeeper_reconciler")),
),
cancel,
)
};
// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -384,6 +403,11 @@ async fn async_main() -> anyhow::Result<()> {
chaos_cancel.cancel();
chaos_jh.await.ok();
}
// Do the same for the safekeeper reconciler
{
safekeeper_reconciler_task.1.cancel();
_ = safekeeper_reconciler_task.0.await;
}
service.shutdown().await;
tracing::info!("Service shutdown complete");


@@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This includes both successful and unsuccessful
/// requests.
@@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This includes both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:


@@ -26,7 +26,9 @@ use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::id::{NodeId, TenantId};
use utils::lsn::Lsn;
use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
@@ -112,6 +114,9 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
ListSafekeepers,
ListTimelines,
LoadTimeline,
InsertTimeline,
UpdateTimeline,
GetLeader,
UpdateLeader,
SetPreferredAzs,
@@ -1156,7 +1161,7 @@ impl Persistence {
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
let safekeepers: Vec<SafekeeperPersistence> = self
.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
.with_measured_conn(DatabaseOperation::ListSafekeepers, move |conn| {
Box::pin(async move {
Ok(crate::schema::safekeepers::table
.load::<SafekeeperPersistence>(conn)
@@ -1170,6 +1175,66 @@ impl Persistence {
Ok(safekeepers)
}
pub(crate) async fn list_safekeepers_with_timeline_count(
&self,
) -> DatabaseResult<Vec<(NodeId, String, u64)>> {
#[derive(QueryableByName, PartialEq, Debug)]
struct SafekeeperTimelineCountResponse {
#[diesel(sql_type = diesel::sql_types::Int8)]
sk_id: i64,
#[diesel(sql_type = diesel::sql_types::Varchar)]
az_id: String,
#[diesel(sql_type = diesel::sql_types::Int8)]
timeline_count: i64,
#[diesel(sql_type = diesel::sql_types::Varchar)]
host: String,
#[diesel(sql_type = diesel::sql_types::Int8)]
port: i64,
}
let safekeepers: Vec<SafekeeperTimelineCountResponse> = self
.with_measured_conn(
DatabaseOperation::ListSafekeepers,
move |conn| {
Box::pin(async move {
let query = diesel::sql_query("\
SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \
FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \
JOIN safekeepers ON (safekeepers.id = timelines_unnested.sk_id) \
GROUP BY safekeepers.id\
");
let results: Vec<_> = query.load(conn).await?;
Ok(results)
})
},
)
.await?;
let safekeepers = safekeepers
.into_iter()
.map(|sk| {
if sk.sk_id < 0 {
return Err(DatabaseError::Logical(format!(
"invalid safekeeper id: {}",
sk.sk_id
)));
}
if sk.timeline_count < 0 {
return Err(DatabaseError::Logical(format!(
"invalid timeline count {} for sk: {}",
sk.timeline_count, sk.sk_id
)));
}
Ok((NodeId(sk.sk_id as u64), sk.az_id, sk.timeline_count as u64))
})
.collect::<Result<Vec<_>, DatabaseError>>()?;
tracing::info!(
"list_safekeepers_with_timeline_count: loaded {} safekeepers",
safekeepers.len()
);
Ok(safekeepers)
}
pub(crate) async fn safekeeper_get(
&self,
id: i64,
@@ -1254,6 +1319,191 @@ impl Persistence {
})
.await
}
/// Timelines must be persisted before we schedule them for the first time.
pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<()> {
use crate::schema::timelines;
let entry = &entry;
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::insert_into(timelines::table)
.values(entry)
.on_conflict((timelines::tenant_id, timelines::timeline_id))
.do_nothing()
.execute(conn)
.await?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}
Ok(())
})
})
.await
}
pub(crate) async fn update_timeline_status(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
status_kind: TimelineStatusKind,
status: String,
) -> DatabaseResult<()> {
use crate::schema::timelines;
let status = &status;
self.with_measured_conn(DatabaseOperation::UpdateTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set((
timelines::status_kind.eq(String::from(status_kind)),
timelines::status.eq(status),
))
.execute(conn)
.await?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}
Ok(())
})
})
.await
}
pub(crate) async fn update_timeline_status_deleted(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines;
let now = chrono::offset::Utc::now();
self.with_measured_conn(DatabaseOperation::UpdateTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.filter(timelines::status_kind.eq(String::from(TimelineStatusKind::Deleting)))
.set((
timelines::status_kind.eq(String::from(TimelineStatusKind::Deleted)),
timelines::status.eq("{}"),
timelines::deleted_at.eq(now),
))
.execute(conn)
.await?;
if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}
Ok(())
})
})
.await
}
/// Obtains the timeline, returns None if not present
pub(crate) async fn get_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Option<TimelinePersistence>> {
use crate::schema::timelines;
let mut timelines: Vec<TimelineFromDb> = self
.with_measured_conn(DatabaseOperation::LoadTimeline, move |conn| {
Box::pin(async move {
Ok(timelines::table
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.load::<TimelineFromDb>(conn)
.await?)
})
})
.await?;
if timelines.is_empty() {
return Ok(None);
} else if timelines.len() > 1 {
return Err(DatabaseError::Logical(format!(
"incorrect number of returned timelines: ({})",
timelines.len()
)));
}
let tl = timelines.pop().unwrap().into_persistence();
tracing::info!("get_timeline: loaded timeline");
Ok(Some(tl))
}
/// Marks all timelines referencing a tenant for deletion
pub(crate) async fn mark_timelines_for_deletion(
&self,
del_tenant_id: TenantId,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl::*;
let count = self
.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| {
Box::pin(async move {
Ok(diesel::update(timelines)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.filter(status_kind.ne(String::from(TimelineStatusKind::Deleted)))
.set((
status.eq(String::from("{}")),
status_kind.eq(String::from(TimelineStatusKind::Deleting)),
))
.execute(conn)
.await?)
})
})
.await?;
tracing::info!("marked {count} timelines for deletion in timelines table");
Ok(())
}
pub(crate) async fn timelines_to_be_reconciled(
&self,
) -> DatabaseResult<Vec<TimelinePersistence>> {
use crate::schema::timelines;
let timelines: Vec<TimelineFromDb> = self
.with_measured_conn(DatabaseOperation::ListTimelines, move |conn| {
Box::pin(async move {
Ok(timelines::table
.filter(
timelines::status_kind
.eq(String::from(TimelineStatusKind::Creating))
.or(timelines::status_kind
.eq(String::from(TimelineStatusKind::Deleting))),
)
.load::<TimelineFromDb>(conn)
.await?)
})
})
.await?;
let timelines = timelines
.into_iter()
.map(|tl| tl.into_persistence())
.collect::<Vec<_>>();
tracing::info!("list_timelines: loaded {} timelines", timelines.len());
Ok(timelines)
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -1431,6 +1681,9 @@ impl SafekeeperPersistence {
scheduling_policy,
})
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.host, self.http_port)
}
}
/// What we expect from the upsert http api
@@ -1481,3 +1734,82 @@ struct InsertUpdateSafekeeper<'a> {
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,
}
#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) generation: i32,
pub(crate) sk_set: Vec<i64>,
pub(crate) cplane_notified_generation: i32,
pub(crate) status_kind: String,
pub(crate) status: String,
}
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelineFromDb {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) generation: i32,
pub(crate) sk_set: Vec<Option<i64>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) status_kind: String,
pub(crate) status: String,
#[allow(unused)]
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl TimelineFromDb {
fn into_persistence(self) -> TimelinePersistence {
TimelinePersistence {
tenant_id: self.tenant_id,
timeline_id: self.timeline_id,
generation: self.generation,
sk_set: self.sk_set.into_iter().flatten().collect(),
cplane_notified_generation: self.cplane_notified_generation,
status_kind: self.status_kind,
status: self.status,
}
}
}
#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub(crate) enum TimelineStatusKind {
Creating,
Created,
Deleting,
Deleted,
}
impl FromStr for TimelineStatusKind {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"creating" => TimelineStatusKind::Creating,
"created" => TimelineStatusKind::Created,
"deleting" => TimelineStatusKind::Deleting,
"deleted" => TimelineStatusKind::Deleted,
_ => return Err(anyhow::anyhow!("unexpected timeline status: {s}")),
})
}
}
impl From<TimelineStatusKind> for String {
fn from(value: TimelineStatusKind) -> Self {
match value {
TimelineStatusKind::Creating => "creating",
TimelineStatusKind::Created => "created",
TimelineStatusKind::Deleting => "deleting",
TimelineStatusKind::Deleted => "deleted",
}
.to_string()
}
}
#[derive(Serialize, Deserialize)]
pub(crate) struct TimelineStatusCreating {
pub(crate) pg_version: u32,
pub(crate) start_lsn: Lsn,
}
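// For illustration (not from this diff): a round-trip sketch of the two
// encodings above, assuming the FromStr and Lsn imports earlier in this file;
// the values are made up.
fn status_roundtrip_sketch() -> anyhow::Result<()> {
    // status_kind column: enum <-> string
    let kind = TimelineStatusKind::Creating;
    let s: String = kind.into();
    assert_eq!(s, "creating");
    assert_eq!(TimelineStatusKind::from_str(&s)?, kind);

    // status column: per-state JSON payload, here for the Creating state
    let creating = TimelineStatusCreating {
        pg_version: 16,            // made-up version
        start_lsn: Lsn(0x16B5A50), // made-up LSN
    };
    let json = serde_json::to_string(&creating)?;
    let parsed: TimelineStatusCreating = serde_json::from_str(&json)?;
    assert_eq!(parsed.pg_version, 16);
    Ok(())
}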


@@ -0,0 +1,94 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
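// Note: the pageserver label group is reused here, so the `pageserver_id`
// label actually carries the safekeeper node id.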
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());
let res = $invoke;
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_error;
error_counters.inc(labels)
}
res
}};
}
impl SafekeeperClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
#[allow(unused)]
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}
pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(req).await
)
}
pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
measured_request!(
"delete_timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.delete_timeline(tenant_id, timeline_id).await
)
}
}
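// A hedged usage sketch of the wrapper above; the node id and endpoint are
// placeholders, and `req` is a request built by the caller as in the service
// code.
async fn create_on_safekeeper_sketch(req: &TimelineCreateRequest) -> Result<TimelineStatus> {
    let client = SafekeeperClient::new(
        NodeId(1),                            // made-up node id
        "http://sk-1.local:7676".to_string(), // placeholder endpoint
        None,                                 // no JWT in this sketch
    );
    // The call is timed and error-counted via the measured_request! macro.
    client.create_timeline(req).await
}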


@@ -58,10 +58,24 @@ diesel::table! {
}
}
diesel::table! {
timelines (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
sk_set -> Array<Nullable<Int8>>,
cplane_notified_generation -> Int4,
status_kind -> Varchar,
status -> Varchar,
deleted_at -> Nullable<Timestamptz>,
}
}
diesel::allow_tables_to_appear_in_same_query!(
controllers,
metadata_health,
nodes,
safekeepers,
tenant_shards,
timelines,
);


@@ -1,7 +1,9 @@
pub mod chaos_injector;
mod context_iterator;
pub mod safekeeper_reconciler;
use hyper::Uri;
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use std::{
borrow::Cow,
cmp::Ordering,
@@ -26,21 +28,23 @@ use crate::{
peer_client::GlobalObservedState,
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
ShardGenerationState, TenantFilter,
SafekeeperPersistence, ShardGenerationState, TenantFilter, TimelinePersistence,
TimelineStatusCreating, TimelineStatusKind,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper_client::SafekeeperClient,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
use anyhow::{anyhow, Context};
use control_plane::storage_controller::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
@@ -54,7 +58,7 @@ use pageserver_api::{
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
TimelineArchivalConfigRequest, TopTenantShardsRequest,
TimelineArchivalConfigRequest, TimelineCreateResponseStorcon, TopTenantShardsRequest,
},
};
use reqwest::StatusCode;
@@ -75,14 +79,16 @@ use pageserver_api::{
},
};
use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::sync::mpsc::error::TrySendError;
use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
use tokio_util::sync::CancellationToken;
use utils::{
backoff,
completion::Barrier,
failpoint_support,
generation::Generation,
generation::{Generation, SafekeeperGeneration},
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
pausable_failpoint,
sync::gate::Gate,
};
@@ -151,6 +157,7 @@ enum TenantOperations {
SecondaryDownload,
TimelineCreate,
TimelineDelete,
TimelineReconcile,
AttachHook,
TimelineArchivalConfig,
TimelineDetachAncestor,
@@ -3097,6 +3104,10 @@ impl Service {
self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;
self.persistence
.mark_timelines_for_deletion(tenant_id)
.await?;
// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
let mut detach_waiters = Vec::new();
@@ -3269,25 +3280,11 @@ impl Service {
Ok(())
}
pub(crate) async fn tenant_timeline_create(
async fn tenant_timeline_create_pageservers(
&self,
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
@@ -3404,8 +3401,261 @@ impl Service {
}
Ok(timeline_info)
}).await?
}
/// Reconcile: create the timeline on the safekeepers.
///
/// Assumes the tenant lock is held while calling this function.
async fn tenant_timeline_create_safekeepers_reconcile(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_persistence: &TimelinePersistence,
status_creating: &TimelineStatusCreating,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), ApiError> {
// Create the timeline on all safekeepers in the set. Once a quorum (two out
// of three) has succeeded, we only wait for the remaining one until a deadline.
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();
let mut members = Vec::new();
for sk in timeline_persistence.sk_set.iter() {
let Some(sk_p) = sk_persistences.get(sk) else {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)));
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
host: sk_p.host.clone(),
pg_port: sk_p.port as u16,
});
}
let mut mconf = safekeeper_api::membership::Configuration::empty();
mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version: status_creating.pg_version,
start_lsn: status_creating.start_lsn,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
for sk in timeline_persistence.sk_set.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(sk).unwrap();
let sk_clone = NodeId(*sk as u64);
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let req = req.clone();
let cancel = self.cancel.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
let retry_result = backoff::retry(
|| client.create_timeline(&req),
|_e| {
// TODO find right criteria here for deciding on retries
false
},
3,
5,
"create timeline on safekeeper",
&cancel,
)
.await;
if let Some(res) = retry_result {
res.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e).context("error creating timeline on safekeeper"),
)
})
} else {
Err(ApiError::Cancelled)
}
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
// Wait for the first two tasks unconditionally: if either of them fails or
// times out, we don't have a quorum. The third task is best-effort, so that
// creation can make progress even if one safekeeper is down.
let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
(
joinset.join_next().await.unwrap(),
joinset.join_next().await.unwrap(),
)
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_quorum {
Ok((Ok(res_1), Ok(res_2))) => {
reconcile_results.push(res_1);
reconcile_results.push(res_2);
}
Ok((Err(_), Ok(_)) | (_, Err(_))) => {
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline creation"
)));
}
Err(_) => {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline creation on safekeepers within timeout"
)));
}
}
let timeout_or_last =
tokio::time::timeout_at(reconcile_deadline, joinset.join_next().map(Option::unwrap))
.await;
if let Ok(Ok(res)) = timeout_or_last {
reconcile_results.push(res);
} else {
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
tracing::info!("timeout for third reconciliation");
}
// check now if quorum was reached in reconcile_results
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
let status_kind = if successful.len() < 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {}",
successful.len()
)));
} else if successful.len() == 3 {
// Success, state of timeline is Created
TimelineStatusKind::Created
} else if successful.len() == 2 {
// Success, state of timeline remains Creating
TimelineStatusKind::Creating
} else {
unreachable!(
"unexpected number of successful reconciliations {}",
successful.len()
);
};
// notify cplane about creation
// TODO
self.persistence
.update_timeline_status(tenant_id, timeline_id, status_kind, "{}".to_owned())
.await?;
Ok(())
}
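// For illustration (not part of the diff): the quorum rule above, expressed
// as a pure function of the number of successful creations out of three
// safekeepers.
fn creation_status_sketch(successful: usize) -> Option<TimelineStatusKind> {
    match successful {
        // fewer than two: no quorum, the caller returns an error
        0 | 1 => None,
        // quorum reached, but one safekeeper is still pending
        2 => Some(TimelineStatusKind::Creating),
        // all three succeeded
        _ => Some(TimelineStatusKind::Created),
    }
}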
async fn tenant_timeline_create_safekeepers(
&self,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<(SafekeeperGeneration, Vec<NodeId>), ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version;
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. }
| models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)));
}
};
// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
let status_creating = TimelineStatusCreating {
pg_version,
start_lsn,
};
let status = serde_json::to_string(&status_creating).unwrap();
// Add timeline to db
let timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: 0,
sk_set: sks_persistence.clone(),
cplane_notified_generation: 0,
status_kind: String::from(TimelineStatusKind::Creating),
status,
};
self.persistence
.insert_timeline(timeline_persist.clone())
.await?;
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
self.tenant_timeline_create_safekeepers_reconcile(
tenant_id,
timeline_id,
&timeline_persist,
&status_creating,
&sk_persistences,
)
.await?;
Ok((SafekeeperGeneration::new(0), sks))
}
pub(crate) async fn tenant_timeline_create(
&self,
tenant_id: TenantId,
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = create_req.safekeepers.unwrap_or_default();
tracing::info!(
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);
let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone();
let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;
let (safekeepers_generation, safekeepers) = if safekeepers {
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.await?;
(Some(res.0.into_inner()), Some(res.1))
} else {
(None, None)
};
Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers_generation,
safekeepers,
})
}
pub(crate) async fn tenant_timeline_archival_config(
@@ -3870,6 +4120,187 @@ impl Service {
Ok(result)
}
async fn tenant_timeline_delete_safekeepers_reconcile(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
tl_p: &TimelinePersistence,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), ApiError> {
// Delete the timeline on all safekeepers in its set. Once at least one
// deletion has succeeded, we only wait for the rest until a deadline.
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();
// Check upfront that every safekeeper in the set has a persisted entry.
for sk in tl_p.sk_set.iter() {
if !sk_persistences.contains_key(sk) {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)));
}
}
let sks_to_reconcile = &tl_p.sk_set;
for sk in sks_to_reconcile.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(sk).unwrap();
let sk_clone = NodeId(*sk as u64);
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let cancel = self.cancel.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
let retry_result = backoff::retry(
|| client.delete_timeline(tenant_id, timeline_id),
|_e| {
// TODO find right criteria here for deciding on retries
false
},
3,
5,
"delete timeline on safekeeper",
&cancel,
)
.await;
if let Some(res) = retry_result {
res.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e).context("error deleting timeline on safekeeper"),
)
})
} else {
Err(ApiError::Cancelled)
}
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_DELETE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline = tokio::time::Instant::now() + SK_DELETE_TIMELINE_RECONCILE_TIMEOUT;
// Wait for the first task unconditionally: if it fails or times out, we have
// no successful deletion at all. The remaining tasks are best-effort, so that
// deletion can make progress even if a safekeeper is down.
let timeout_or_first = tokio::time::timeout_at(reconcile_deadline, async {
joinset.join_next().await.unwrap()
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_first {
Ok(Ok(res_1)) => {
reconcile_results.push(res_1);
}
Ok(Err(_)) => {
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline deletion"
)));
}
Err(_) => {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline deletion on safekeepers within timeout"
)));
}
}
let timeout_or_last = tokio::time::timeout_at(reconcile_deadline, async {
while let Some(next_res) = joinset.join_next().await {
match next_res {
Ok(res) => {
reconcile_results.push(res);
}
Err(e) => {
tracing::info!("aborting reconciliation due to join error: {e:?}");
break;
}
}
}
});
if let Err(e) = timeout_or_last.await {
// Not an error if we time out: we already have at least one successful deletion
tracing::info!(
"timeout for last {} reconciliations: {e}",
sks_to_reconcile.len() - 1
);
}
// Check how many of the deletions succeeded.
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
let new_status_kind = if successful.is_empty() {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"deletion did not succeed on any safekeeper, please retry"
)));
} else if successful.len() == sks_to_reconcile.len() {
// Success, state of timeline is Deleted
TimelineStatusKind::Deleted
} else {
// Partial success, state of timeline remains Deleting
TimelineStatusKind::Deleting
};
if new_status_kind == TimelineStatusKind::Deleted {
self.persistence
.update_timeline_status_deleted(tenant_id, timeline_id)
.await?;
}
Ok(())
}
async fn tenant_timeline_delete_safekeepers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
let tl = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
tracing::info!("timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed");
return Ok(());
};
let status_kind =
TimelineStatusKind::from_str(&tl.status_kind).map_err(ApiError::InternalServerError)?;
if status_kind != TimelineStatusKind::Deleting {
// Set status to deleting
let new_status_kind = TimelineStatusKind::Deleting;
self.persistence
.update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
.await?;
}
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
self.tenant_timeline_delete_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
&sk_persistences,
)
.await?;
Ok(())
}
pub(crate) async fn tenant_timeline_delete(
&self,
tenant_id: TenantId,
@@ -3883,7 +4314,7 @@ impl Service {
)
.await;
self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
let ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
@@ -3955,7 +4386,13 @@ impl Service {
)
.await?;
Ok(shard_zero_status)
}).await?
});
let sk_fut = self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id);
let (ps_res, sk_res) = tokio::join!(ps_fut, sk_fut);
sk_res?;
ps_res?
}
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
@@ -7653,6 +8090,32 @@ impl Service {
global_observed
}
pub(crate) async fn safekeepers_for_new_timeline(&self) -> Result<Vec<NodeId>, ApiError> {
let mut all_safekeepers = self
.persistence
.list_safekeepers_with_timeline_count()
.await?;
all_safekeepers.sort_by_key(|sk| sk.2);
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (sk_id, az_id, _timeline_count) in all_safekeepers.iter() {
if !azs.insert(az_id) {
continue;
}
sks.push(*sk_id);
if sks.len() == 3 {
break;
}
}
if sks.len() == 3 {
Ok(sks)
} else {
Err(ApiError::InternalServerError(anyhow!(
"couldn't find three safekeepers in different AZs for new timeline"
)))
}
}
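// For illustration (made-up ids, AZs and counts): with safekeepers sorted by
// timeline count ascending as
//   [(1, "az-a", 3), (2, "az-a", 5), (3, "az-b", 7), (4, "az-c", 9)]
// the loop above picks 1 ("az-a"), 3 ("az-b") and 4 ("az-c"); safekeeper 2 is
// skipped because its AZ is already covered, giving one safekeeper per AZ.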
pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {


@@ -0,0 +1,130 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
failpoint_support,
id::{TenantId, TimelineId},
};
use crate::{
id_lock_map::trace_shared_lock,
persistence::SafekeeperPersistence,
service::{TenantOperations, TimelineStatusCreating, TimelineStatusKind},
};
use super::{Service, TimelinePersistence};
pub struct SafekeeperReconciler {
service: Arc<Service>,
duration: Duration,
}
impl SafekeeperReconciler {
pub fn new(service: Arc<Service>, duration: Duration) -> Self {
SafekeeperReconciler { service, duration }
}
pub async fn run(&self, cancel: CancellationToken) {
while !cancel.is_cancelled() {
tokio::select! {
_ = tokio::time::sleep(self.duration) => (),
_ = cancel.cancelled() => break,
}
match self.reconcile_iteration(&cancel).await {
Ok(()) => (),
Err(e) => {
tracing::warn!("Error during safekeeper reconciliation: {e:?}");
}
}
}
}
async fn reconcile_iteration(&self, cancel: &CancellationToken) -> Result<(), anyhow::Error> {
let work_list = self
.service
.persistence
.timelines_to_be_reconciled()
.await?;
if work_list.is_empty() {
return Ok(());
}
let sk_persistences = self
.service
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
for tl in work_list {
let reconcile_fut =
self.reconcile_timeline(&tl, &sk_persistences)
.instrument(tracing::info_span!(
"safekeeper_reconcile_timeline",
timeline_id = %tl.timeline_id,
tenant_id = %tl.tenant_id
));
tokio::select! {
r = reconcile_fut => r?,
_ = cancel.cancelled() => break,
}
}
Ok(())
}
async fn reconcile_timeline(
&self,
tl: &TimelinePersistence,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), anyhow::Error> {
tracing::info!("Reconciling timeline on safekeepers");
let tenant_id = TenantId::from_str(&tl.tenant_id)?;
let timeline_id = TimelineId::from_str(&tl.timeline_id)?;
let _tenant_lock = trace_shared_lock(
&self.service.tenant_op_locks,
tenant_id,
TenantOperations::TimelineReconcile,
)
.await;
failpoint_support::sleep_millis_async!("safekeeper-reconcile-timeline-shared-lock");
// Reload the timeline from the db now that we hold the tenant lock: before acquiring it, the row could have changed under our noses.
let tl = self
.service
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
// This can happen but is unlikely, so log at warn level instead of info
tracing::warn!("timeline row in database disappeared");
return Ok(());
};
let status_kind = TimelineStatusKind::from_str(&tl.status_kind)?;
match status_kind {
TimelineStatusKind::Created | TimelineStatusKind::Deleted => (),
TimelineStatusKind::Creating => {
let status_creating: TimelineStatusCreating = serde_json::from_str(&tl.status)?;
self.service
.tenant_timeline_create_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
&status_creating,
sk_persistences,
)
.await?;
}
TimelineStatusKind::Deleting => {
self.service
.tenant_timeline_delete_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
sk_persistences,
)
.await?;
}
}
Ok(())
}
}