move pull_timeline to safekeeper_api and add SafekeeperGeneration (#10863)

Preparations for a successor of #10440: 

* move `pull_timeline` to `safekeeper_api` and add it to
`SafekeeperClient`. we want to do `pull_timeline` on any creations that
we couldn't do initially.
* Add a `SafekeeperGeneration` type instead of relying on a type alias.
we want to maintain a safekeeper specific generation number now in the
storcon database. A separate type is important to make it impossible to
mix it up with the tenant's pageserver specific generation number. We
absolutely want to avoid that for correctness reasons. If someone mixes
up a safekeeper and pageserver id (both use the `NodeId` type), that's
bad but there is no wrong generations flying around.

part of #9011
This commit is contained in:
Arpad Müller
2025-02-18 15:02:22 +01:00
committed by GitHub
parent d36baae758
commit caece02da7
9 changed files with 137 additions and 35 deletions

View File

@@ -9,13 +9,43 @@ use anyhow::bail;
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
/// Number uniquely identifying safekeeper configuration.
/// Note: it is a part of sk control file.
pub type Generation = u32;
/// 1 is the first valid generation, 0 is used as
/// a placeholder before we fully migrate to generations.
pub const INVALID_GENERATION: Generation = 0;
pub const INITIAL_GENERATION: Generation = 1;
pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0);
pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1);
/// Number uniquely identifying safekeeper configuration.
/// Note: it is a part of sk control file.
///
/// Like tenant generations, but for safekeepers.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct SafekeeperGeneration(u32);
impl SafekeeperGeneration {
pub const fn new(v: u32) -> Self {
Self(v)
}
#[track_caller]
pub fn previous(&self) -> Option<Self> {
Some(Self(self.0.checked_sub(1)?))
}
#[track_caller]
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
pub fn into_inner(self) -> u32 {
self.0
}
}
impl Display for SafekeeperGeneration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
/// Membership is defined by ids so e.g. walproposer uses them to figure out
/// quorums, but we also carry host and port to give wp idea where to connect.
@@ -89,7 +119,7 @@ impl Display for MemberSet {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Configuration {
/// Unique id.
pub generation: Generation,
pub generation: SafekeeperGeneration,
/// Current members of the configuration.
pub members: MemberSet,
/// Some means it is a joint conf.

View File

@@ -282,3 +282,18 @@ pub struct TimelineTermBumpResponse {
pub struct SafekeeperUtilization {
pub timeline_count: u64,
}
/// pull_timeline request body.
#[derive(Debug, Deserialize, Serialize)]
pub struct PullTimelineRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PullTimelineResponse {
// Donor safekeeper host
pub safekeeper_host: String,
// TODO: add more fields?
}

View File

@@ -286,6 +286,11 @@ mod tests {
const SHORT2_ENC_LE: &[u8] = &[8, 0, 0, 3, 7];
const SHORT2_ENC_LE_TRAILING: &[u8] = &[8, 0, 0, 3, 7, 0xff, 0xff, 0xff];
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct NewTypeStruct(u32);
const NT1: NewTypeStruct = NewTypeStruct(414243);
const NT1_INNER: u32 = 414243;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct LongMsg {
pub tag: u8,
@@ -408,4 +413,42 @@ mod tests {
let msg2 = LongMsg::des(&encoded).unwrap();
assert_eq!(msg, msg2);
}
#[test]
/// Ensure that newtype wrappers around u32 don't change the serialization format
fn be_nt() {
use super::BeSer;
assert_eq!(NT1.serialized_size().unwrap(), 4);
let msg = NT1;
let encoded = msg.ser().unwrap();
let expected = hex_literal::hex!("0006 5223");
assert_eq!(encoded, expected);
assert_eq!(encoded, NT1_INNER.ser().unwrap());
let msg2 = NewTypeStruct::des(&encoded).unwrap();
assert_eq!(msg, msg2);
}
#[test]
/// Ensure that newtype wrappers around u32 don't change the serialization format
fn le_nt() {
use super::LeSer;
assert_eq!(NT1.serialized_size().unwrap(), 4);
let msg = NT1;
let encoded = msg.ser().unwrap();
let expected = hex_literal::hex!("2352 0600");
assert_eq!(encoded, expected);
assert_eq!(encoded, NT1_INNER.ser().unwrap());
let msg2 = NewTypeStruct::des(&encoded).unwrap();
assert_eq!(msg, msg2);
}
}

View File

@@ -5,7 +5,10 @@
use http_utils::error::HttpErrorBody;
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_api::models::{
PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
TimelineStatus,
};
use std::error::Error as _;
use utils::{
id::{NodeId, TenantId, TimelineId},
@@ -88,6 +91,12 @@ impl Client {
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn pull_timeline(&self, req: &PullTimelineRequest) -> Result<PullTimelineResponse> {
let uri = format!("{}/v1/pull_timeline", self.mgmt_api_endpoint);
let resp = self.post(&uri, req).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
pub async fn delete_timeline(
&self,
tenant_id: TenantId,

View File

@@ -235,7 +235,7 @@ impl Storage for FileStorage {
#[cfg(test)]
mod test {
use super::*;
use safekeeper_api::membership::{Configuration, MemberSet};
use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration};
use tokio::fs;
use utils::lsn::Lsn;
@@ -246,7 +246,7 @@ mod test {
let tempdir = camino_tempfile::tempdir()?;
let mut state = TimelinePersistentState::empty();
state.mconf = Configuration {
generation: 42,
generation: SafekeeperGeneration::new(42),
members: MemberSet::empty(),
new_members: None,
};

View File

@@ -2,6 +2,7 @@ use http_utils::failpoints::failpoints_handler;
use hyper::{Body, Request, Response, StatusCode};
use safekeeper_api::models;
use safekeeper_api::models::AcceptorStateStatus;
use safekeeper_api::models::PullTimelineRequest;
use safekeeper_api::models::SafekeeperStatus;
use safekeeper_api::models::TermSwitchApiEntry;
use safekeeper_api::models::TimelineStatus;
@@ -230,7 +231,7 @@ async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<
async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let data: pull_timeline::Request = json_request(&mut request).await?;
let data: PullTimelineRequest = json_request(&mut request).await?;
let conf = get_conf(&request);
let global_timelines = get_global_timelines(&request);

View File

@@ -4,10 +4,13 @@ use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use safekeeper_api::{models::TimelineStatus, Term};
use safekeeper_api::{
models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus},
Term,
};
use safekeeper_client::mgmt_api;
use safekeeper_client::mgmt_api::Client;
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::{
cmp::min,
io::{self, ErrorKind},
@@ -33,7 +36,7 @@ use crate::{
};
use utils::{
crashsafe::fsync_async_opt,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
id::{NodeId, TenantTimelineId},
logging::SecretString,
lsn::Lsn,
pausable_failpoint,
@@ -378,21 +381,6 @@ impl WalResidentTimeline {
}
}
/// pull_timeline request body.
#[derive(Debug, Deserialize)]
pub struct Request {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub http_hosts: Vec<String>,
}
#[derive(Debug, Serialize)]
pub struct Response {
// Donor safekeeper host
pub safekeeper_host: String,
// TODO: add more fields?
}
/// Response for debug dump request.
#[derive(Debug, Deserialize)]
pub struct DebugDumpResponse {
@@ -405,10 +393,10 @@ pub struct DebugDumpResponse {
/// Find the most advanced safekeeper and pull timeline from it.
pub async fn handle_request(
request: Request,
request: PullTimelineRequest,
sk_auth_token: Option<SecretString>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<Response> {
) -> Result<PullTimelineResponse> {
let existing_tli = global_timelines.get(TenantTimelineId::new(
request.tenant_id,
request.timeline_id,
@@ -460,7 +448,7 @@ async fn pull_timeline(
host: String,
sk_auth_token: Option<SecretString>,
global_timelines: Arc<GlobalTimelines>,
) -> Result<Response> {
) -> Result<PullTimelineResponse> {
let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id);
info!(
"pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}",
@@ -535,7 +523,7 @@ async fn pull_timeline(
.load_temp_timeline(ttid, &tli_dir_path, false)
.await?;
Ok(Response {
Ok(PullTimelineResponse {
safekeeper_host: host,
})
}

View File

@@ -1004,7 +1004,7 @@ mod tests {
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
use safekeeper_api::{
membership::{Configuration, MemberSet, SafekeeperId},
membership::{Configuration, MemberSet, SafekeeperGeneration, SafekeeperId},
ServerInfo,
};
@@ -1303,7 +1303,7 @@ mod tests {
tenant_id,
timeline_id,
mconf: Configuration {
generation: 42,
generation: SafekeeperGeneration::new(42),
members: MemberSet::new(vec![SafekeeperId {
id: NodeId(1),
host: "hehe.org".to_owned(),

View File

@@ -1,5 +1,8 @@
use crate::metrics::PageserverRequestLabelGroup;
use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus};
use safekeeper_api::models::{
PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest,
TimelineStatus,
};
use safekeeper_client::mgmt_api::{Client, Result};
use utils::{
id::{NodeId, TenantId, TimelineId},
@@ -94,6 +97,19 @@ impl SafekeeperClient {
)
}
#[allow(dead_code)]
pub(crate) async fn pull_timeline(
&self,
req: &PullTimelineRequest,
) -> Result<PullTimelineResponse> {
measured_request!(
"pull_timeline",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.pull_timeline(req).await
)
}
pub(crate) async fn get_utilization(&self) -> Result<SafekeeperUtilization> {
measured_request!(
"utilization",