WIP proto

This commit is contained in:
Christian Schwarz
2025-06-03 19:14:48 +02:00
parent 1de0f41403
commit 36ba2b8e44
6 changed files with 92 additions and 20 deletions

1
Cargo.lock generated
View File

@@ -6077,6 +6077,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"tonic",
"tracing",
"tracing-subscriber",
"url",

View File

@@ -52,6 +52,7 @@ tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-tar.workspace = true
tokio-util = { workspace = true }
tonic = { workspace = true }
tracing.workspace = true
url.workspace = true
metrics.workspace = true

View File

@@ -6,11 +6,12 @@ use std::{
time::Duration,
};
use anyhow::Context;
use safekeeper_api::models::TenantShardPageserverAttachment;
use serde::Serialize;
use tokio::sync::{Notify, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use utils::{
generation::Generation,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -67,11 +68,13 @@ struct PageserverTask {
state: Arc<Mutex<PageserverSharedState>>,
notify: Arc<tokio::sync::Notify>,
cancel: CancellationToken,
pending_advertisements: HashMap<TenantTimelineId, Lsn>,
}
#[derive(Default)]
struct PageserverSharedState {
pending_advertisements: HashMap<TenantTimelineId, Lsn>,
endpoint: tonic::endpoint::Endpoint,
}
struct CommitLsnAdv {
@@ -110,7 +113,7 @@ struct SafekeeperTimeline {
tli: Arc<Timeline>,
/// Shared with [`SafekeeperTenant::tenant_shard_attachments`].
tenant_shard_attachments: tokio::sync::watch::Receiver<TenantShardAttachments>,
remote_consistent_lsns: HashMap<ShardAttachmentId, Lsn>,
remote_consistent_lsns: Mutex<HashMap<ShardAttachmentId, Lsn>>,
}
#[derive(Default)]
@@ -159,6 +162,13 @@ struct ShardAttachmentId {
ps_id: NodeId,
}
pub struct SafekeeperTimelineHandle {}
impl SafekeeperTimelineHandle {
pub fn ready_for_eviction(&self) -> bool {
todo!()
}
}
impl World {
pub fn register_timeline(
&self,
@@ -182,7 +192,7 @@ impl World {
.run()
.await;
});
Ok(vacant.insert(SafekeeperTimelineHandle { tx }).clone())
Ok(vacant.insert(SafekeeperTimelineHandle {}).clone())
}
pub fn update_pageserver_attachments(
&self,
@@ -277,7 +287,6 @@ impl SafekeeperTimeline {
// that other nodes continue to recieve advertisements.
pageserver_handle.advertise_commit_lsn(CommitLsnAdv { ttid, commit_lsn });
}
drop(shard_attachments);
}
drop(gate_guard);
}
@@ -285,6 +294,7 @@ impl SafekeeperTimeline {
impl PageserverHandle {
pub fn advertise_commit_lsn(&self, adv: CommitLsnAdv) {
let CommitLsnAdv { ttid, commit_lsn } = adv;
let mut state = self.state.write().unwrap();
state.pending_advertisements.insert(ttid, commit_lsn);
}
@@ -299,22 +309,42 @@ impl PageserverTask {
_ = self.cancel.cancelled() => {
return;
}
_ = self.notify.notified() => {}
}
{
let mut state = self.state.locck().unwrap();
pending_advertisements.clear();
pending_advertisements =
std::mem::replace(&mut state.pending_advertisements, pending_advertisements);
drop(state);
}
todo!("grpc call");
// batch updates by sleeping
tokio::time::sleep(Duration::from_secs(1)).await;
res = self.run0() => {
if let Err(err) = res {
error!(?err, "pageserver loop failed");
// TODO: backoff? + cancellation sensitivity
tokio::time::sleep(Duration::from_secs(10)).await;
}
continue;
}
};
}
}
async fn run0(&mut self) -> anyhow::Result<()> {
use storage_broker::wal_advertisement::pageserver_client::PageserverClient;
let stream = async_stream::stream! { loop {
while self.pending_advertisements.is_empty() {
tokio::select! {
_ = self.cancel.cancelled() => {
return;
}
_ = self.notify.notified() => {}
}
let mut state = self.state.lock().unwrap();
std::mem::swap(
&mut state.pending_advertisements,
&mut self.pending_advertisements,
);
}
} };
let client: PageserverClient<_> = PageserverClient::connect(todo!())
.await
.context("connect")?;
let publish_stream = client
.publish_commit_lsn_advertisements(stream)
.await
.context("publish stream")?;
}
}
impl From<safekeeper_api::models::TenantShardPageserverAttachment> for ShardAttachmentId {

View File

@@ -5,7 +5,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// easy location, but apparently interference with cachepot sometimes fails
// the build then. Anyway, per cargo docs build script shouldn't output to
// anywhere but $OUT_DIR.
tonic_build::compile_protos("proto/broker.proto")
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
let protos = [
"proto/broker.proto",
"proto/wal_advertisement.proto",
];
for proto in protos {
tonic_build::compile_protos(proto)
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
}
Ok(())
}

View File

@@ -0,0 +1,29 @@
syntax = "proto3";
import "google/protobuf/empty.proto";
package wal_advertisement;
service Pageserver {
rpc PublishCommitLsnAdvertisements(stream CommitLsnAdvertisement) returns (google.protobuf.Empty) {};
rpc SubscribeRemoteConsistentLsnAdvertisements(google.protobuf.Empty) returns (stream RemoteConsistentLsnAdvertisement) {};
}
message CommitLsnAdvertisement {
TenantTimelineId tenant_timeline_id = 1;
}
message RemoteConsistentLsnAdvertisement {
bytes tenant_id = 1;
uint32 shard_id = 2;
bytes timeline_id = 3;
uint64 generation = 4;
uint64 remote_consistent_lsn = 5;
}
message TenantTimelineId {
bytes tenant_id = 1;
bytes timeline_id = 2;
}

View File

@@ -20,6 +20,11 @@ pub mod proto {
tonic::include_proto!("storage_broker");
}
pub mod wal_advertisement {
#![allow(clippy::derive_partial_eq_without_eq)]
tonic::include_proto!("wal_advertisement");
}
pub mod metrics;
// Re-exports to avoid direct tonic dependency in user crates.