Christian Schwarz
2025-05-05 12:45:45 +02:00
parent 257693e4f2
commit 4a1b52c12e
17 changed files with 177 additions and 204 deletions

View File

@@ -318,7 +318,6 @@ pub struct PutTenantPageserverLocationRequest {
#[derive(Debug, Serialize, Deserialize)]
pub struct TenantShardPageserverLocation {
pub tenant_shard_id: TenantShardId,
pub generation: Generation,
pub pageserver_node_id: NodeId,
}

View File

@@ -46,6 +46,8 @@ pub struct ConnectionConfigArgs<'a> {
pub auth_token: Option<&'a str>,
pub availability_zone: Option<&'a str>,
pub pageserver_generation: Option<u32>,
}
impl<'a> ConnectionConfigArgs<'a> {
@@ -72,6 +74,10 @@ impl<'a> ConnectionConfigArgs<'a> {
));
}
if let Some(pageserver_generation) = self.pageserver_generation {
options.push(format!("pageserver_generation={pageserver_generation}"));
}
options
}
}
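For context, these options travel to the safekeeper in the libpq `options` startup parameter alongside the existing shard settings. Below is a minimal, self-contained sketch of the serialization step, with a toy `options()` standing in for `ConnectionConfigArgs::options()` (illustrative only, not the neon API):

// Toy stand-in mirroring the hunk above: append the generation only when present.
fn options(pageserver_generation: Option<u32>) -> Vec<String> {
    let mut options = vec!["ztenantid=<tenant-id>".to_string()];
    if let Some(generation) = pageserver_generation {
        options.push(format!("pageserver_generation={generation}"));
    }
    options
}

fn main() {
    assert_eq!(
        options(Some(7)).last().map(String::as_str),
        Some("pageserver_generation=7")
    );
}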

View File

@@ -879,7 +879,9 @@ impl ConnectionManagerState {
shard_stripe_size,
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
availability_zone: self.conf.availability_zone.as_deref(),
// TODO: do we still have the emergency mode that runs without generations? If so, this expect would panic in that mode.
pageserver_generation: Some(self.timeline.generation.into().expect("attachments always have a generation number nowadays")),
};
match wal_stream_connection_config(connection_conf_args) {

View File

@@ -24,7 +24,7 @@ use safekeeper::defaults::{
};
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_advertiser, wal_backup, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};

View File

@@ -18,6 +18,7 @@ use safekeeper_api::models::ConnectionId;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{Instrument, debug, info, info_span};
use utils::auth::{Claims, JwtAuth, Scope};
use utils::generation::{self, Generation};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::PostgresClientProtocol;
@@ -37,6 +38,7 @@ pub struct SafekeeperPostgresHandler {
pub timeline_id: Option<TimelineId>,
pub ttid: TenantTimelineId,
pub shard: Option<ShardIdentity>,
pub pageserver_generation: Option<Generation>,
pub protocol: Option<PostgresClientProtocol>,
/// Unique connection id is logged in spans for observability.
pub conn_id: ConnectionId,
@@ -159,6 +161,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
let mut shard_count: Option<u8> = None;
let mut shard_number: Option<u8> = None;
let mut shard_stripe_size: Option<u32> = None;
for opt in options {
// FIXME `ztenantid` and `ztimelineid` left for compatibility during deploy,
@@ -201,6 +204,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
format!("Failed to parse {value} as shard stripe size")
})?);
}
Some(("pageserver_generation", value)) => {
self.pageserver_generation =
Some(value.parse::<u32>().map(Generation::new).with_context(
|| format!("Failed to parse {value} as generation"),
)?);
}
_ => continue,
}
}
@@ -259,6 +268,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
tracing::Span::current().record("shard", tracing::field::display(slug));
}
}
if let Some(pageserver_generation) = self.pageserver_generation {
tracing::Span::current().record(
"pageserver_generation",
tracing::field::display(pageserver_generation.get_suffix()),
);
}
Ok(())
} else {
@@ -370,6 +385,7 @@ impl SafekeeperPostgresHandler {
timeline_id: None,
ttid: TenantTimelineId::empty(),
shard: None,
pageserver_generation: None,
protocol: None,
conn_id,
claims: None,
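The new startup option can be exercised in isolation. A hedged sketch of the parse added above, with `Generation` stubbed out (the real type is `utils::generation::Generation`) and `anyhow` assumed for error context:

use anyhow::Context;

#[derive(Debug, PartialEq)]
struct Generation(u32);

impl Generation {
    fn new(v: u32) -> Self {
        Generation(v)
    }
}

// Parse one `key=value` startup option, accepting only `pageserver_generation`.
fn parse_pageserver_generation(opt: &str) -> anyhow::Result<Option<Generation>> {
    Ok(match opt.split_once('=') {
        Some(("pageserver_generation", value)) => Some(
            value
                .parse::<u32>()
                .map(Generation::new)
                .with_context(|| format!("Failed to parse {value} as generation"))?,
        ),
        _ => None,
    })
}

fn main() -> anyhow::Result<()> {
    assert_eq!(
        parse_pageserver_generation("pageserver_generation=42")?,
        Some(Generation(42))
    );
    assert!(parse_pageserver_generation("pageserver_generation=nope").is_err());
    Ok(())
}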

View File

@@ -5,7 +5,7 @@ pub use routes::make_router;
pub use safekeeper_api::models;
use tokio_util::sync::CancellationToken;
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
pub async fn task_main_http(
conf: Arc<SafeKeeperConf>,

View File

@@ -4,7 +4,6 @@ use std::io::Write as _;
use std::str::FromStr;
use std::sync::Arc;
use desim::node_os;
use http_utils::endpoint::{
self, ChannelWriter, auth_middleware, check_permission_with, profile_cpu_handler,
profile_heap_handler, prometheus_metrics_handler, request_span,
@@ -38,6 +37,7 @@ use utils::shard::TenantShardId;
use crate::debug_dump::TimelineDigestRequest;
use crate::safekeeper::TermLsn;
use crate::timelines_global_map::DeleteOrExclude;
use crate::wal_advertiser::advmap::UpdatePageserverAttachmentsArg;
use crate::{
GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
wal_advertiser,
@@ -95,39 +95,37 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, response_body)
}
async fn tenant_put_pageserver_locations_handler(
async fn tenant_put_pageserver_attachments(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let PutTenantPageserverLocationRequest {
pageserver_locations,
}: PutTenantPageserverLocationRequest = json_request(&mut request).await?;
let global_timelines = get_global_timelines(&request);
global_timelines
.update_tenant_locations(tenant_id, pageserver_locations)
.await?;
json_response(StatusCode::OK, response_body)
}
async fn tenant_get_pageserver_locations_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_id))?;
let global_timelines = get_global_timelines(&request);
let locations = global_timelines
.get_tenant_locations(tenant_id)
.await?;
json_response(StatusCode::OK, locations)
let wal_advertiser = global_timelines.get_wal_advertiser();
wal_advertiser
.update_pageserver_attachments(
tenant_shard_id,
pageserver_locations
.into_iter()
.map(
|TenantShardPageserverLocation {
generation,
pageserver_node_id,
}| UpdatePageserverAttachmentsArg {
generation,
pageserver_node_id,
},
)
.collect(),
)
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -753,11 +751,8 @@ pub fn make_router(
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
.put("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| {
request_span(r, tenant_put_pageserver_locations_handler)
})
.get("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| {
request_span(r, tenant_get_pageserver_locations_handler)
.put("/v1/tenant/:tenant_shard_id/pageserver_attachments", |r| {
request_span(r, tenant_put_pageserver_attachments)
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {

View File

@@ -38,13 +38,12 @@ pub mod timeline_eviction;
pub mod timeline_guard;
pub mod timeline_manager;
pub mod timelines_set;
pub mod wal_advertiser;
pub mod wal_backup;
pub mod wal_backup_partial;
pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
pub mod tenant;
pub mod wal_advertiser;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;

View File

@@ -360,6 +360,7 @@ async fn recovery_stream(
listen_pg_addr_str: &donor.pg_connstr,
auth_token: None,
availability_zone: None,
pageserver_generation: None,
};
let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config();

View File

@@ -37,6 +37,7 @@ use crate::send_interpreted_wal::{
Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
};
use crate::timeline::WalResidentTimeline;
use crate::wal_advertiser;
use crate::wal_reader_stream::StreamingWalReader;
use crate::wal_storage::WalReader;
@@ -657,11 +658,29 @@ impl SafekeeperPostgresHandler {
let tli_cancel = tli.cancel.clone();
let wal_advertiser = match (self.shard, self.pageserver_generation) {
(Some(shard), Some(pageserver_generation)) => {
Some(tli.wal_advertiser.get_pageserver_timeline(
self.ttid,
shard.shard_index(),
pageserver_generation,
))
}
(shard, pageserver_generation) => {
debug!(
?shard,
?pageserver_generation,
"cannot feedback last_record_lsn to wal_advertiser subsystem, client must specify shard and pageserver_generation"
);
None
}
};
let mut reply_reader = ReplyReader {
reader,
ws_guard: ws_guard.clone(),
tli,
shard: self.shard.clone(),
wal_advertiser,
};
let res = tokio::select! {
@@ -978,7 +997,7 @@ struct ReplyReader<IO> {
reader: PostgresBackendReader<IO>,
ws_guard: Arc<WalSenderGuard>,
tli: WalResidentTimeline,
shard: Option<ShardId>,
wal_advertiser: Option<Arc<wal_advertiser::advmap::PageserverTimeline>>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
@@ -1023,8 +1042,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
.walsenders
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
self.tli
.update_remote_consistent_lsn(shard, ps_feedback.remote_consistent_lsn)
.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
.await;
if let Some(wal_advertiser) = &self.wal_advertiser {
wal_advertiser.update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn);
}
// in principle a new remote_consistent_lsn could allow us to
// deactivate the timeline, but we check that regularly through
// broker updates, no need to do it here
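To spell out the gating: the advertiser handle is resolved once per walsender connection, and only when the client identified itself fully. A toy sketch of just that decision (simplified types, not the real ShardIdentity/Generation):

// Feedback is attributed to a concrete (shard, generation) attachment only
// when both were supplied in the startup options; otherwise the advertiser
// is skipped and only shared state is updated.
fn resolve_advertiser(shard: Option<u32>, generation: Option<u32>) -> Option<(u32, u32)> {
    match (shard, generation) {
        (Some(shard), Some(generation)) => Some((shard, generation)),
        _ => None,
    }
}

fn main() {
    assert_eq!(resolve_advertiser(Some(0), Some(7)), Some((0, 7)));
    assert_eq!(resolve_advertiser(Some(0), None), None);
}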

View File

@@ -1,42 +0,0 @@
use safekeeper_api::models::TenantShardPageserverLocation;
use utils::shard::TenantShardId;
use walproposer::bindings::Generation;
pub(crate) struct Tenant {
inner: Arc<Mutex<Inner>>,
}
struct Inner {
data: HashMap<TenantShardPageserverLocation, PerLocationState>,
}
struct PerLocationState {
remote_consistent_lsn: Lsn,
last_update_at: Instant,
dirty: bool,
}
impl Tenant {
pub fn load(conf: Arc<SafeKeeperConf>, tenant_id: TenantId) -> anyhow::Result<Arc<Tenant>> {
todo!()
}
pub fn get_locations(&self) -> anyhow::Result<Vec<TenantShardPageserverLocation>> {
todo!()
}
pub fn update_locations(
&self,
locations: Vec<TenantShardPageserverLocation>,
) -> anyhow::Result<()> {
todo!()
}
pub fn update_remote_consistent_lsn(
&self,
tenant_shard_id: TenantShardId,
generation: Generation,
timeline_id: TimelineId,
lsn: Lsn,
) -> anyhow::Result<()> {
todo!()
}
}

View File

@@ -106,6 +106,7 @@ impl Env {
&timeline_dir,
&remote_path,
shared_state,
todo!(), // wal_advertiser: not wired up in the test Env yet
conf.clone(),
);
timeline.bootstrap(

View File

@@ -33,14 +33,15 @@ use crate::receive_wal::WalReceivers;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
use crate::tenant::Tenant;
use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
use crate::{
SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_advertiser, wal_storage,
};
fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
@@ -427,7 +428,6 @@ type RemoteDeletionReceiver = tokio::sync::watch::Receiver<Option<anyhow::Result
/// It also holds SharedState and provides mutually exclusive access to it.
pub struct Timeline {
pub ttid: TenantTimelineId,
pub tenant: Arc<Tenant>,
pub remote_path: RemotePath,
/// Used to broadcast commit_lsn updates to all background jobs.
@@ -449,6 +449,7 @@ pub struct Timeline {
/// synchronized with the disk. This is tokio mutex as we write WAL to disk
/// while holding it, ensuring that consensus checks are in order.
mutex: RwLock<SharedState>,
pub(crate) wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
walsenders: Arc<WalSenders>,
walreceivers: Arc<WalReceivers>,
timeline_dir: Utf8PathBuf,
@@ -478,7 +479,7 @@ impl Timeline {
timeline_dir: &Utf8Path,
remote_path: &RemotePath,
shared_state: SharedState,
tenant: Arc<Tenant>,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
conf: Arc<SafeKeeperConf>,
) -> Arc<Self> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
@@ -493,7 +494,6 @@ impl Timeline {
Arc::new(Self {
ttid,
tenant,
remote_path: remote_path.to_owned(),
timeline_dir: timeline_dir.to_owned(),
commit_lsn_watch_tx,
@@ -503,6 +503,7 @@ impl Timeline {
shared_state_version_tx,
shared_state_version_rx,
mutex: RwLock::new(shared_state),
wal_advertiser,
walsenders: WalSenders::new(walreceivers.clone()),
walreceivers,
gate: Default::default(),
@@ -520,8 +521,8 @@ impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(
conf: Arc<SafeKeeperConf>,
tenant: Arc<Tenant>,
ttid: TenantTimelineId,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
@@ -534,7 +535,7 @@ impl Timeline {
&timeline_dir,
&remote_path,
shared_state,
tenant,
wal_advertiser,
conf,
))
}
@@ -1069,19 +1070,12 @@ impl WalResidentTimeline {
}
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(&self, shard: Option<ShardId>, candidate: Lsn) {
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
shared_state.sk.state().inmem.remote_consistent_lsn,
candidate,
);
drop(shared_state);
self.tli.tenant.update_remote_consistent_lsn(
todo!(),
todo!(),
todo!(),
)
}
}

View File

@@ -27,11 +27,10 @@ use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::http::routes::DeleteOrExcludeError;
use crate::rate_limit::RateLimiter;
use crate::state::TimelinePersistentState;
use crate::tenant::Tenant;
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
use crate::wal_storage::Storage;
use crate::{SafeKeeperConf, control_file, wal_storage};
use crate::{SafeKeeperConf, control_file, wal_advertiser, wal_storage};
// Timeline entry in the global map: either a ready timeline, or mark that it is
// being created.
@@ -42,7 +41,6 @@ enum GlobalMapTimeline {
}
struct GlobalTimelinesState {
tenants: HashMap<TenantShardId, Arc<Tenant>>,
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
@@ -52,16 +50,25 @@ struct GlobalTimelinesState {
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
wal_advertisement: Arc<wal_advertiser::advmap::World>,
global_rate_limiter: RateLimiter,
}
impl GlobalTimelinesState {
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
fn get_dependencies(
&self,
) -> (
Arc<SafeKeeperConf>,
Arc<TimelinesSet>,
RateLimiter,
Arc<wal_advertiser::advmap::World>,
) {
(
self.conf.clone(),
self.broker_active_set.clone(),
self.global_rate_limiter.clone(),
self.wal_advertisement.clone(),
)
}
@@ -93,11 +100,11 @@ impl GlobalTimelines {
pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
Self {
state: Mutex::new(GlobalTimelinesState {
tenants: HashMap::new(),
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
wal_advertisement: Arc::new(wal_advertiser::advmap::World::default()),
global_rate_limiter: RateLimiter::new(1, 1),
}),
}
@@ -154,24 +161,13 @@ impl GlobalTimelines {
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
let state = self.state.lock().unwrap();
state.get_dependencies()
};
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
let tenant = match Tenant::load(conf.clone(), tenant_id) {
Ok(tenant) => {
let mut state = self.state.lock().unwrap();
state.tenants.insert(tenant_id, Arc::clone(&tenant));
tenant
}
Err(e) => {
todo!("should we fail the whole startup process?")
}
};
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
{
@@ -181,7 +177,8 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(conf.clone(), tenant, ttid) {
let wal_advertiser = wal_advertiser.load_timeline(ttid);
match Timeline::load_timeline(conf.clone(), ttid, wal_advertiser) {
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
self.state
@@ -241,7 +238,7 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _) = {
let (conf, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -286,7 +283,7 @@ impl GlobalTimelines {
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_advertiser) = {
let mut state = self.state.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
@@ -315,7 +312,10 @@ impl GlobalTimelines {
};
// Do the actual move and reflect the result in the map.
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
let wal_advertiser = wal_advertiser.load_timeline(ttid);
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, wal_advertiser, conf.clone())
.await
{
Ok(timeline) => {
let mut timeline_shared_state = timeline.write_shared_state().await;
let mut state = self.state.lock().unwrap();
@@ -354,6 +354,7 @@ impl GlobalTimelines {
async fn install_temp_timeline(
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
wal_advertiser: Arc<wal_advertiser::advmap::SafekeeperTimeline>,
conf: Arc<SafeKeeperConf>,
) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
@@ -396,7 +397,7 @@ impl GlobalTimelines {
// Do the move.
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
Timeline::load_timeline(conf, ttid)
Timeline::load_timeline(conf, ttid, wal_advertiser)
}
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
@@ -584,25 +585,8 @@ impl GlobalTimelines {
Ok(deleted)
}
pub async fn update_tenant_locations(
&self,
tenant_id: &TenantId,
locations: Vec<TenantShardPageserverLocation>,
) -> anyhow::Result<()> {
let tenant = {
let mut state = self.state.lock().unwrap();
state.tenants.get(tenant_id).context("tenant not found")?
};
tenant.update_locations(locations).await?
}
pub fn get_tenant_locations(
&self,
tenant_id: &TenantId,
) -> anyhow::Result<Vec<TenantShardPageserverLocation>> {
let state = self.state.lock().unwrap();
let tenant = state.tenants.get(tenant_id).context("tenant not found")?;
Ok(tenant.get_locations())
pub fn get_wal_advertiser(&self) -> Arc<wal_advertiser::advmap::World> {
self.state.lock().unwrap().wal_advertisement.clone()
}
pub fn housekeeping(&self, tombstone_ttl: &Duration) {

View File

@@ -2,75 +2,19 @@
use std::{collections::HashSet, sync::Arc, time::Duration};
use desim::world::Node;
use crate::{GlobalTimelines, SafeKeeperConf};
mod advmap;
pub struct GlobalState {
pub resolver: NodeIdToHostportMap,
}
#[derive(Default)]
pub struct NodeIdToHostportMap {
inner: Arc<Mutex<HashMap<NodeId, String>>>,
}
impl NodeIdToHostportMap {
pub fn learn(&self, node_id: NodeId, hostport: String) {
let mut inner = self.inner.lock().unwrap();
inner.insert(node_id, hostport);
}
}
pub(crate) mod advmap;
pub(crate) async fn wal_advertiser_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let active_timelines_set = global_timelines.get_global_broker_active_set();
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
let outbound = async_stream::stream! {
loop {
// Note: we lock runtime here and in timeline methods as GlobalTimelines
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let now = Instant::now();
let all_tlis = active_timelines_set.get_all();
let mut n_pushed_tlis = 0;
for tli in &all_tlis {
let sk_info = tli.get_safekeeper_info(&conf).await;
yield sk_info;
BROKER_PUSHED_UPDATES.inc();
n_pushed_tlis += 1;
}
let elapsed = now.elapsed();
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
if elapsed > push_interval / 2 {
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
}
sleep(push_interval).await;
}
};
client
.publish_safekeeper_info(Request::new(outbound))
.await?;
todo!();
node_loop().await;
Ok(())
}
pub(crate) async fn node_loop() {
loop {}
}

View File

@@ -1,26 +1,78 @@
//! The data structure that tracks advertisement state.
use std::sync::{Arc, RwLock};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use serde::Serialize;
use utils::{
generation::Generation,
id::{NodeId, TenantTimelineId, TimelineId},
lsn::Lsn,
shard::{ShardIndex, TenantShardId},
};
#[derive(Default)]
pub struct World {
pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
}
pub struct Pageserver {
node_id: NodeId,
attachments: RwLock<HashMap<TenantShardId, Arc<Attachment>>>,
attachments: RwLock<HashMap<TenantShardId, Arc<PageserverAttachment>>>,
}
pub struct Attachment {
pub struct PageserverAttachment {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
generation: Generation,
remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<Timeline>>>,
remote_consistent_lsn: RwLock<HashMap<TimelineId, Arc<PageserverTimeline>>>, // per-timeline advertisement state, keyed by timeline
}
pub struct Timeline {
pub struct PageserverTimeline {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
remote_consistent_lsn: RwLock<Lsn>,
}
pub struct SafekeeperTimeline {}
pub struct UpdatePageserverAttachmentsArg {
pub generation: Generation,
pub pageserver_node_id: NodeId,
}
impl World {
pub fn housekeeping(&self) {}
pub fn load_timeline(&self, ttid: TenantTimelineId) -> Arc<SafekeeperTimeline> {
todo!()
}
pub fn update_pageserver_attachments(
&self,
tenant_shard_id: TenantShardId,
arg: Vec<UpdatePageserverAttachmentsArg>,
) -> anyhow::Result<()> {
todo!()
}
}
impl SafekeeperTimeline {
pub fn get_pageserver_timeline(
&self,
ttid: TenantTimelineId,
shard: ShardIndex,
pageserver_generation: Generation,
) -> Arc<PageserverTimeline> {
assert!(!pageserver_generation.is_none());
todo!()
}
}
impl PageserverTimeline {
pub fn update_remote_consistent_lsn(&self, lsn: Lsn) -> anyhow::Result<()> {
todo!()
}
}
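Read together, advmap is a four-level hierarchy (World → Pageserver → PageserverAttachment → PageserverTimeline) with the advertised LSN at the leaf. The leaf update is still todo!(); a plausible, self-contained sketch under the assumption that it advances monotonically, like Timeline::update_remote_consistent_lsn does for shared state:

use std::sync::RwLock;

// Toy Lsn; the real one is utils::lsn::Lsn.
type Lsn = u64;

struct PageserverTimeline {
    remote_consistent_lsn: RwLock<Lsn>,
}

impl PageserverTimeline {
    // Assumed semantics for the todo!() above: never move the LSN backwards.
    fn update_remote_consistent_lsn(&self, candidate: Lsn) {
        let mut lsn = self.remote_consistent_lsn.write().unwrap();
        *lsn = (*lsn).max(candidate);
    }
}

fn main() {
    let tl = PageserverTimeline { remote_consistent_lsn: RwLock::new(10) };
    tl.update_remote_consistent_lsn(20);
    tl.update_remote_consistent_lsn(5); // stale feedback must not regress the LSN
    assert_eq!(*tl.remote_consistent_lsn.read().unwrap(), 20);
}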

View File

@@ -18,7 +18,7 @@ use utils::measured_stream::MeasuredStream;
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::TrafficMetrics;
use crate::{GlobalTimelines, SafeKeeperConf};
use crate::{GlobalTimelines, SafeKeeperConf, wal_advertiser};
/// Accept incoming TCP connections and spawn them into a background thread.
///
@@ -51,7 +51,7 @@ pub async fn task_main(
error!("connection handler exited: {}", err);
}
}
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty)),
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty, shard = field::Empty, pageserver_generation = field::Empty)),
);
}
}