diff --git a/libs/safekeeper_api/src/membership.rs b/libs/safekeeper_api/src/membership.rs index a39fda526f..8b14a4f290 100644 --- a/libs/safekeeper_api/src/membership.rs +++ b/libs/safekeeper_api/src/membership.rs @@ -9,13 +9,43 @@ use anyhow::bail; use serde::{Deserialize, Serialize}; use utils::id::NodeId; -/// Number uniquely identifying safekeeper configuration. -/// Note: it is a part of sk control file. -pub type Generation = u32; /// 1 is the first valid generation, 0 is used as /// a placeholder before we fully migrate to generations. -pub const INVALID_GENERATION: Generation = 0; -pub const INITIAL_GENERATION: Generation = 1; +pub const INVALID_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(0); +pub const INITIAL_GENERATION: SafekeeperGeneration = SafekeeperGeneration::new(1); + +/// Number uniquely identifying safekeeper configuration. +/// Note: it is a part of sk control file. +/// +/// Like tenant generations, but for safekeepers. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +pub struct SafekeeperGeneration(u32); + +impl SafekeeperGeneration { + pub const fn new(v: u32) -> Self { + Self(v) + } + + #[track_caller] + pub fn previous(&self) -> Option { + Some(Self(self.0.checked_sub(1)?)) + } + + #[track_caller] + pub fn next(&self) -> Self { + Self(self.0 + 1) + } + + pub fn into_inner(self) -> u32 { + self.0 + } +} + +impl Display for SafekeeperGeneration { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} /// Membership is defined by ids so e.g. walproposer uses them to figure out /// quorums, but we also carry host and port to give wp idea where to connect. @@ -89,7 +119,7 @@ impl Display for MemberSet { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct Configuration { /// Unique id. - pub generation: Generation, + pub generation: SafekeeperGeneration, /// Current members of the configuration. pub members: MemberSet, /// Some means it is a joint conf. diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index 30418b0efd..41ccdaa428 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -282,3 +282,18 @@ pub struct TimelineTermBumpResponse { pub struct SafekeeperUtilization { pub timeline_count: u64, } + +/// pull_timeline request body. +#[derive(Debug, Deserialize, Serialize)] +pub struct PullTimelineRequest { + pub tenant_id: TenantId, + pub timeline_id: TimelineId, + pub http_hosts: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PullTimelineResponse { + // Donor safekeeper host + pub safekeeper_host: String, + // TODO: add more fields? +} diff --git a/libs/utils/src/bin_ser.rs b/libs/utils/src/bin_ser.rs index 42b45eeea0..4d173d0726 100644 --- a/libs/utils/src/bin_ser.rs +++ b/libs/utils/src/bin_ser.rs @@ -286,6 +286,11 @@ mod tests { const SHORT2_ENC_LE: &[u8] = &[8, 0, 0, 3, 7]; const SHORT2_ENC_LE_TRAILING: &[u8] = &[8, 0, 0, 3, 7, 0xff, 0xff, 0xff]; + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] + struct NewTypeStruct(u32); + const NT1: NewTypeStruct = NewTypeStruct(414243); + const NT1_INNER: u32 = 414243; + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct LongMsg { pub tag: u8, @@ -408,4 +413,42 @@ mod tests { let msg2 = LongMsg::des(&encoded).unwrap(); assert_eq!(msg, msg2); } + + #[test] + /// Ensure that newtype wrappers around u32 don't change the serialization format + fn be_nt() { + use super::BeSer; + + assert_eq!(NT1.serialized_size().unwrap(), 4); + + let msg = NT1; + + let encoded = msg.ser().unwrap(); + let expected = hex_literal::hex!("0006 5223"); + assert_eq!(encoded, expected); + + assert_eq!(encoded, NT1_INNER.ser().unwrap()); + + let msg2 = NewTypeStruct::des(&encoded).unwrap(); + assert_eq!(msg, msg2); + } + + #[test] + /// Ensure that newtype wrappers around u32 don't change the serialization format + fn le_nt() { + use super::LeSer; + + assert_eq!(NT1.serialized_size().unwrap(), 4); + + let msg = NT1; + + let encoded = msg.ser().unwrap(); + let expected = hex_literal::hex!("2352 0600"); + assert_eq!(encoded, expected); + + assert_eq!(encoded, NT1_INNER.ser().unwrap()); + + let msg2 = NewTypeStruct::des(&encoded).unwrap(); + assert_eq!(msg, msg2); + } } diff --git a/safekeeper/client/src/mgmt_api.rs b/safekeeper/client/src/mgmt_api.rs index d4f47fc96d..40e5afc4aa 100644 --- a/safekeeper/client/src/mgmt_api.rs +++ b/safekeeper/client/src/mgmt_api.rs @@ -5,7 +5,10 @@ use http_utils::error::HttpErrorBody; use reqwest::{IntoUrl, Method, StatusCode}; -use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus}; +use safekeeper_api::models::{ + PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, + TimelineStatus, +}; use std::error::Error as _; use utils::{ id::{NodeId, TenantId, TimelineId}, @@ -88,6 +91,12 @@ impl Client { resp.json().await.map_err(Error::ReceiveBody) } + pub async fn pull_timeline(&self, req: &PullTimelineRequest) -> Result { + let uri = format!("{}/v1/pull_timeline", self.mgmt_api_endpoint); + let resp = self.post(&uri, req).await?; + resp.json().await.map_err(Error::ReceiveBody) + } + pub async fn delete_timeline( &self, tenant_id: TenantId, diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index e92ca881e1..35aebfd8ad 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -235,7 +235,7 @@ impl Storage for FileStorage { #[cfg(test)] mod test { use super::*; - use safekeeper_api::membership::{Configuration, MemberSet}; + use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration}; use tokio::fs; use utils::lsn::Lsn; @@ -246,7 +246,7 @@ mod test { let tempdir = camino_tempfile::tempdir()?; let mut state = TimelinePersistentState::empty(); state.mconf = Configuration { - generation: 42, + generation: SafekeeperGeneration::new(42), members: MemberSet::empty(), new_members: None, }; diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 41e30d838a..cd2ac5f44c 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -2,6 +2,7 @@ use http_utils::failpoints::failpoints_handler; use hyper::{Body, Request, Response, StatusCode}; use safekeeper_api::models; use safekeeper_api::models::AcceptorStateStatus; +use safekeeper_api::models::PullTimelineRequest; use safekeeper_api::models::SafekeeperStatus; use safekeeper_api::models::TermSwitchApiEntry; use safekeeper_api::models::TimelineStatus; @@ -230,7 +231,7 @@ async fn timeline_delete_handler(mut request: Request) -> Result) -> Result, ApiError> { check_permission(&request, None)?; - let data: pull_timeline::Request = json_request(&mut request).await?; + let data: PullTimelineRequest = json_request(&mut request).await?; let conf = get_conf(&request); let global_timelines = get_global_timelines(&request); diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index f2d8e4c85f..4827b73074 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -4,10 +4,13 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; -use safekeeper_api::{models::TimelineStatus, Term}; +use safekeeper_api::{ + models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus}, + Term, +}; use safekeeper_client::mgmt_api; use safekeeper_client::mgmt_api::Client; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use std::{ cmp::min, io::{self, ErrorKind}, @@ -33,7 +36,7 @@ use crate::{ }; use utils::{ crashsafe::fsync_async_opt, - id::{NodeId, TenantId, TenantTimelineId, TimelineId}, + id::{NodeId, TenantTimelineId}, logging::SecretString, lsn::Lsn, pausable_failpoint, @@ -378,21 +381,6 @@ impl WalResidentTimeline { } } -/// pull_timeline request body. -#[derive(Debug, Deserialize)] -pub struct Request { - pub tenant_id: TenantId, - pub timeline_id: TimelineId, - pub http_hosts: Vec, -} - -#[derive(Debug, Serialize)] -pub struct Response { - // Donor safekeeper host - pub safekeeper_host: String, - // TODO: add more fields? -} - /// Response for debug dump request. #[derive(Debug, Deserialize)] pub struct DebugDumpResponse { @@ -405,10 +393,10 @@ pub struct DebugDumpResponse { /// Find the most advanced safekeeper and pull timeline from it. pub async fn handle_request( - request: Request, + request: PullTimelineRequest, sk_auth_token: Option, global_timelines: Arc, -) -> Result { +) -> Result { let existing_tli = global_timelines.get(TenantTimelineId::new( request.tenant_id, request.timeline_id, @@ -460,7 +448,7 @@ async fn pull_timeline( host: String, sk_auth_token: Option, global_timelines: Arc, -) -> Result { +) -> Result { let ttid = TenantTimelineId::new(status.tenant_id, status.timeline_id); info!( "pulling timeline {} from safekeeper {}, commit_lsn={}, flush_lsn={}, term={}, epoch={}", @@ -535,7 +523,7 @@ async fn pull_timeline( .load_temp_timeline(ttid, &tli_dir_path, false) .await?; - Ok(Response { + Ok(PullTimelineResponse { safekeeper_host: host, }) } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 45e19c31b6..f816f8459a 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -1004,7 +1004,7 @@ mod tests { use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE}; use safekeeper_api::{ - membership::{Configuration, MemberSet, SafekeeperId}, + membership::{Configuration, MemberSet, SafekeeperGeneration, SafekeeperId}, ServerInfo, }; @@ -1303,7 +1303,7 @@ mod tests { tenant_id, timeline_id, mconf: Configuration { - generation: 42, + generation: SafekeeperGeneration::new(42), members: MemberSet::new(vec![SafekeeperId { id: NodeId(1), host: "hehe.org".to_owned(), diff --git a/storage_controller/src/safekeeper_client.rs b/storage_controller/src/safekeeper_client.rs index bb494f20fa..f234ab3429 100644 --- a/storage_controller/src/safekeeper_client.rs +++ b/storage_controller/src/safekeeper_client.rs @@ -1,5 +1,8 @@ use crate::metrics::PageserverRequestLabelGroup; -use safekeeper_api::models::{SafekeeperUtilization, TimelineCreateRequest, TimelineStatus}; +use safekeeper_api::models::{ + PullTimelineRequest, PullTimelineResponse, SafekeeperUtilization, TimelineCreateRequest, + TimelineStatus, +}; use safekeeper_client::mgmt_api::{Client, Result}; use utils::{ id::{NodeId, TenantId, TimelineId}, @@ -94,6 +97,19 @@ impl SafekeeperClient { ) } + #[allow(dead_code)] + pub(crate) async fn pull_timeline( + &self, + req: &PullTimelineRequest, + ) -> Result { + measured_request!( + "pull_timeline", + crate::metrics::Method::Post, + &self.node_id_label, + self.inner.pull_timeline(req).await + ) + } + pub(crate) async fn get_utilization(&self) -> Result { measured_request!( "utilization",