finish the stub implementation of storcon side, it now PUTs to SKs and gets back 404s

This commit is contained in:
Christian Schwarz
2025-05-28 19:29:32 +02:00
parent a95015d967
commit 8eb853b731
11 changed files with 423 additions and 55 deletions

View File

@@ -6,6 +6,7 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
@@ -309,3 +310,14 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
/// A list of pageserver attachments for one tenant shard.
///
/// Serializable/deserializable via serde; the safekeeper management client passes a value of
/// this type as the body of its `put_tenant_shard_pageserver_attachments` request.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantShardPageserverAttachments {
/// The individual attachments being reported.
pub attachments: Vec<TenantShardPageserverAttachment>,
}
/// A single entry of [`TenantShardPageserverAttachments`]: a pageserver node id paired with
/// a generation.
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantShardPageserverAttachment {
/// Node id of the pageserver.
pub ps_id: NodeId,
/// Generation associated with this attachment.
/// NOTE(review): presumably the tenant shard's pageserver attachment generation — confirm.
pub generation: Generation,
}

View File

@@ -8,11 +8,12 @@ use std::error::Error as _;
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
TimelineStatus,
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
TenantShardPageserverAttachments, TimelineCreateRequest, TimelineStatus,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::shard::TenantShardId;
#[derive(Debug, Clone)]
pub struct Client {
@@ -189,6 +190,19 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn put_tenant_shard_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
attachments: TenantShardPageserverAttachments,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_shard_id}/pageserver_attachments",
self.mgmt_api_endpoint
);
let resp = self.put(uri, attachments).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
async fn post<B: serde::Serialize, U: IntoUrl>(
&self,
uri: U,

View File

@@ -70,4 +70,4 @@ http-utils = { path = "../libs/http-utils/" }
utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -6,7 +6,8 @@ CREATE TABLE "sk_ps_discovery"(
"sk_id" INT8 NOT NULL REFERENCES "safekeepers"("id") ON DELETE CASCADE, -- more efficient that trigger on "safekeepers"
"ps_id" INT8 NOT NULL REFERENCES "nodes"("node_id") ON DELETE CASCADE, -- more efficient that trigger on "nodes"
"created_at" TIMESTAMPTZ NOT NULL,
"last_attempt_at" TIMESTAMPTZ,
"retries" INT4 NOT NULL DEFAULT 0,
"last_retry_at" TIMESTAMPTZ,
PRIMARY KEY("tenant_id", "shard_number", "shard_count", "ps_generation", "sk_id")
);
@@ -17,7 +18,7 @@ BEGIN
DELETE FROM sk_ps_discovery
WHERE tenant_id = ARG_TENANT_ID;
INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at,last_attempt_at)
INSERT INTO sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id, ps_id,created_at)
WITH sk_timeline_attachments AS (
SELECT DISTINCT tenant_id,unnest(array_cat(sk_set, new_sk_set)) as sk_id FROM timelines
WHERE
@@ -25,7 +26,7 @@ BEGIN
AND
timelines.deleted_at IS NULL
)
SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW(), NULL
SELECT tenant_shards.tenant_id, tenant_shards.shard_number, tenant_shards.shard_count, tenant_shards.generation, sk_timeline_attachments.sk_id, tenant_shards.generation_pageserver, NOW()
FROM tenant_shards
INNER JOIN sk_timeline_attachments ON tenant_shards.tenant_id = sk_timeline_attachments.tenant_id;

View File

@@ -436,7 +436,8 @@ async fn async_main() -> anyhow::Result<()> {
};
// Validate that we can connect to the database
Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into()).await?;
Persistence::await_connection(secrets.database_url.clone(), args.db_connect_timeout.into())
.await?;
let persistence = Arc::new(Persistence::new(secrets.database_url).await);

View File

