commit 257693e4f2 (parent 7aa9beaefd)
Author: Christian Schwarz
Date: 2025-05-05 10:59:45 +02:00

9 changed files with 268 additions and 7 deletions

View File

@@ -6,9 +6,11 @@ use pageserver_api::shard::ShardIdentity;
use postgres_ffi::TimestampTz;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use utils::generation::Generation;
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::shard::TenantShardId;
use crate::membership::Configuration;
use crate::{ServerInfo, Term};
@@ -308,3 +310,15 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PutTenantPageserverLocationRequest {
pub pageserver_locations: Vec<TenantShardPageserverLocation>,
}
// Also used as a HashMap key in safekeeper/src/tenant.rs, hence the Eq/Hash derives.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TenantShardPageserverLocation {
pub tenant_shard_id: TenantShardId,
pub generation: Generation,
pub pageserver_node_id: NodeId,
}
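For illustration, a body for the new PUT endpoint could be built from these types roughly as follows (a sketch: the helper name and the concrete generation/node values are made up, and an unsharded tenant is assumed):

use safekeeper_api::models::{PutTenantPageserverLocationRequest, TenantShardPageserverLocation};
use utils::generation::Generation;
use utils::id::{NodeId, TenantId};
use utils::shard::TenantShardId;

fn example_request_body() -> anyhow::Result<String> {
    let req = PutTenantPageserverLocationRequest {
        pageserver_locations: vec![TenantShardPageserverLocation {
            tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
            generation: Generation::new(42),
            pageserver_node_id: NodeId(1),
        }],
    };
    // Serialize to the JSON wire format that the handler deserializes with serde.
    Ok(serde_json::to_string_pretty(&req)?)
}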

View File

@@ -4,6 +4,7 @@ use std::io::Write as _;
use std::str::FromStr;
use std::sync::Arc;
use http_utils::endpoint::{
self, ChannelWriter, auth_middleware, check_permission_with, profile_cpu_handler,
profile_heap_handler, prometheus_metrics_handler, request_span,
@@ -17,9 +18,10 @@ use hyper::{Body, Request, Response, StatusCode};
use pem::Pem;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{
-    AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
-    TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
-    TimelineStatus, TimelineTermBumpRequest,
+    AcceptorStateStatus, PullTimelineRequest, PutTenantPageserverLocationRequest, SafekeeperStatus,
+    SkTimelineInfo, TenantDeleteResult, TenantShardPageserverLocation, TermSwitchApiEntry,
+    TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult, TimelineStatus,
+    TimelineTermBumpRequest,
};
use safekeeper_api::{ServerInfo, membership, models};
use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
@@ -31,12 +33,14 @@ use tracing::{Instrument, info_span};
use utils::auth::SwappableJwtAuth;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use crate::debug_dump::TimelineDigestRequest;
use crate::safekeeper::TermLsn;
use crate::timelines_global_map::DeleteOrExclude;
use crate::{
GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
wal_advertiser,
};
/// Healthcheck handler.
@@ -91,6 +95,41 @@ async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Bo
json_response(StatusCode::OK, response_body)
}
async fn tenant_put_pageserver_locations_handler(
    mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
    let PutTenantPageserverLocationRequest {
        pageserver_locations,
    }: PutTenantPageserverLocationRequest = json_request(&mut request).await?;
    let global_timelines = get_global_timelines(&request);
    global_timelines
        .update_tenant_locations(tenant_shard_id, pageserver_locations)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, ())
}
async fn tenant_get_pageserver_locations_handler(
    request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
    let global_timelines = get_global_timelines(&request);
    let locations = global_timelines
        .get_tenant_locations(tenant_shard_id)
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, locations)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
@@ -714,6 +753,12 @@ pub fn make_router(
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
.put("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| {
request_span(r, tenant_put_pageserver_locations_handler)
})
.get("/v1/tenant/:tenant_shard_id/pageserver_locations", |r| {
request_span(r, tenant_get_pageserver_locations_handler)
})
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/timeline", |r| {
request_span(r, timeline_create_handler)
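As a usage sketch (not part of this commit), the new route could be exercised with the hyper 0.14 client API that this file already uses on the server side; the host, port, and helper name here are illustrative:

async fn put_locations_example(
    tenant_shard_id: TenantShardId,
    body_json: String,
) -> anyhow::Result<()> {
    use hyper::{Body, Client, Method, Request};
    // 7676 is the safekeeper's default HTTP port; adjust for your deployment.
    let uri = format!("http://127.0.0.1:7676/v1/tenant/{tenant_shard_id}/pageserver_locations");
    let req = Request::builder()
        .method(Method::PUT)
        .uri(uri)
        .header("content-type", "application/json")
        .body(Body::from(body_json))?;
    let resp = Client::new().request(req).await?;
    anyhow::ensure!(resp.status().is_success(), "unexpected status {}", resp.status());
    Ok(())
}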

View File

@@ -43,6 +43,8 @@ pub mod wal_backup_partial;
pub mod wal_reader_stream;
pub mod wal_service;
pub mod wal_storage;
pub mod tenant;
pub mod wal_advertiser;
#[cfg(any(test, feature = "benchmarking"))]
pub mod test_utils;

View File

@@ -661,6 +661,7 @@ impl SafekeeperPostgresHandler {
reader,
ws_guard: ws_guard.clone(),
tli,
shard: self.shard.clone(),
};
let res = tokio::select! {
@@ -977,6 +978,7 @@ struct ReplyReader<IO> {
reader: PostgresBackendReader<IO>,
ws_guard: Arc<WalSenderGuard>,
tli: WalResidentTimeline,
shard: Option<ShardId>,
}
impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
@@ -1021,7 +1023,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
.walsenders
.record_ps_feedback(self.ws_guard.id, &ps_feedback);
self.tli
-    .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
+    .update_remote_consistent_lsn(self.shard, ps_feedback.remote_consistent_lsn)
.await;
// in principle new remote_consistent_lsn could allow to
// deactivate the timeline, but we check that regularly through

safekeeper/src/tenant.rs Normal file (42 lines added)
View File

@@ -0,0 +1,42 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use safekeeper_api::models::TenantShardPageserverLocation;
use tokio::time::Instant;
use utils::generation::Generation;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;

use crate::SafeKeeperConf;

/// Per-tenant state shared by all of the tenant's timelines (WIP).
pub(crate) struct Tenant {
inner: Arc<Mutex<Inner>>,
}
struct Inner {
    /// Advertisement state per known pageserver location of this tenant.
    data: HashMap<TenantShardPageserverLocation, PerLocationState>,
}

struct PerLocationState {
    /// Highest remote_consistent_lsn this location has reported.
    remote_consistent_lsn: Lsn,
    /// When this entry was last touched.
    last_update_at: Instant,
    /// Whether there is state that still has to be advertised.
    dirty: bool,
}
impl Tenant {
pub fn load(conf: Arc<SafeKeeperConf>, tenant_id: TenantId) -> anyhow::Result<Arc<Tenant>> {
todo!()
}
pub fn get_locations(&self) -> anyhow::Result<Vec<TenantShardPageserverLocation>> {
todo!()
}
pub fn update_locations(
&self,
locations: Vec<TenantShardPageserverLocation>,
) -> anyhow::Result<()> {
todo!()
}
pub fn update_remote_consistent_lsn(
&self,
tenant_shard_id: TenantShardId,
generation: Generation,
timeline_id: TimelineId,
lsn: Lsn,
) -> anyhow::Result<()> {
todo!()
}
}
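To make the intended bookkeeping concrete, here is a sketch (assumed semantics, not this commit's code; the function name is made up and the per-timeline dimension is ignored for brevity) of how the per-location map could absorb a feedback update:

fn update_remote_consistent_lsn_sketch(
    inner: &Mutex<Inner>,
    location: TenantShardPageserverLocation,
    lsn: Lsn,
) {
    let mut inner = inner.lock().unwrap();
    let entry = inner.data.entry(location).or_insert_with(|| PerLocationState {
        remote_consistent_lsn: Lsn(0),
        last_update_at: Instant::now(),
        dirty: true,
    });
    // Advance the watermark monotonically and mark it dirty so the
    // advertiser knows there is something new to push out.
    if lsn > entry.remote_consistent_lsn {
        entry.remote_consistent_lsn = lsn;
        entry.last_update_at = Instant::now();
        entry.dirty = true;
    }
}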

View File

@@ -24,6 +24,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use utils::sync::gate::Gate;
use crate::metrics::{FullTimelineInfo, MISC_OPERATION_SECONDS, WalStorageMetrics};
@@ -32,6 +33,7 @@ use crate::receive_wal::WalReceivers;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
use crate::send_wal::{WalSenders, WalSendersTimelineMetricValues};
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
use crate::tenant::Tenant;
use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
@@ -425,6 +427,7 @@ type RemoteDeletionReceiver = tokio::sync::watch::Receiver<Option<anyhow::Result
/// It also holds SharedState and provides mutually exclusive access to it.
pub struct Timeline {
pub ttid: TenantTimelineId,
pub tenant: Arc<Tenant>,
pub remote_path: RemotePath,
/// Used to broadcast commit_lsn updates to all background jobs.
@@ -475,6 +478,7 @@ impl Timeline {
timeline_dir: &Utf8Path,
remote_path: &RemotePath,
shared_state: SharedState,
tenant: Arc<Tenant>,
conf: Arc<SafeKeeperConf>,
) -> Arc<Self> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
@@ -489,6 +493,7 @@ impl Timeline {
Arc::new(Self {
ttid,
tenant,
remote_path: remote_path.to_owned(),
timeline_dir: timeline_dir.to_owned(),
commit_lsn_watch_tx,
@@ -515,6 +520,7 @@ impl Timeline {
/// Load existing timeline from disk.
pub fn load_timeline(
conf: Arc<SafeKeeperConf>,
tenant: Arc<Tenant>,
ttid: TenantTimelineId,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
@@ -528,6 +534,7 @@ impl Timeline {
&timeline_dir,
&remote_path,
shared_state,
tenant,
conf,
))
}
@@ -1062,12 +1069,19 @@ impl WalResidentTimeline {
}
/// Update in memory remote consistent lsn.
-    pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
+    pub async fn update_remote_consistent_lsn(&self, shard: Option<ShardId>, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
shared_state.sk.state_mut().inmem.remote_consistent_lsn = max(
shared_state.sk.state().inmem.remote_consistent_lsn,
candidate,
);
drop(shared_state);
// Also forward the update to tenant-level tracking (WIP: the shard identity,
// generation and timeline id are not plumbed through yet).
let _ = self.tli.tenant.update_remote_consistent_lsn(
    todo!("tenant_shard_id"),
    todo!("generation"),
    todo!("timeline_id"),
    candidate,
);
}
}
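Why thread the shard through: with sharded tenants, each shard's pageserver reports its own remote_consistent_lsn, and WAL may only be truncated up to the slowest shard. A minimal sketch of that aggregation (an assumption about where this series is headed, not code from this commit):

use std::collections::HashMap;
use utils::lsn::Lsn;
use utils::shard::TenantShardId;

/// The timeline-wide remote_consistent_lsn is the minimum across all shards:
/// truncating past the slowest shard would drop WAL it still needs.
fn aggregate_remote_consistent_lsn(per_shard: &HashMap<TenantShardId, Lsn>) -> Lsn {
    per_shard.values().copied().min().unwrap_or(Lsn(0))
}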

View File

@@ -2,6 +2,7 @@
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
@@ -11,18 +12,22 @@ use anyhow::{Context, Result, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::membership::Configuration;
-use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
+use safekeeper_api::models::{
+    SafekeeperUtilization, TenantShardPageserverLocation, TimelineDeleteResult,
+};
use safekeeper_api::{ServerInfo, membership};
use tokio::fs;
use tracing::*;
use utils::crashsafe::{durable_rename, fsync_async_opt};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::http::routes::DeleteOrExcludeError;
use crate::rate_limit::RateLimiter;
use crate::state::TimelinePersistentState;
use crate::tenant::Tenant;
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
use crate::wal_storage::Storage;
@@ -37,6 +42,7 @@ enum GlobalMapTimeline {
}
struct GlobalTimelinesState {
tenants: HashMap<TenantShardId, Arc<Tenant>>,
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
// A tombstone indicates this timeline used to exist but has been deleted. These are used to prevent
@@ -87,6 +93,7 @@ impl GlobalTimelines {
pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
Self {
state: Mutex::new(GlobalTimelinesState {
tenants: HashMap::new(),
timelines: HashMap::new(),
tombstones: HashMap::new(),
conf,
@@ -153,6 +160,18 @@ impl GlobalTimelines {
};
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
let tenant = match Tenant::load(conf.clone(), tenant_id) {
    Ok(tenant) => {
        let mut state = self.state.lock().unwrap();
        // The tenants map is keyed by TenantShardId while on-disk tenants are
        // identified by plain TenantId; assume the unsharded key for now (WIP).
        state
            .tenants
            .insert(TenantShardId::unsharded(tenant_id), Arc::clone(&tenant));
        tenant
    }
    Err(_e) => {
        todo!("should we fail the whole startup process?")
    }
};
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
{
@@ -162,7 +181,7 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
-                match Timeline::load_timeline(conf.clone(), ttid) {
+                match Timeline::load_timeline(conf.clone(), tenant.clone(), ttid) {
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
self.state
@@ -565,6 +584,27 @@ impl GlobalTimelines {
Ok(deleted)
}
pub async fn update_tenant_locations(
    &self,
    tenant_shard_id: TenantShardId,
    locations: Vec<TenantShardPageserverLocation>,
) -> anyhow::Result<()> {
    let tenant = {
        let state = self.state.lock().unwrap();
        state
            .tenants
            .get(&tenant_shard_id)
            .cloned()
            .context("tenant not found")?
    };
    tenant.update_locations(locations)?;
    Ok(())
}
pub fn get_tenant_locations(
    &self,
    tenant_shard_id: TenantShardId,
) -> anyhow::Result<Vec<TenantShardPageserverLocation>> {
    let state = self.state.lock().unwrap();
    let tenant = state
        .tenants
        .get(&tenant_shard_id)
        .context("tenant not found")?;
    tenant.get_locations()
}
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
let mut state = self.state.lock().unwrap();

View File

@@ -0,0 +1,76 @@
//! Advertise pending WAL to all pageservers that might be interested in it.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use storage_broker::BrokerClientChannel;
use tokio::time::{Instant, sleep};
use tracing::info;
use utils::id::NodeId;

use crate::metrics::{
    BROKER_ITERATION_TIMELINES, BROKER_PUSH_ALL_UPDATES_SECONDS, BROKER_PUSHED_UPDATES,
};
use crate::{GlobalTimelines, SafeKeeperConf};

/// Push interval, mirroring the broker push loop (placeholder value, WIP).
const PUSH_INTERVAL_MSEC: u64 = 1000;
mod advmap;
pub struct GlobalState {
pub resolver: NodeIdToHostportMap,
}
#[derive(Default)]
pub struct NodeIdToHostportMap {
inner: Arc<Mutex<HashMap<NodeId, String>>>,
}
impl NodeIdToHostportMap {
pub fn learn(&self, node_id: NodeId, hostport: String) {
let mut inner = self.inner.lock().unwrap();
inner.insert(node_id, hostport);
}
}
pub(crate) async fn wal_advertiser_loop(
conf: Arc<SafeKeeperConf>,
global_timelines: Arc<GlobalTimelines>,
) -> anyhow::Result<()> {
let active_timelines_set = global_timelines.get_global_broker_active_set();
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
let outbound = async_stream::stream! {
loop {
// Note: we lock runtime here and in timeline methods as GlobalTimelines
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let now = Instant::now();
let all_tlis = active_timelines_set.get_all();
let mut n_pushed_tlis = 0;
for tli in &all_tlis {
let sk_info = tli.get_safekeeper_info(&conf).await;
yield sk_info;
BROKER_PUSHED_UPDATES.inc();
n_pushed_tlis += 1;
}
let elapsed = now.elapsed();
BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64());
BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64);
if elapsed > push_interval / 2 {
info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed);
}
sleep(push_interval).await;
}
};
// WIP: no broker client is constructed anywhere yet; this mirrors the
// publish_safekeeper_info push loop in broker.rs.
let mut client: BrokerClientChannel = todo!("connect to the storage broker");
client
    .publish_safekeeper_info(Request::new(outbound))
    .await?;
Ok(())
}
/// Per-pageserver advertisement loop (WIP, not implemented yet).
pub(crate) async fn node_loop() {
    loop {
        todo!("advertise pending WAL to one pageserver")
    }
}
}

View File

@@ -0,0 +1,26 @@
//! The data structure that tracks advertisement state.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
use utils::shard::TenantShardId;
pub struct World {
pageservers: RwLock<HashMap<NodeId, Arc<Pageserver>>>,
}
pub struct Pageserver {
node_id: NodeId,
attachments: RwLock<HashMap<TenantShardId, Arc<Attachment>>>,
}
pub struct Attachment {
pageserver: NodeId,
tenant_shard_id: TenantShardId,
generation: Generation,
timelines: RwLock<HashMap<TimelineId, Arc<Timeline>>>,
}
pub struct Timeline {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
remote_consistent_lsn: RwLock<Lsn>,
}
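A sketch of how this hierarchy might be walked by the advertiser (assumed semantics: `timelines` is the renamed map field above, and the method name is made up):

impl World {
    /// Enumerate (pageserver, shard, timeline) triples that may want WAL.
    fn advertisement_targets(&self) -> Vec<(NodeId, TenantShardId, TimelineId)> {
        let mut out = Vec::new();
        for (node_id, ps) in self.pageservers.read().unwrap().iter() {
            for (tenant_shard_id, attachment) in ps.attachments.read().unwrap().iter() {
                for timeline_id in attachment.timelines.read().unwrap().keys() {
                    out.push((*node_id, *tenant_shard_id, *timeline_id));
                }
            }
        }
        out
    }
}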