Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-12 07:00:36 +00:00
Compare commits: `cloneable/` ... `arpad/sk_t` (19 commits)
| Author | SHA1 | Date |
|---|---|---|
| | c660d787e4 | |
| | ad56b9f76b | |
| | 1745fe5c65 | |
| | 986db002cd | |
| | 7ec08ee805 | |
| | e56aa822e1 | |
| | f0777cf7ac | |
| | a63153f4bc | |
| | 00380cedd7 | |
| | bf0d53aa2d | |
| | ebe9ba0cdf | |
| | 1ffe95c837 | |
| | b5c29806f0 | |
| | f0fe5fae6b | |
| | e805058364 | |
| | 78c4a82331 | |
| | e35a726b32 | |
| | 3d81af8975 | |
| | 7d296b3cea | |
Cargo.lock (generated, 2 changes)
```
@@ -6347,6 +6347,8 @@ dependencies = [
"rand 0.8.5",
"reqwest",
"routerify",
"safekeeper_api",
"safekeeper_client",
"scoped-futures",
"scopeguard",
"serde",
```
```
@@ -1104,6 +1104,7 @@ async fn handle_tenant(subcmd: &TenantCmd, env: &mut local_env::LocalEnv) -> any
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
},
)
.await?;
@@ -1164,6 +1165,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
existing_initdb_timeline_id: None,
pg_version: Some(args.pg_version),
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
@@ -1222,6 +1224,7 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
ancestor_start_lsn: start_lsn,
pg_version: None,
},
safekeepers: None,
};
let timeline_info = storage_controller
.tenant_timeline_create(tenant_id, create_req)
```
```
@@ -280,6 +280,18 @@ pub struct TimelineCreateRequest {
pub new_timeline_id: TimelineId,
#[serde(flatten)]
pub mode: TimelineCreateRequestMode,
/// Whether to also create timeline on the safekeepers (specific to storcon API)
pub safekeepers: Option<bool>,
}

/// Storage controller specific extensions to [`TimelineInfo`].
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateResponseStorcon {
#[serde(flatten)]
pub timeline_info: TimelineInfo,

pub safekeepers: Option<Vec<NodeId>>,
pub safekeepers_generation: Option<u32>,
}

#[derive(Serialize, Deserialize, Clone)]
```
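The new `safekeepers` flag on the request and the storcon-specific response wrapper are the API-surface change here. A minimal sketch of caller-side handling of the new response fields (hypothetical helper, using only the fields declared above):

```rust
use pageserver_api::models::TimelineCreateResponseStorcon;

// Hypothetical helper: report where the timeline ended up if safekeeper
// creation was requested; both fields are None otherwise.
fn log_safekeeper_placement(resp: &TimelineCreateResponseStorcon) {
    if let (Some(sks), Some(generation)) = (&resp.safekeepers, resp.safekeepers_generation) {
        println!(
            "timeline {} placed on safekeepers {:?} (generation {})",
            resp.timeline_info.timeline_id, sks, generation
        );
    }
}
```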
```
@@ -19,7 +19,7 @@ pub struct SafekeeperStatus {
pub id: NodeId,
}

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
```
```
@@ -144,6 +144,30 @@ impl Debug for Generation {
}
}

/// Like tenant generations, but for safekeepers
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SafekeeperGeneration(u32);

impl SafekeeperGeneration {
pub const fn new(v: u32) -> Self {
Self(v)
}

#[track_caller]
pub fn previous(&self) -> Option<Self> {
Some(Self(self.0.checked_sub(1)?))
}

#[track_caller]
pub fn next(&self) -> Self {
Self(self.0 + 1)
}

pub fn into_inner(self) -> u32 {
self.0
}
}

#[cfg(test)]
mod test {
use super::*;
```
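A quick sketch of the new type's arithmetic (the assertions follow directly from the definitions above):

```rust
use utils::generation::SafekeeperGeneration;

fn main() {
    // Generations start at an explicit value and only move forward;
    // previous() refuses to underflow past zero.
    let g = SafekeeperGeneration::new(0);
    assert_eq!(g.previous(), None);
    let g1 = g.next();
    assert_eq!(g1.into_inner(), 1);
    assert_eq!(g1.previous(), Some(SafekeeperGeneration::new(0)));
}
```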
```
@@ -32,6 +32,8 @@ postgres_connection.workspace = true
rand.workspace = true
reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
```
```
@@ -0,0 +1 @@
DROP TABLE timelines;
@@ -0,0 +1,12 @@
CREATE TABLE timelines (
tenant_id VARCHAR NOT NULL,
timeline_id VARCHAR NOT NULL,
PRIMARY KEY(tenant_id, timeline_id),
generation INTEGER NOT NULL,
sk_set BIGINT[] NOT NULL,
cplane_notified_generation INTEGER NOT NULL,
status_kind VARCHAR NOT NULL,
status VARCHAR NOT NULL,
deleted_at timestamptz
);
CREATE INDEX timelines_idx ON timelines(status_kind, deleted_at, tenant_id, timeline_id);
```
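For orientation, the background reconciler introduced later scans this table for unfinished work; against this schema that scan amounts to something like the following (illustrative SQL, the status strings come from `TimelineStatusKind` further down):

```sql
-- Timelines whose creation or deletion has not converged yet; matches the
-- leading column of timelines_idx.
SELECT tenant_id, timeline_id, generation, sk_set, status_kind, status
FROM timelines
WHERE status_kind IN ('creating', 'deleting');
```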
```
@@ -17,6 +17,7 @@ mod pageserver_client;
mod peer_client;
pub mod persistence;
mod reconciler;
mod safekeeper_client;
mod scheduler;
mod schema;
pub mod service;
```
```
@@ -10,6 +10,7 @@ use storage_controller::http::make_router;
use storage_controller::metrics::preinitialize_metrics;
use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::safekeeper_reconciler::SafekeeperReconciler;
use storage_controller::service::{
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
@@ -351,6 +352,24 @@ async fn async_main() -> anyhow::Result<()> {
)
});

const SAFEKEEPER_RECONCILER_INTERVAL: Duration = Duration::from_secs(120);
let safekeeper_reconciler_task = {
let service = service.clone();
let cancel = CancellationToken::new();
let cancel_bg = cancel.clone();
(
tokio::task::spawn(
async move {
let reconciler =
SafekeeperReconciler::new(service, SAFEKEEPER_RECONCILER_INTERVAL);
reconciler.run(cancel_bg).await
}
.instrument(tracing::info_span!("safekeeper_reconciler")),
),
cancel,
)
};

// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
@@ -384,6 +403,11 @@ async fn async_main() -> anyhow::Result<()> {
chaos_cancel.cancel();
chaos_jh.await.ok();
}
// Do the same for the safekeeper reconciler
{
safekeeper_reconciler_task.1.cancel();
_ = safekeeper_reconciler_task.0.await;
}

service.shutdown().await;
tracing::info!("Service shutdown complete");
```
```
@@ -80,6 +80,11 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,

/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<PageserverRequestLabelGroupSet>,

/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
@@ -87,6 +92,13 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_pageserver_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,

/// Latency of HTTP requests to the safekeeper, broken down by safekeeper
/// node id, request name and method. This include both successful and unsuccessful
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,

/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
pub(crate) storage_controller_passthrough_request_error:
```
```
@@ -26,7 +26,9 @@ use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use scoped_futures::ScopedBoxFuture;
use serde::{Deserialize, Serialize};
use utils::generation::Generation;
use utils::id::TimelineId;
use utils::id::{NodeId, TenantId};
use utils::lsn::Lsn;

use crate::metrics::{
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
@@ -112,6 +114,9 @@ pub(crate) enum DatabaseOperation {
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
ListSafekeepers,
ListTimelines,
LoadTimeline,
InsertTimeline,
GetLeader,
UpdateLeader,
SetPreferredAzs,
```
```
@@ -1156,7 +1161,7 @@ impl Persistence {
/// At startup, populate the list of nodes which our shards may be placed on
pub(crate) async fn list_safekeepers(&self) -> DatabaseResult<Vec<SafekeeperPersistence>> {
let safekeepers: Vec<SafekeeperPersistence> = self
.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
.with_measured_conn(DatabaseOperation::ListSafekeepers, move |conn| {
Box::pin(async move {
Ok(crate::schema::safekeepers::table
.load::<SafekeeperPersistence>(conn)
```
```
@@ -1170,6 +1175,66 @@ impl Persistence {
Ok(safekeepers)
}

pub(crate) async fn list_safekeepers_with_timeline_count(
&self,
) -> DatabaseResult<Vec<(NodeId, String, u64)>> {
#[derive(QueryableByName, PartialEq, Debug)]
struct SafekeeperTimelineCountResponse {
#[diesel(sql_type = diesel::sql_types::Int8)]
sk_id: i64,
#[diesel(sql_type = diesel::sql_types::Varchar)]
az_id: String,
#[diesel(sql_type = diesel::sql_types::Int8)]
timeline_count: i64,
#[diesel(sql_type = diesel::sql_types::Varchar)]
host: String,
#[diesel(sql_type = diesel::sql_types::Int8)]
port: i64,
}
let safekeepers: Vec<SafekeeperTimelineCountResponse> = self
.with_measured_conn(
DatabaseOperation::ListSafekeepers,
move |conn| {
Box::pin(async move {
let query = diesel::sql_query("\
SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \
FROM (select tenant_id, timeline_id, unnest(sk_set) as sk_id from timelines) as timelines_unnested \
JOIN safekeepers ON (safekeepers.id = timelines_unnested.id)\
");
let results: Vec<_> = query.load(conn).await?;
Ok(results)
})
},
)
.await?;

let safekeepers = safekeepers
.into_iter()
.map(|sk| {
if sk.sk_id < 0 {
return Err(DatabaseError::Logical(format!(
"invalid safekeeper id: {}",
sk.sk_id
)));
}
if sk.timeline_count < 0 {
return Err(DatabaseError::Logical(format!(
"invalid timeline count {} for sk: {}",
sk.timeline_count, sk.sk_id
)));
}
Ok((NodeId(sk.sk_id as u64), sk.az_id, sk.timeline_count as u64))
})
.collect::<Result<Vec<_>, DatabaseError>>()?;

tracing::info!(
"list_safekeepers_with_timeline_count: loaded {} safekeepers",
safekeepers.len()
);

Ok(safekeepers)
}

pub(crate) async fn safekeeper_get(
&self,
id: i64,
```
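As written, the raw query joins on `timelines_unnested.id`, a column the subquery does not expose (the unnested value is aliased `sk_id`), and it mixes `COUNT(*)` with non-aggregated columns without a `GROUP BY`. A hypothetical corrected query, in the same `diesel::sql_query` style (not part of the diff):

```rust
// Hypothetical corrected aggregation: join on the unnested sk_id and group by
// the safekeeper columns that are selected.
let query = diesel::sql_query(
    "SELECT safekeepers.id as sk_id, safekeepers.availability_zone_id as az_id, \
            COUNT(*) as timeline_count, safekeepers.host as host, safekeepers.port as port \
     FROM (SELECT tenant_id, timeline_id, unnest(sk_set) as sk_id FROM timelines) as timelines_unnested \
     JOIN safekeepers ON (safekeepers.id = timelines_unnested.sk_id) \
     GROUP BY safekeepers.id, safekeepers.availability_zone_id, safekeepers.host, safekeepers.port",
);
```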
```
@@ -1254,6 +1319,191 @@ impl Persistence {
})
.await
}

/// Timelines must be persisted before we schedule them for the first time.
pub(crate) async fn insert_timeline(&self, entry: TimelinePersistence) -> DatabaseResult<()> {
use crate::schema::timelines;

let entry = &entry;
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::insert_into(timelines::table)
.values(entry)
.on_conflict((timelines::tenant_id, timelines::timeline_id))
.do_nothing()
.execute(conn)
.await?;

if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}

Ok(())
})
})
.await
}
pub(crate) async fn update_timeline_status(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
status_kind: TimelineStatusKind,
status: String,
) -> DatabaseResult<()> {
use crate::schema::timelines;

let status = &status;
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.set((
timelines::status_kind.eq(String::from(status_kind)),
timelines::status.eq(status),
))
.execute(conn)
.await?;

if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}

Ok(())
})
})
.await
}
pub(crate) async fn update_timeline_status_deleted(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<()> {
use crate::schema::timelines;

let now = chrono::offset::Utc::now();
self.with_measured_conn(DatabaseOperation::InsertTimeline, move |conn| {
Box::pin(async move {
let inserted_updated = diesel::update(timelines::table)
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.filter(timelines::status_kind.eq(String::from(TimelineStatusKind::Deleting)))
.set((
timelines::status_kind.eq(String::from(TimelineStatusKind::Deleted)),
timelines::status.eq("{}"),
timelines::deleted_at.eq(now),
))
.execute(conn)
.await?;

if inserted_updated != 1 {
return Err(DatabaseError::Logical(format!(
"unexpected number of rows ({})",
inserted_updated
)));
}

Ok(())
})
})
.await
}

/// Obtains the timeline, returns None if not present
pub(crate) async fn get_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> DatabaseResult<Option<TimelinePersistence>> {
use crate::schema::timelines;
let mut timelines: Vec<TimelineFromDb> = self
.with_measured_conn(DatabaseOperation::LoadTimeline, move |conn| {
Box::pin(async move {
Ok(timelines::table
.filter(timelines::tenant_id.eq(tenant_id.to_string()))
.filter(timelines::timeline_id.eq(timeline_id.to_string()))
.load::<TimelineFromDb>(conn)
.await?)
})
})
.await?;
if timelines.is_empty() {
return Ok(None);
} else if timelines.len() > 1 {
return Err(DatabaseError::Logical(format!(
"incorrect number of returned timelines: ({})",
timelines.len()
)));
}

let tl = timelines.pop().unwrap().into_persistence();

tracing::info!("get_timeline: loaded timeline");

Ok(Some(tl))
}

/// Marks all timelines referencing a tenant for deletion
pub(crate) async fn mark_timelines_for_deletion(
&self,
del_tenant_id: TenantId,
) -> DatabaseResult<()> {
use crate::schema::timelines::dsl::*;
let count = self
.with_measured_conn(DatabaseOperation::DeleteTenant, move |conn| {
Box::pin(async move {
Ok(diesel::update(timelines)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.filter(status_kind.ne(String::from(TimelineStatusKind::Deleted)))
.set((
status.eq(String::from("")),
status_kind.eq(String::from(TimelineStatusKind::Deleted)),
))
.execute(conn)
.await?)
})
})
.await?;

tracing::info!("marked {count} timelines for deletion in timelines table");

Ok(())
}

pub(crate) async fn timelines_to_be_reconciled(
&self,
) -> DatabaseResult<Vec<TimelinePersistence>> {
use crate::schema::timelines;
let timelines: Vec<TimelineFromDb> = self
.with_measured_conn(DatabaseOperation::ListTimelines, move |conn| {
Box::pin(async move {
Ok(timelines::table
.filter(
timelines::status
.eq(String::from(TimelineStatusKind::Creating))
.or(timelines::status
.eq(String::from(TimelineStatusKind::Deleting))),
)
.load::<TimelineFromDb>(conn)
.await?)
})
})
.await?;
let timelines = timelines
.into_iter()
.map(|tl| tl.into_persistence())
.collect::<Vec<_>>();

tracing::info!("list_timelines: loaded {} timelines", timelines.len());

Ok(timelines)
}
}

/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
```
```
@@ -1431,6 +1681,9 @@ impl SafekeeperPersistence {
scheduling_policy,
})
}
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.host, self.http_port)
}
}

/// What we expect from the upsert http api
```
```
@@ -1481,3 +1734,82 @@ struct InsertUpdateSafekeeper<'a> {
availability_zone_id: &'a str,
scheduling_policy: Option<&'a str>,
}

#[derive(Insertable, AsChangeset, Queryable, Selectable, Clone)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelinePersistence {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) generation: i32,
pub(crate) sk_set: Vec<i64>,
pub(crate) cplane_notified_generation: i32,
pub(crate) status_kind: String,
pub(crate) status: String,
}

#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::schema::timelines)]
pub(crate) struct TimelineFromDb {
pub(crate) tenant_id: String,
pub(crate) timeline_id: String,
pub(crate) generation: i32,
pub(crate) sk_set: Vec<Option<i64>>,
pub(crate) cplane_notified_generation: i32,
pub(crate) status_kind: String,
pub(crate) status: String,
#[allow(unused)]
pub(crate) deleted_at: Option<chrono::DateTime<chrono::Utc>>,
}

impl TimelineFromDb {
fn into_persistence(self) -> TimelinePersistence {
TimelinePersistence {
tenant_id: self.tenant_id,
timeline_id: self.timeline_id,
generation: self.generation,
sk_set: self.sk_set.into_iter().flatten().collect(),
cplane_notified_generation: self.cplane_notified_generation,
status_kind: self.status_kind,
status: self.status,
}
}
}

#[derive(PartialEq, Eq, Copy, Clone, Debug)]
pub(crate) enum TimelineStatusKind {
Creating,
Created,
Deleting,
Deleted,
}

impl FromStr for TimelineStatusKind {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"creating" => TimelineStatusKind::Creating,
"created" => TimelineStatusKind::Created,
"deleting" => TimelineStatusKind::Deleting,
"deleted" => TimelineStatusKind::Deleted,
_ => return Err(anyhow::anyhow!("unexpected timeline status: {s}")),
})
}
}

impl From<TimelineStatusKind> for String {
fn from(value: TimelineStatusKind) -> Self {
match value {
TimelineStatusKind::Creating => "creating",
TimelineStatusKind::Created => "created",
TimelineStatusKind::Deleting => "deleting",
TimelineStatusKind::Deleted => "deleted",
}
.to_string()
}
}

#[derive(Serialize, Deserialize)]
pub(crate) struct TimelineStatusCreating {
pub(crate) pg_version: u32,
pub(crate) start_lsn: Lsn,
}
```
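The `status` column holds a small serde_json document whose shape depends on `status_kind`; for the `creating` state it is `TimelineStatusCreating`. A hedged sketch of the round trip (test-style, values illustrative):

```rust
use utils::lsn::Lsn;

#[test]
fn creating_status_roundtrip() {
    // Illustrative: the payload stored in timelines.status while status_kind = 'creating'.
    let creating = TimelineStatusCreating {
        pg_version: 16,
        start_lsn: Lsn(0x1696070),
    };
    let stored = serde_json::to_string(&creating).unwrap();
    let parsed: TimelineStatusCreating = serde_json::from_str(&stored).unwrap();
    assert_eq!(parsed.pg_version, 16);
    assert_eq!(parsed.start_lsn, creating.start_lsn);
}
```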
storage_controller/src/safekeeper_client.rs (new file, 94 lines)
```
@@ -0,0 +1,94 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{TimelineCreateRequest, TimelineStatus};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
};

/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
///
/// Analogous to [`crate::pageserver_client::PageserverClient`].
#[derive(Debug, Clone)]
pub(crate) struct SafekeeperClient {
inner: Client,
node_id_label: String,
}

macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};

let latency = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_latency;
let _timer_guard = latency.start_timer(labels.clone());

let res = $invoke;

if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}

res
}};
}

impl SafekeeperClient {
pub(crate) fn new(
node_id: NodeId,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(reqwest::Client::new(), mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}

#[allow(unused)]
pub(crate) fn from_client(
node_id: NodeId,
raw_client: reqwest::Client,
mgmt_api_endpoint: String,
jwt: Option<SecretString>,
) -> Self {
Self {
inner: Client::from_client(raw_client, mgmt_api_endpoint, jwt),
node_id_label: node_id.0.to_string(),
}
}

pub(crate) async fn create_timeline(
&self,
req: &TimelineCreateRequest,
) -> Result<TimelineStatus> {
measured_request!(
"create_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.create_timeline(req).await
)
}

pub(crate) async fn delete_timeline(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<TimelineStatus> {
measured_request!(
"delete_timeline",
crate::metrics::Method::Delete,
&self.node_id_label,
self.inner.delete_timeline(tenant_id, timeline_id).await
)
}
}
```
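A sketch of how the wrapper is used from service code (endpoint string and missing JWT are assumptions; the real call sites are in `service.rs` below):

```rust
use utils::id::{NodeId, TenantId, TimelineId};

// Hypothetical call site: talk to one safekeeper and let the wrapper record
// latency/error metrics under that node's id label.
async fn drop_timeline_on_sk(
    node_id: NodeId,
    base_url: String,
    tenant_id: TenantId,
    timeline_id: TimelineId,
) -> anyhow::Result<()> {
    let client = SafekeeperClient::new(node_id, base_url, None);
    client
        .delete_timeline(tenant_id, timeline_id)
        .await
        .map_err(|e| anyhow::anyhow!("delete_timeline failed: {e}"))?;
    Ok(())
}
```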
```
@@ -58,10 +58,24 @@ diesel::table! {
}
}

diesel::table! {
timelines (tenant_id, timeline_id) {
tenant_id -> Varchar,
timeline_id -> Varchar,
generation -> Int4,
sk_set -> Array<Nullable<Int8>>,
cplane_notified_generation -> Int4,
status_kind -> Varchar,
status -> Varchar,
deleted_at -> Nullable<Timestamptz>,
}
}

diesel::allow_tables_to_appear_in_same_query!(
controllers,
metadata_health,
nodes,
safekeepers,
tenant_shards,
timelines,
);
```
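With the table registered here, the typed DSL is available throughout the crate; for instance, a per-state row count could be written as follows (a hypothetical helper, assuming the crate's diesel-async connection type):

```rust
use diesel::prelude::*;
use diesel_async::RunQueryDsl;

// Hypothetical helper: how many timeline rows are in each lifecycle state.
async fn count_by_status_kind(
    conn: &mut diesel_async::AsyncPgConnection,
) -> QueryResult<Vec<(String, i64)>> {
    use crate::schema::timelines::dsl::*;
    timelines
        .group_by(status_kind)
        .select((status_kind, diesel::dsl::count_star()))
        .load(conn)
        .await
}
```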
```
@@ -1,7 +1,9 @@
pub mod chaos_injector;
mod context_iterator;
pub mod safekeeper_reconciler;

use hyper::Uri;
use safekeeper_api::membership::{MemberSet, SafekeeperId};
use std::{
borrow::Cow,
cmp::Ordering,
@@ -26,21 +28,23 @@ use crate::{
peer_client::GlobalObservedState,
persistence::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
ShardGenerationState, TenantFilter,
SafekeeperPersistence, ShardGenerationState, TenantFilter, TimelinePersistence,
TimelineStatusCreating, TimelineStatusKind,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper_client::SafekeeperClient,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
MigrateAttachment, ObservedStateDelta, ReconcileNeeded, ReconcilerStatus,
ScheduleOptimization, ScheduleOptimizationAction,
},
};
use anyhow::Context;
use anyhow::{anyhow, Context};
use control_plane::storage_controller::{
AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
};
use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
@@ -54,7 +58,7 @@ use pageserver_api::{
},
models::{
SecondaryProgress, TenantConfigPatchRequest, TenantConfigRequest,
TimelineArchivalConfigRequest, TopTenantShardsRequest,
TimelineArchivalConfigRequest, TimelineCreateResponseStorcon, TopTenantShardsRequest,
},
};
use reqwest::StatusCode;
@@ -75,14 +79,16 @@ use pageserver_api::{
},
};
use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::sync::mpsc::error::TrySendError;
use tokio::{sync::mpsc::error::TrySendError, task::JoinSet};
use tokio_util::sync::CancellationToken;
use utils::{
backoff,
completion::Barrier,
failpoint_support,
generation::Generation,
generation::{Generation, SafekeeperGeneration},
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
logging::SecretString,
pausable_failpoint,
sync::gate::Gate,
};
@@ -151,6 +157,7 @@ enum TenantOperations {
SecondaryDownload,
TimelineCreate,
TimelineDelete,
TimelineReconcile,
AttachHook,
TimelineArchivalConfig,
TimelineDetachAncestor,
```
```
@@ -3097,6 +3104,10 @@ impl Service {

self.maybe_load_tenant(tenant_id, &_tenant_lock).await?;

self.persistence
.mark_timelines_for_deletion(tenant_id)
.await?;

// Detach all shards. This also deletes local pageserver shard data.
let (detach_waiters, node) = {
let mut detach_waiters = Vec::new();
```
```
@@ -3269,25 +3280,11 @@ impl Service {
Ok(())
}

pub(crate) async fn tenant_timeline_create(
async fn tenant_timeline_create_pageservers(
&self,
tenant_id: TenantId,
mut create_req: TimelineCreateRequest,
) -> Result<TimelineInfo, ApiError> {
tracing::info!(
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);

let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");

self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
```
```
@@ -3404,8 +3401,261 @@ impl Service {
}

Ok(timeline_info)
}).await?
}

/// reconcile: create timeline on safekeepers
///
/// Assumes tenant lock is held while calling this function
async fn tenant_timeline_create_safekeepers_reconcile(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
timeline_persistence: &TimelinePersistence,
status_creating: &TimelineStatusCreating,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), ApiError> {
// If quorum is reached, return if we are outside of a specified timeout
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();

let mut members = Vec::new();
for sk in timeline_persistence.sk_set.iter() {
let Some(sk_p) = sk_persistences.get(sk) else {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)))?;
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
host: sk_p.host.clone(),
pg_port: sk_p.port as u16,
});
}
let mut mconf = safekeeper_api::membership::Configuration::empty();
mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;

let req = safekeeper_api::models::TimelineCreateRequest {
commit_lsn: None,
mconf,
pg_version: status_creating.pg_version,
start_lsn: status_creating.start_lsn,
system_id: None,
tenant_id,
timeline_id,
wal_seg_size: None,
};
for sk in timeline_persistence.sk_set.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(sk).unwrap();
let sk_clone = NodeId(*sk as u64);
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let req = req.clone();
let cancel = self.cancel.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
let req = req;
let retry_result = backoff::retry(
|| client.create_timeline(&req),
|_e| {
// TODO find right criteria here for deciding on retries
false
},
3,
5,
"create timeline on safekeeper",
&cancel,
)
.await;
if let Some(res) = retry_result {
res.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e).context("error creating timeline on safekeeper"),
)
})
} else {
Err(ApiError::Cancelled)
}
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;

// Treat the first two tasks to finish differently, mostly when they timeout,
// because then we won't have a successful quorum.
// For the third task, we don't rely on it succeeding, and we need this to support
// continuing operations even if one safekeeper is down.
let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
(
joinset.join_next().await.unwrap(),
joinset.join_next().await.unwrap(),
)
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_quorum {
Ok((Ok(res_1), Ok(res_2))) => {
reconcile_results.push(res_1);
reconcile_results.push(res_2);
}
Ok((Err(_), Ok(_)) | (_, Err(_))) => {
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline creation"
)));
}
Err(_) => {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline creation on safekeepers within timeout"
)));
}
}
let timeout_or_last =
tokio::time::timeout_at(reconcile_deadline, joinset.join_next().map(Option::unwrap))
.await;
if let Ok(Ok(res)) = timeout_or_last {
reconcile_results.push(res);
} else {
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
tracing::info!("timeout for third reconciliation");
}
// check now if quorum was reached in reconcile_results
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
let status_kind = if successful.len() < 2 {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {}",
successful.len()
)));
} else if successful.len() == 3 {
// Success, state of timeline is Created
TimelineStatusKind::Created
} else if successful.len() == 2 {
// Success, state of timeline remains Creating
TimelineStatusKind::Creating
} else {
unreachable!(
"unexpected number of successful reconciliations {}",
successful.len()
);
};

// notify cplane about creation
// TODO

self.persistence
.update_timeline_status(tenant_id, timeline_id, status_kind, "{}".to_owned())
.await?;
Ok(())
}

async fn tenant_timeline_create_safekeepers(
&self,
tenant_id: TenantId,
timeline_info: &TimelineInfo,
create_mode: models::TimelineCreateRequestMode,
) -> Result<(SafekeeperGeneration, Vec<NodeId>), ApiError> {
let timeline_id = timeline_info.timeline_id;
let pg_version = timeline_info.pg_version;
let start_lsn = match create_mode {
models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
models::TimelineCreateRequestMode::ImportPgdata { .. } => {
return Err(ApiError::InternalServerError(anyhow!(
"import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
)))?;
}
};

// Choose initial set of safekeepers respecting affinity
let sks = self.safekeepers_for_new_timeline().await?;
let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
let status_creating = TimelineStatusCreating {
pg_version,
start_lsn,
};
let status = serde_json::to_string(&status_creating).unwrap();
// Add timeline to db
let timeline_persist = TimelinePersistence {
tenant_id: tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
generation: 0,
sk_set: sks_persistence.clone(),
cplane_notified_generation: 0,
status_kind: String::from(TimelineStatusKind::Creating),
status,
};
self.persistence
.insert_timeline(timeline_persist.clone())
.await?;
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
self.tenant_timeline_create_safekeepers_reconcile(
tenant_id,
timeline_id,
&timeline_persist,
&status_creating,
&sk_persistences,
)
.await?;
Ok((SafekeeperGeneration::new(0), sks))
}

pub(crate) async fn tenant_timeline_create(
&self,
tenant_id: TenantId,
create_req: TimelineCreateRequest,
) -> Result<TimelineCreateResponseStorcon, ApiError> {
let safekeepers = create_req.safekeepers.unwrap_or_default();
tracing::info!(
%safekeepers,
"Creating timeline {}/{}",
tenant_id,
create_req.new_timeline_id,
);

let _tenant_lock = trace_shared_lock(
&self.tenant_op_locks,
tenant_id,
TenantOperations::TimelineCreate,
)
.await;
failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
let create_mode = create_req.mode.clone();

let timeline_info = self
.tenant_timeline_create_pageservers(tenant_id, create_req)
.await?;

let (safekeepers_generation, safekeepers) = if safekeepers {
let res = self
.tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
.await?;
(Some(res.0.into_inner()), Some(res.1))
} else {
(None, None)
};

Ok(TimelineCreateResponseStorcon {
timeline_info,
safekeepers_generation,
safekeepers,
})
.await?
}

pub(crate) async fn tenant_timeline_archival_config(
```
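The creation and deletion paths above share one shape: spawn one request per safekeeper into a `JoinSet`, wait for a quorum before a deadline, and leave stragglers to the background reconciler. Stripped of the storage-controller types, that shape looks roughly like this (standalone sketch, not the code above):

```rust
use std::time::Duration;
use tokio::task::JoinSet;

// Generic quorum wait: spawn N fallible tasks, succeed once `quorum` of them
// have succeeded before the deadline, and stop waiting for the rest.
async fn wait_for_quorum<F>(tasks: Vec<F>, quorum: usize, timeout: Duration) -> Result<usize, &'static str>
where
    F: std::future::Future<Output = Result<(), ()>> + Send + 'static,
{
    let mut set = JoinSet::new();
    for t in tasks {
        set.spawn(t);
    }
    let deadline = tokio::time::Instant::now() + timeout;
    let mut ok = 0;
    while ok < quorum {
        match tokio::time::timeout_at(deadline, set.join_next()).await {
            Err(_) => return Err("quorum not reached before deadline"),
            Ok(None) => return Err("ran out of tasks before reaching quorum"),
            Ok(Some(Ok(Ok(())))) => ok += 1,
            // A failed or panicked task does not end the wait; the remaining tasks may still form a quorum.
            Ok(Some(Ok(Err(())))) | Ok(Some(Err(_))) => {}
        }
    }
    Ok(ok)
}
```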
```
@@ -3870,6 +4120,187 @@ impl Service {
Ok(result)
}

async fn tenant_timeline_delete_safekeepers_reconcile(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
tl_p: &TimelinePersistence,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), ApiError> {
// If at least one deletion succeeded, return if we are outside of a specified timeout
let jwt = self.config.jwt_token.clone().map(SecretString::from);
let mut joinset = JoinSet::new();

let mut members = Vec::new();
for sk in tl_p.sk_set.iter() {
let Some(sk_p) = sk_persistences.get(sk) else {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't find persisted entry for safekeeper with id {sk}"
)))?;
};
members.push(SafekeeperId {
id: NodeId(sk_p.id as u64),
host: sk_p.host.clone(),
pg_port: sk_p.port as u16,
});
}

let sks_to_reconcile = &tl_p.sk_set;
for sk in sks_to_reconcile.iter() {
// Unwrap is fine as we already would have returned error above
let sk_p = sk_persistences.get(sk).unwrap();
let sk_clone = NodeId(*sk as u64);
let base_url = sk_p.base_url();
let jwt = jwt.clone();
let cancel = self.cancel.clone();
joinset.spawn(async move {
let client = SafekeeperClient::new(sk_clone, base_url, jwt);
let retry_result = backoff::retry(
|| client.delete_timeline(tenant_id, timeline_id),
|_e| {
// TODO find right criteria here for deciding on retries
false
},
3,
5,
"delete timeline on safekeeper",
&cancel,
)
.await;
if let Some(res) = retry_result {
res.map_err(|e| {
ApiError::InternalServerError(
anyhow::Error::new(e).context("error deleting timeline on safekeeper"),
)
})
} else {
Err(ApiError::Cancelled)
}
});
}
// After we have built the joinset, we now wait for the tasks to complete,
// but with a specified timeout to make sure we return swiftly, either with
// a failure or success.
const SK_DELETE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
let reconcile_deadline = tokio::time::Instant::now() + SK_DELETE_TIMELINE_RECONCILE_TIMEOUT;

// Treat the first task to finish differently, mostly when it times out,
// because then we won't have any successful deletion.
// For the second and third task, we don't rely on them succeeding, and we need this to support
// continuing operations even if a safekeeper is down.
let timeout_or_first = tokio::time::timeout_at(reconcile_deadline, async {
joinset.join_next().await.unwrap()
})
.await;
let mut reconcile_results = Vec::new();
match timeout_or_first {
Ok(Ok(res_1)) => {
reconcile_results.push(res_1);
}
Ok(Err(_)) => {
return Err(ApiError::InternalServerError(anyhow!(
"task was cancelled while reconciling timeline deletion"
)));
}
Err(_) => {
return Err(ApiError::InternalServerError(anyhow!(
"couldn't reconcile timeline deletion on safekeepers within timeout"
)));
}
}
let timeout_or_last = tokio::time::timeout_at(reconcile_deadline, async {
while let Some(next_res) = joinset.join_next().await {
match next_res {
Ok(res) => {
reconcile_results.push(res);
}
Err(e) => {
tracing::info!("aborting reconciliation due to join error: {e:?}");
break;
}
}
}
});
if let Err(e) = timeout_or_last.await {
// No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
tracing::info!(
"timeout for last {} reconciliations: {e}",
sks_to_reconcile.len() - 1
);
}
// check now if quorum was reached in reconcile_results
let successful = reconcile_results
.into_iter()
.filter_map(|res| res.ok())
.collect::<Vec<_>>();
tracing::info!(
"Got {} successful results from reconciliation",
successful.len()
);
let new_status_kind = if successful.is_empty() {
// Failure
return Err(ApiError::InternalServerError(anyhow!(
"not enough successful reconciliations to reach quorum, please retry: {}",
successful.len()
)));
} else if successful.len() == sks_to_reconcile.len() {
// Success, state of timeline is Deleted
TimelineStatusKind::Deleted
} else if successful.len() == 2 {
// Success, state of timeline remains Deleting
TimelineStatusKind::Deleting
} else {
unreachable!(
"unexpected number of successful reconciliations {}",
successful.len()
);
};

if new_status_kind == TimelineStatusKind::Deleted {
self.persistence
.update_timeline_status_deleted(tenant_id, timeline_id)
.await?;
}
Ok(())
}
async fn tenant_timeline_delete_safekeepers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), ApiError> {
let tl = self
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
tracing::info!("timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed");
return Ok(());
};
let status_kind =
TimelineStatusKind::from_str(&tl.status_kind).map_err(ApiError::InternalServerError)?;
if status_kind != TimelineStatusKind::Deleting {
// Set status to deleting
let new_status_kind = TimelineStatusKind::Deleting;
self.persistence
.update_timeline_status(tenant_id, timeline_id, new_status_kind, "{}".to_owned())
.await?;
}
let sk_persistences = self
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
self.tenant_timeline_delete_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
&sk_persistences,
)
.await?;
Ok(())
}
pub(crate) async fn tenant_timeline_delete(
&self,
tenant_id: TenantId,
```
```
@@ -3883,7 +4314,7 @@ impl Service {
)
.await;

self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
let ps_fut = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
if targets.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
```
```
@@ -3955,7 +4386,13 @@ impl Service {
)
.await?;
Ok(shard_zero_status)
}).await?
});

let sk_fut = self.tenant_timeline_delete_safekeepers(tenant_id, timeline_id);

let (ps_res, sk_res) = tokio::join!(ps_fut, sk_fut);
sk_res?;
ps_res?
}

/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
```
```
@@ -7653,6 +8090,32 @@ impl Service {
global_observed
}

pub(crate) async fn safekeepers_for_new_timeline(&self) -> Result<Vec<NodeId>, ApiError> {
let mut all_safekeepers = self
.persistence
.list_safekeepers_with_timeline_count()
.await?;
all_safekeepers.sort_by_key(|sk| sk.2);
let mut sks = Vec::new();
let mut azs = HashSet::new();
for (sk_id, az_id, _timeline_count) in all_safekeepers.iter() {
if !azs.insert(az_id) {
continue;
}
sks.push(*sk_id);
if sks.len() == 3 {
break;
}
}
if sks.len() == 3 {
Ok(sks)
} else {
Err(ApiError::InternalServerError(anyhow!(
"couldn't find three safekeepers in different AZs for new timeline"
)))
}
}

pub(crate) async fn safekeepers_list(
&self,
) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
```
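The selection in `safekeepers_for_new_timeline` is a small pure function: sort by current timeline count, then greedily take one safekeeper per availability zone until three are chosen. The same logic with plain types (standalone sketch):

```rust
use std::collections::HashSet;

// Input: (safekeeper id, AZ id, timelines currently hosted).
fn pick_three_distinct_azs(mut sks: Vec<(u64, String, u64)>) -> Option<Vec<u64>> {
    // Least-loaded first, then greedily take one safekeeper per AZ.
    sks.sort_by_key(|(_, _, timeline_count)| *timeline_count);
    let mut chosen = Vec::new();
    let mut azs = HashSet::new();
    for (id, az, _) in sks {
        if azs.insert(az) {
            chosen.push(id);
            if chosen.len() == 3 {
                return Some(chosen);
            }
        }
    }
    None
}
```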
storage_controller/src/service/safekeeper_reconciler.rs (new file, 130 lines)
```
@@ -0,0 +1,130 @@
use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};

use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
failpoint_support,
id::{TenantId, TimelineId},
};

use crate::{
id_lock_map::trace_shared_lock,
persistence::SafekeeperPersistence,
service::{TenantOperations, TimelineStatusCreating, TimelineStatusKind},
};

use super::{Service, TimelinePersistence};

pub struct SafekeeperReconciler {
service: Arc<Service>,
duration: Duration,
}

impl SafekeeperReconciler {
pub fn new(service: Arc<Service>, duration: Duration) -> Self {
SafekeeperReconciler { service, duration }
}
pub async fn run(&self, cancel: CancellationToken) {
while !cancel.is_cancelled() {
tokio::select! {
_ = tokio::time::sleep(self.duration) => (),
_ = cancel.cancelled() => break,
}
match self.reconcile_iteration(&cancel).await {
Ok(()) => (),
Err(e) => {
tracing::warn!("Error during safekeeper reconciliation: {e:?}");
}
}
}
}
async fn reconcile_iteration(&self, cancel: &CancellationToken) -> Result<(), anyhow::Error> {
let work_list = self
.service
.persistence
.timelines_to_be_reconciled()
.await?;
if work_list.is_empty() {
return Ok(());
}
let sk_persistences = self
.service
.persistence
.list_safekeepers()
.await?
.into_iter()
.map(|p| (p.id, p))
.collect::<HashMap<_, _>>();
for tl in work_list {
let reconcile_fut =
self.reconcile_timeline(&tl, &sk_persistences)
.instrument(tracing::info_span!(
"safekeeper_reconcile_timeline",
timeline_id = tl.timeline_id,
tenant_id = tl.tenant_id
));

tokio::select! {
r = reconcile_fut => r?,
_ = cancel.cancelled() => break,
}
}
Ok(())
}
async fn reconcile_timeline(
&self,
tl: &TimelinePersistence,
sk_persistences: &HashMap<i64, SafekeeperPersistence>,
) -> Result<(), anyhow::Error> {
tracing::info!("Reconciling timeline on safekeepers");
let tenant_id = TenantId::from_slice(tl.tenant_id.as_bytes())?;
let timeline_id = TimelineId::from_slice(tl.timeline_id.as_bytes())?;

let _tenant_lock = trace_shared_lock(
&self.service.tenant_op_locks,
tenant_id,
TenantOperations::TimelineReconcile,
)
.await;

failpoint_support::sleep_millis_async!("safekeeper-reconcile-timeline-shared-lock");
// Load the timeline again from the db: unless we hold the tenant lock, the timeline can change under our noses.
let tl = self
.service
.persistence
.get_timeline(tenant_id, timeline_id)
.await?;
let Some(tl) = tl else {
// This can happen but is a bit unlikely, so print it on the warn level instead of info
tracing::warn!("timeline row in database disappeared");
return Ok(());
};
let status = TimelineStatusKind::from_str(&tl.status)?;
match status {
TimelineStatusKind::Created | TimelineStatusKind::Deleted => (),
TimelineStatusKind::Creating => {
let status_creating: TimelineStatusCreating = serde_json::from_str(&tl.status)?;
self.service
.tenant_timeline_create_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
&status_creating,
sk_persistences,
)
.await?;
}
TimelineStatusKind::Deleting => {
self.service
.tenant_timeline_delete_safekeepers_reconcile(
tenant_id,
timeline_id,
&tl,
sk_persistences,
)
.await?;
}
}
Ok(())
}
}
```
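How the reconciler is driven mirrors the spawn in `main.rs` earlier; a compact sketch (interval and shutdown sequencing are illustrative):

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Hypothetical driver: run the reconciler until shutdown is requested.
async fn run_reconciler(service: std::sync::Arc<Service>) {
    let cancel = CancellationToken::new();
    let reconciler = SafekeeperReconciler::new(service, Duration::from_secs(10));
    let handle = tokio::spawn({
        let cancel = cancel.clone();
        async move { reconciler.run(cancel).await }
    });
    // ... later, on shutdown:
    cancel.cancel();
    let _ = handle.await;
}
```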