@@ -1,6 +1,7 @@
pub(crate) mod split_state;
use std::collections::HashMap;
use std::io::Write;
use std::ops::Add;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
@@ -139,6 +140,8 @@ pub(crate) enum DatabaseOperation {
DeleteTimelineImport,
ListTimelineImports,
IsTenantImportingTimeline,
ListSkPsDiscovery,
UpdateSkPsDiscoveryAttempt,
}
#[must_use]
@@ -1885,6 +1888,76 @@ impl Persistence {
drop(client);
}))
}
/// Loads every row of the `sk_ps_discovery` table.
///
/// The query is executed through `with_measured_conn` under
/// `DatabaseOperation::ListSkPsDiscovery`; query errors are propagated to the caller.
pub(crate) async fn get_all_sk_ps_discovery_work(
    &self,
) -> DatabaseResult<Vec<SkPsDiscoveryPersistence>> {
    use crate::schema::sk_ps_discovery::dsl;
    self.with_measured_conn(DatabaseOperation::ListSkPsDiscovery, move |conn| {
        Box::pin(async move {
            let rows = dsl::sk_ps_discovery
                .load::<SkPsDiscoveryPersistence>(conn)
                .await?;
            Ok(rows)
        })
    })
    .await
}
/// Records the outcome of one delivery attempt for the `sk_ps_discovery` row identified by `pk`.
///
/// * `Ok(())`: the row is deleted.
/// * `Err(())`: the row's `retries` is incremented by one and `last_retry_at` is set to the
///   database's current time.
///
/// The number of affected rows is not checked, so a `pk` that matches no row is not an error.
pub(crate) async fn update_sk_ps_discovery_attempt(
    &self,
    pk: SkPsDiscoveryPersistencePk,
    update: Result<(), ()>,
) -> DatabaseResult<()> {
    use crate::schema::sk_ps_discovery::dsl;
    self.with_measured_conn(DatabaseOperation::UpdateSkPsDiscoveryAttempt, move |conn| {
        // Each invocation of the closure works on its own copy of the key columns.
        let SkPsDiscoveryPersistencePk {
            tenant_id,
            shard_number,
            shard_count,
            ps_generation,
            sk_id,
        } = pk.clone();
        Box::pin(async move {
            if update.is_ok() {
                // Attempt succeeded: drop the row.
                diesel::delete(dsl::sk_ps_discovery)
                    .filter(dsl::tenant_id.eq(tenant_id))
                    .filter(dsl::shard_number.eq(shard_number))
                    .filter(dsl::shard_count.eq(shard_count))
                    .filter(dsl::ps_generation.eq(ps_generation))
                    .filter(dsl::sk_id.eq(sk_id))
                    .execute(conn) // TODO: check update count?
                    .await?;
            } else {
                // Attempt failed: bump the retry bookkeeping and keep the row.
                diesel::update(dsl::sk_ps_discovery)
                    .filter(dsl::tenant_id.eq(tenant_id))
                    .filter(dsl::shard_number.eq(shard_number))
                    .filter(dsl::shard_count.eq(shard_count))
                    .filter(dsl::ps_generation.eq(ps_generation))
                    .filter(dsl::sk_id.eq(sk_id))
                    .set((
                        // XXX: in split-brain situation we would bump twice...
                        dsl::retries.eq(dsl::retries.add(1)),
                        dsl::last_retry_at.eq(diesel::dsl::now),
                    ))
                    .execute(conn) // TODO: check update count?
                    .await?;
            }
            Ok(())
        })
    })
    .await
}
}
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
@@ -2469,3 +2542,37 @@ pub(crate) struct TimelineImportPersistence {
pub(crate) timeline_id: String,
pub(crate) shard_statuses: serde_json::Value,
}
/// The columns that identify a row of the `sk_ps_discovery` table: the fields match the
/// primary-key columns declared for that table in the diesel schema.
#[derive(Insertable, AsChangeset, Selectable, Clone)]
#[diesel(table_name = crate::schema::sk_ps_discovery)]
pub(crate) struct SkPsDiscoveryPersistencePk {
// Tenant id in its string form.
pub(crate) tenant_id: String,
pub(crate) shard_number: i32,
pub(crate) shard_count: i32,
// Pageserver generation the row refers to.
pub(crate) ps_generation: i32,
// Safekeeper id.
pub(crate) sk_id: i64,
}
/// A full row of the `sk_ps_discovery` table, as loaded by
/// `Persistence::get_all_sk_ps_discovery_work`.
///
/// The first five fields are the table's primary key; the rest are payload and
/// delivery bookkeeping.
#[derive(Queryable, Selectable, Clone, PartialEq, Eq)]
#[diesel(table_name = crate::schema::sk_ps_discovery)]
pub(crate) struct SkPsDiscoveryPersistence {
// Tenant id in its string form; parsed by `tenant_shard_id()`.
pub(crate) tenant_id: String,
pub(crate) shard_number: i32,
pub(crate) shard_count: i32,
pub(crate) ps_generation: i32,
// Safekeeper id.
pub(crate) sk_id: i64,
// Pageserver id.
pub(crate) ps_id: i64,
pub(crate) created_at: chrono::DateTime<chrono::Utc>,
// Incremented by `update_sk_ps_discovery_attempt` when an attempt failed.
pub(crate) retries: i32,
// Set to the database's current time by `update_sk_ps_discovery_attempt` when an attempt failed.
pub(crate) last_retry_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl SkPsDiscoveryPersistence {
    /// Builds the [`TenantShardId`] described by this row's `tenant_id`, `shard_number` and
    /// `shard_count` columns.
    ///
    /// # Errors
    ///
    /// Returns the `hex::FromHexError` produced when `tenant_id` cannot be parsed as a `TenantId`.
    pub(crate) fn tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
        let tenant_id = TenantId::from_str(&self.tenant_id)?;
        // The i32 columns are narrowed with `as`, i.e. only the low 8 bits are kept.
        let shard_number = ShardNumber(self.shard_number as u8);
        let shard_count = ShardCount::new(self.shard_count as u8);
        Ok(TenantShardId {
            tenant_id,
            shard_number,
            shard_count,
        })
    }
}

View File

@@ -1,9 +1,11 @@
use safekeeper_api::models::{
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
self, PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization,
TenantShardPageserverAttachments, TimelineCreateRequest,
};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use utils::shard::TenantShardId;
use crate::metrics::PageserverRequestLabelGroup;
@@ -164,4 +166,19 @@ impl SafekeeperClient {
self.inner.utilization().await
)
}
/// Forwards `attachments` for `tenant_shard_id` to the wrapped client's
/// `put_tenant_shard_pageserver_attachments` and returns its result.
///
/// The call is wrapped in `measured_request!` with the label
/// `"put_tenant_shard_pageserver_attachments"`, the `Put` method and this client's node id
/// label. NOTE(review): presumably the macro records per-request metrics — confirm against
/// its definition.
pub async fn put_tenant_shard_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
attachments: TenantShardPageserverAttachments,
) -> Result<()> {
measured_request!(
"put_tenant_shard_pageserver_attachments",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.put_tenant_shard_pageserver_attachments(tenant_shard_id, attachments)
.await
)
}
}

View File

@@ -60,13 +60,26 @@ diesel::table! {
}
}
// Operational table holding pending notifications for safekeepers about tenant shard
// pageserver-side attachments. A row is deleted once a delivery attempt for it succeeded;
// a failed attempt bumps `retries` and `last_retry_at` instead.
diesel::table! {
sk_ps_discovery (tenant_id, shard_number, shard_count, ps_generation, sk_id) {
// primary key
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
ps_generation -> Int4,
sk_id -> Int8,
// payload
ps_id -> Int8,
// delivery bookkeeping
created_at -> Timestamptz,
retries -> Int4,
last_retry_at -> Nullable<Timestamptz>,
}
}
diesel::table! {
tenant_shards (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
shard_stripe_size -> Int4,
// pageserver generation
generation -> Nullable<Int4>,
generation_pageserver -> Nullable<Int8>,
placement_policy -> Varchar,
@@ -93,7 +106,6 @@ diesel::table! {
tenant_id -> Varchar,
timeline_id -> Varchar,
start_lsn -> PgLsn,
// sk config generation
generation -> Int4,
sk_set -> Array<Nullable<Int8>>,
new_sk_set -> Nullable<Array<Nullable<Int8>>>,
@@ -102,24 +114,8 @@ diesel::table! {
}
}
// Operational table that contains pending notifications for Safekeepers
// about tenant shard pageserver-side attachments.
// Rows are removed when the notification was acknowledged by the Safekeeper.
diesel::table! {
use diesel::sql_types::*;
sk_ps_discovery(tenant_id, shard_number, shard_count, ps_generation, sk_id) {
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
ps_generation -> Int4,
sk_id -> Int8,
// payload
ps_id -> Int8,
// tracking of reliable delivery
created_at -> Timestamptz,
last_attempt_at -> Nullable<Timestamptz>,
}
}
diesel::joinable!(sk_ps_discovery -> nodes (ps_id));
diesel::joinable!(sk_ps_discovery -> safekeepers (sk_id));
diesel::allow_tables_to_appear_in_same_query!(
controllers,
@@ -127,8 +123,8 @@ diesel::allow_tables_to_appear_in_same_query!(
nodes,
safekeeper_timeline_pending_ops,
safekeepers,
sk_ps_discovery,
tenant_shards,
timeline_imports,
timelines,
sk_ps_discovery,
);

View File

@@ -489,7 +489,6 @@ pub struct Service {
inner: Arc<std::sync::RwLock<ServiceState>>,
config: Config,
persistence: Arc<Persistence>,
sk_ps_discovery: sk_ps_discovery::ActorClient,
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
@@ -1194,6 +1193,16 @@ impl Service {
}
}
}
/// Waits for startup to complete, then drives `sk_ps_discovery::run` with a handle to this
/// service and a clone of its HTTP client.
#[instrument(skip_all)]
async fn run_sk_ps_discovery(self: &Arc<Self>) {
    self.startup_complete.clone().wait().await;

    let service = Arc::clone(self);
    // TODO: this client is configured to open a fresh TCP connection each time, very inefficient
    let http_client = self.http_client.clone();
    sk_ps_discovery::run(service, http_client).await;
}
/// Heartbeat all storage nodes once in a while.
#[instrument(skip_all)]
async fn spawn_heartbeat_driver(&self) {
@@ -1768,8 +1777,6 @@ impl Service {
LeadershipStatus::Leader
};
let sk_ps_discovery = sk_ps_discovery::spawn(persistence.clone());
let this = Arc::new(Self {
inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
nodes,
@@ -1782,7 +1789,6 @@ impl Service {
))),
config: config.clone(),
persistence,
sk_ps_discovery,
compute_hook: Arc::new(ComputeHook::new(config.clone())?),
result_tx,
heartbeater_ps,
@@ -1802,7 +1808,7 @@ impl Service {
reconcilers_gate: Gate::default(),
tenant_op_locks: Default::default(),
node_op_locks: Default::default(),
http_client,
http_client: http_client.clone(),
step_down_barrier: Default::default(),
});
@@ -1870,6 +1876,15 @@ impl Service {
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();
async move {
startup_complete.wait().await;
this.run_sk_ps_discovery().await
}
});
tokio::task::spawn({
let this = this.clone();
let startup_complete = startup_complete.clone();

View File

@@ -647,6 +647,11 @@ impl Service {
sk.describe_response()
}
/// Returns a clone of the in-memory `Safekeeper` registered under `node_id`, or `None` when
/// there is no such entry.
///
/// `node_id` is reinterpreted as `u64` with an `as` cast before the lookup. Panics if the
/// state lock is poisoned.
pub(crate) fn get_safekeeper_object(&self, node_id: i64) -> Option<Safekeeper> {
    let state = self.inner.read().unwrap();
    let key = NodeId(node_id as u64);
    state.safekeepers.get(&key).cloned()
}
pub(crate) async fn upsert_safekeeper(
self: &Arc<Service>,
record: crate::persistence::SafekeeperUpsert,

View File

@@ -1,33 +1,44 @@
use std::sync::Arc;
use std::{
collections::{HashMap, hash_map},
sync::Arc,
time::Duration,
};
use anyhow::Context;
use futures::StreamExt;
use tracing::info;
use utils::id::TenantId;
use futures::{StreamExt, stream::FuturesUnordered};
use safekeeper_api::models::{TenantShardPageserverAttachment, TenantShardPageserverAttachments};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Span, error, info, info_span};
use utils::{
generation::Generation,
id::{NodeId, TenantId},
logging::SecretString,
shard::TenantShardId,
};
use crate::persistence::Persistence;
use crate::{
heartbeater::SafekeeperState,
persistence::{Persistence, SkPsDiscoveryPersistence, SkPsDiscoveryPersistencePk},
};
pub struct ActorClient {
tx: tokio::sync::mpsc::UnboundedSender<Message>,
}
use super::Service;
struct Actor {
service: Arc<Service>,
persistence: Arc<Persistence>,
rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
http_client: reqwest::Client,
}
#[derive(Debug)]
enum Message {}
pub fn spawn(persistence: Arc<Persistence>) -> ActorClient {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let actor = Actor { persistence, rx };
tokio::spawn(actor.run());
ActorClient { tx }
/// Builds the discovery `Actor` from `service` (sharing its `Persistence`) and `http_client`,
/// then runs it to completion.
pub async fn run(service: Arc<Service>, http_client: reqwest::Client) {
    let persistence = Arc::clone(&service.persistence);
    // XXX: build our own client instead of getting Service's client; we probably want idle conn to each sk
    Actor {
        persistence,
        service,
        http_client,
    }
    .run()
    .await;
}
impl ActorClient {}
impl Actor {
async fn run(mut self) {
loop {
@@ -55,8 +66,37 @@ impl Actor {
.await
.context("listen to sk_ps_discovery")?;
let mut sync_full_ticker = tokio::time::interval(std::time::Duration::from_secs(5));
#[derive(PartialEq, Eq, Hash, Debug)]
struct TaskKey {
tenant_shard_id: TenantShardId,
ps_generation: Generation,
sk_id: NodeId,
}
struct Task {
work: SkPsDiscoveryPersistence,
cancel: CancellationToken,
join_handle: Option<JoinHandle<()>>,
}
impl<'a> TryFrom<&'a SkPsDiscoveryPersistence> for TaskKey {
type Error = hex::FromHexError;
fn try_from(value: &'a SkPsDiscoveryPersistence) -> Result<Self, Self::Error> {
Ok(Self {
tenant_shard_id: value.tenant_shard_id()?,
ps_generation: Generation::new(value.ps_generation as u32),
sk_id: NodeId(value.sk_id as u64),
})
}
}
let mut tasks = HashMap::new();
loop {
tokio::select! {
biased; // control messages have higher priority, the periodic full tick, then subscriptions.
_ = sync_full_ticker.tick() => {
info!("rebuild");
}
maybe_res = subscription.next() => {
match maybe_res {
None => {
@@ -65,6 +105,7 @@ impl Actor {
Some(Ok(tenant_id)) => {
let tenant_id: TenantId = tenant_id;
info!(?tenant_id, "notify for tenant_id");
// for now, just also rebuild everything
}
Some(Err(err)) => {
let err: serde_json::Error = err;
@@ -72,10 +113,169 @@ impl Actor {
}
}
}
msg = self.rx.recv() => {
todo!("{msg:?}");
}
// get list of tasks from database
let mut new_tasks = self
.persistence
.get_all_sk_ps_discovery_work()
.await
.context("get_all_sk_ps_discovery_work")?
.into_iter()
.map(|work: SkPsDiscoveryPersistence| {
anyhow::Ok((
TaskKey::try_from(&work)?,
Task {
work,
cancel: CancellationToken::new(),
join_handle: None,
},
))
})
.collect::<Result<HashMap<_, _>, _>>()?;
// Carry over ongoing tasks
let mut ongoing_wait = FuturesUnordered::new();
for (
task_key,
Task {
work: ongoing_persistence,
cancel,
join_handle,
},
) in tasks.drain()
{
match new_tasks.entry(task_key) {
hash_map::Entry::Occupied(mut planned) => {
let Task {
work: planned_persistence,
cancel: planned_cancel,
join_handle: planned_jh,
} = planned.get_mut();
assert!(planned_jh.is_none());
if *planned_persistence == ongoing_persistence {
*planned_jh = join_handle;
*planned_cancel = cancel;
} else {
match join_handle {
Some(jh) => {
cancel.cancel();
ongoing_wait.push(jh);
}
None => (),
}
}
}
hash_map::Entry::Vacant(_) => match join_handle {
Some(jh) => {
cancel.cancel();
ongoing_wait.push(jh);
}
None => (),
},
}
}
while let Some(_) = ongoing_wait.next().await {}
tasks = new_tasks;
// Kick off new tasks
for (key, task) in tasks.iter_mut() {
if task.join_handle.is_none() {
task.join_handle = Some(tokio::spawn(
DeliveryAttempt {
cancel: task.cancel.clone(),
persistence: self.persistence.clone(),
service: self.service.clone(),
http_client: self.http_client.clone(),
work: task.work.clone(),
}
.run()
.instrument({
let span = info_span!(parent: None, "sk_ps_discovery_delivery", ?key);
span.follows_from(Span::current());
span
}),
))
}
}
}
}
}
/// State for one attempt at delivering a single `sk_ps_discovery` row to its safekeeper.
struct DeliveryAttempt {
// Checked after the attempt; when cancelled, the outcome is not persisted.
cancel: CancellationToken,
// Used to record the attempt's outcome.
persistence: Arc<Persistence>,
// Used to look up the safekeeper and the safekeeper JWT token from the config.
service: Arc<super::Service>,
// HTTP client handed to the safekeeper request helper.
http_client: reqwest::Client,
// The row being delivered.
work: SkPsDiscoveryPersistence,
}
impl DeliveryAttempt {
pub async fn run(self) {
let res = self.run0().await;
if self.cancel.is_cancelled() {
return;
}
if let Err(ref err) = res {
error!(?err, "attempt failed");
}
let res = self
.persistence
.update_sk_ps_discovery_attempt(
SkPsDiscoveryPersistencePk {
tenant_id: self.work.tenant_id,
shard_number: self.work.shard_number,
shard_count: self.work.shard_count,
ps_generation: self.work.ps_generation,
sk_id: self.work.sk_id,
},
res.map_err(|_| ()),
)
.await;
if let Err(ref err) = res {
error!(?err, "persistence of attempt result failed");
}
}
async fn run0(&self) -> anyhow::Result<()> {
let Some(sk) = self.service.get_safekeeper_object(self.work.sk_id) else {
anyhow::bail!("safekeeper object does not exist");
};
match sk.availability() {
SafekeeperState::Available { .. } => (),
SafekeeperState::Offline => {
anyhow::bail!("safekeeper is offline");
}
}
let tenant_shard_id = self.work.tenant_shard_id()?;
sk.with_client_retries(
|client| async move {
client
.put_tenant_shard_pageserver_attachments(
tenant_shard_id,
TenantShardPageserverAttachments {
attachments: vec![TenantShardPageserverAttachment {
ps_id: NodeId(self.work.ps_id as u64),
generation: Generation::new(self.work.ps_generation as u32),
}],
},
)
.await
},
&self.http_client,
&self
.service
.config
.safekeeper_jwt_token
.clone()
.map(SecretString::from),
1,
3,
Duration::from_secs(1),
&self.cancel,
)
.await?;
Ok(())
}
}