Extract public sk types to safekeeper_api (#10137)

## Problem

We want to extract safekeeper http client to separate crate for use in
storage controller and neon_local. However, many types used in the API
are internal to safekeeper.

## Summary of changes

Move them to safekeeper_api crate. No functional changes.

ref https://github.com/neondatabase/neon/issues/9011
This commit is contained in:
Arseny Sher
2024-12-13 17:06:27 +03:00
committed by GitHub
parent 7dc382601c
commit ce8eb089f3
24 changed files with 264 additions and 245 deletions

3
Cargo.lock generated
View File

@@ -5565,7 +5565,10 @@ name = "safekeeper_api"
version = "0.1.0"
dependencies = [
"const_format",
"postgres_ffi",
"pq_proto",
"serde",
"tokio",
"utils",
]

View File

@@ -5,6 +5,9 @@ edition.workspace = true
license.workspace = true
[dependencies]
serde.workspace = true
const_format.workspace = true
serde.workspace = true
postgres_ffi.workspace = true
pq_proto.workspace = true
tokio.workspace = true
utils.workspace = true

View File

@@ -1,10 +1,27 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use const_format::formatcp;
use pq_proto::SystemId;
use serde::{Deserialize, Serialize};
/// Public API types
pub mod models;
/// Consensus logical timestamp. Note: it is a part of sk control file.
pub type Term = u64;
pub const INVALID_TERM: Term = 0;
/// Information about Postgres. Safekeeper gets it once and then verifies all
/// further connections from computes match. Note: it is a part of sk control
/// file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
pub const DEFAULT_PG_LISTEN_PORT: u16 = 5454;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -1,10 +1,23 @@
//! Types used in safekeeper http API. Many of them are also reused internally.
use postgres_ffi::TimestampTz;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use tokio::time::Instant;
use utils::{
id::{NodeId, TenantId, TimelineId},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
pageserver_feedback::PageserverFeedback,
};
use crate::{ServerInfo, Term};
#[derive(Debug, Serialize)]
pub struct SafekeeperStatus {
pub id: NodeId,
}
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
pub tenant_id: TenantId,
@@ -18,6 +31,161 @@ pub struct TimelineCreateRequest {
pub local_start_lsn: Option<Lsn>,
}
/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
pub lsn: Lsn,
}
/// Augment AcceptorState with last_log_term for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {
pub term: Term,
pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility
pub term_history: Vec<TermSwitchApiEntry>,
}
/// Things safekeeper should know about timeline state on peers.
/// Used as both model and internally.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
pub term: Term,
/// Term of the last entry.
pub last_log_term: Term,
/// LSN of the last record.
pub flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL.
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
#[serde(skip)]
#[serde(default = "Instant::now")]
pub ts: Instant,
pub pg_connstr: String,
pub http_connstr: String,
}
pub type FullTransactionId = u64;
/// Hot standby feedback received from replica
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct HotStandbyFeedback {
pub ts: TimestampTz,
pub xmin: FullTransactionId,
pub catalog_xmin: FullTransactionId,
}
pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
impl HotStandbyFeedback {
pub fn empty() -> HotStandbyFeedback {
HotStandbyFeedback {
ts: 0,
xmin: 0,
catalog_xmin: 0,
}
}
}
/// Standby status update
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
pub reply_requested: bool,
}
impl StandbyReply {
pub fn empty() -> Self {
StandbyReply {
write_lsn: Lsn::INVALID,
flush_lsn: Lsn::INVALID,
apply_lsn: Lsn::INVALID,
reply_ts: 0,
reply_requested: false,
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
pub reply: StandbyReply,
pub hs_feedback: HotStandbyFeedback,
}
impl StandbyFeedback {
pub fn empty() -> Self {
StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// Receiver is either pageserver or regular standby, which have different
/// feedbacks.
/// Used as both model and internally.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum ReplicationFeedback {
Pageserver(PageserverFeedback),
Standby(StandbyFeedback),
}
/// Uniquely identifies a WAL service connection. Logged in spans for
/// observability.
pub type ConnectionId = u32;
/// Serialize is used only for json'ing in API response. Also used internally.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalSenderState {
pub ttid: TenantTimelineId,
pub addr: SocketAddr,
pub conn_id: ConnectionId,
// postgres application_name
pub appname: Option<String>,
pub feedback: ReplicationFeedback,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
pub conn_id: Option<ConnectionId>,
pub status: WalReceiverStatus,
}
/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverStatus {
Voting,
Streaming,
}
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
pub flush_lsn: Lsn,
pub timeline_start_lsn: Lsn,
pub local_start_lsn: Lsn,
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,
pub walreceivers: Vec<WalReceiverState>,
}
fn lsn_invalid() -> Lsn {
Lsn::INVALID
}

View File

@@ -1,11 +1,12 @@
//! Code to deal with safekeeper control file upgrades
use crate::{
safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn},
safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn},
state::{EvictionState, PersistedPeers, TimelinePersistentState},
wal_backup_partial,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::{

View File

@@ -14,6 +14,7 @@ use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use postgres_ffi::MAX_SEND_SIZE;
use safekeeper_api::models::WalSenderState;
use serde::Deserialize;
use serde::Serialize;
@@ -25,7 +26,6 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::safekeeper::TermHistory;
use crate::send_wal::WalSenderState;
use crate::state::TimelineMemState;
use crate::state::TimelinePersistentState;
use crate::timeline::get_timeline_dir;

View File

@@ -4,6 +4,8 @@
use anyhow::Context;
use pageserver_api::models::ShardParameters;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use safekeeper_api::models::ConnectionId;
use safekeeper_api::Term;
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
@@ -16,9 +18,7 @@ use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE};
use crate::safekeeper::Term;
use crate::timeline::TimelineError;
use crate::wal_service::ConnectionId;
use crate::{GlobalTimelines, SafeKeeperConf};
use postgres_backend::PostgresBackend;
use postgres_backend::QueryError;

View File

@@ -8,6 +8,7 @@
//! etc.
use reqwest::{IntoUrl, Method, StatusCode};
use safekeeper_api::models::TimelineStatus;
use std::error::Error as _;
use utils::{
http::error::HttpErrorBody,
@@ -15,8 +16,6 @@ use utils::{
logging::SecretString,
};
use super::routes::TimelineStatus;
#[derive(Debug, Clone)]
pub struct Client {
mgmt_api_endpoint: String,

View File

@@ -1,5 +1,9 @@
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use safekeeper_api::models::AcceptorStateStatus;
use safekeeper_api::models::SafekeeperStatus;
use safekeeper_api::models::TermSwitchApiEntry;
use safekeeper_api::models::TimelineStatus;
use safekeeper_api::ServerInfo;
use std::collections::HashMap;
use std::fmt;
use std::io::Write as _;
@@ -31,26 +35,17 @@ use utils::{
request::{ensure_no_body, parse_request_param},
RequestExt, RouterBuilder,
},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
};
use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::safekeeper::TermLsn;
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: NodeId,
}
/// Healthcheck handler.
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -73,50 +68,6 @@ fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
.clone()
}
/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
pub term: Term,
pub lsn: Lsn,
}
impl From<TermSwitchApiEntry> for TermLsn {
fn from(api_val: TermSwitchApiEntry) -> Self {
TermLsn {
term: api_val.term,
lsn: api_val.lsn,
}
}
}
/// Augment AcceptorState with last_log_term for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {
pub term: Term,
pub epoch: Term, // aka last_log_term
pub term_history: Vec<TermSwitchApiEntry>,
}
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
pub flush_lsn: Lsn,
pub timeline_start_lsn: Lsn,
pub local_start_lsn: Lsn,
pub commit_lsn: Lsn,
pub backup_lsn: Lsn,
pub peer_horizon_lsn: Lsn,
pub remote_consistent_lsn: Lsn,
pub peers: Vec<PeerInfo>,
pub walsenders: Vec<WalSenderState>,
pub walreceivers: Vec<WalReceiverState>,
}
fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
check_permission_with(request, |claims| {
crate::auth::check_permission(claims, tenant_id)
@@ -187,6 +138,15 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::OK, res)
}
impl From<TermSwitchApiEntry> for TermLsn {
fn from(api_val: TermSwitchApiEntry) -> Self {
TermLsn {
term: api_val.term,
lsn: api_val.lsn,
}
}
}
/// Report info about timeline.
async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(

View File

@@ -8,16 +8,17 @@
use anyhow::Context;
use postgres_backend::QueryError;
use safekeeper_api::{ServerInfo, Term};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::*;
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
};
use crate::safekeeper::{Term, TermHistory, TermLsn};
use crate::safekeeper::{TermHistory, TermLsn};
use crate::state::TimelinePersistentState;
use crate::timeline::WalResidentTimeline;
use postgres_backend::PostgresBackend;

View File

@@ -4,6 +4,7 @@ use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use safekeeper_api::{models::TimelineStatus, Term};
use serde::{Deserialize, Serialize};
use std::{
cmp::min,
@@ -21,11 +22,7 @@ use tracing::{error, info, instrument};
use crate::{
control_file::CONTROL_FILE_NAME,
debug_dump,
http::{
client::{self, Client},
routes::TimelineStatus,
},
safekeeper::Term,
http::client::{self, Client},
state::{EvictionState, TimelinePersistentState},
timeline::{Timeline, WalResidentTimeline},
timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline},

View File

@@ -9,9 +9,7 @@ use crate::metrics::{
};
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::WalResidentTimeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
@@ -23,8 +21,8 @@ use postgres_backend::PostgresBackend;
use postgres_backend::PostgresBackendReader;
use postgres_backend::QueryError;
use pq_proto::BeMessage;
use serde::Deserialize;
use serde::Serialize;
use safekeeper_api::models::{ConnectionId, WalReceiverState, WalReceiverStatus};
use safekeeper_api::ServerInfo;
use std::future;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -171,21 +169,6 @@ impl WalReceiversShared {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
/// None means it is recovery initiated by us (this safekeeper).
pub conn_id: Option<ConnectionId>,
pub status: WalReceiverStatus,
}
/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverStatus {
Voting,
Streaming,
}
/// Scope guard to access slot in WalReceivers registry and unregister from
/// it in Drop.
pub struct WalReceiverGuard {

View File

@@ -7,6 +7,8 @@ use std::{fmt, pin::pin};
use anyhow::{bail, Context};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage;
use safekeeper_api::models::{PeerInfo, TimelineStatus};
use safekeeper_api::Term;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::timeout;
use tokio::{
@@ -24,13 +26,11 @@ use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
use crate::safekeeper::{AppendRequest, AppendRequestHeader};
use crate::timeline::WalResidentTimeline;
use crate::{
http::routes::TimelineStatus,
receive_wal::MSG_QUEUE_SIZE,
safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, Term, TermHistory,
TermLsn, VoteRequest,
AcceptorProposerMessage, ProposerAcceptorMessage, ProposerElected, TermHistory, TermLsn,
VoteRequest,
},
timeline::PeerInfo,
SafeKeeperConf,
};

View File

@@ -5,6 +5,9 @@ use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use safekeeper_api::models::HotStandbyFeedback;
use safekeeper_api::Term;
use safekeeper_api::INVALID_TERM;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
@@ -16,7 +19,6 @@ use tracing::*;
use crate::control_file;
use crate::metrics::MISC_OPERATION_SECONDS;
use crate::send_wal::HotStandbyFeedback;
use crate::state::TimelineState;
use crate::wal_storage;
@@ -31,10 +33,6 @@ use utils::{
const SK_PROTOCOL_VERSION: u32 = 2;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
pub type Term = u64;
pub const INVALID_TERM: Term = 0;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct TermLsn {
pub term: Term,
@@ -198,16 +196,6 @@ impl AcceptorState {
}
}
/// Information about Postgres. Safekeeper gets it once and then verifies
/// all further connections from computes match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ServerInfo {
/// Postgres server version
pub pg_version: u32,
pub system_id: SystemId,
pub wal_seg_size: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistedPeerInfo {
/// LSN up to which safekeeper offloaded WAL to s3.
@@ -1041,6 +1029,7 @@ where
mod tests {
use futures::future::BoxFuture;
use postgres_ffi::{XLogSegNo, WAL_SEGMENT_SIZE};
use safekeeper_api::ServerInfo;
use super::*;
use crate::state::{EvictionState, PersistedPeers, TimelinePersistentState};

View File

@@ -4,11 +4,10 @@
use crate::handler::SafekeeperPostgresHandler;
use crate::metrics::RECEIVED_PS_FEEDBACKS;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{Term, TermLsn};
use crate::safekeeper::TermLsn;
use crate::send_interpreted_wal::InterpretedWalSender;
use crate::timeline::WalResidentTimeline;
use crate::wal_reader_stream::WalReaderStreamBuilder;
use crate::wal_service::ConnectionId;
use crate::wal_storage::WalReader;
use anyhow::{bail, Context as AnyhowContext};
use bytes::Bytes;
@@ -19,7 +18,11 @@ use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
use serde::{Deserialize, Serialize};
use safekeeper_api::models::{
ConnectionId, HotStandbyFeedback, ReplicationFeedback, StandbyFeedback, StandbyReply,
WalSenderState, INVALID_FULL_TRANSACTION_ID,
};
use safekeeper_api::Term;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::failpoint_support;
use utils::id::TenantTimelineId;
@@ -28,7 +31,6 @@ use utils::postgres_client::PostgresClientProtocol;
use std::cmp::{max, min};
use std::net::SocketAddr;
use std::str;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch::Receiver;
@@ -42,65 +44,6 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
// neon extension of replication protocol
const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
type FullTransactionId = u64;
/// Hot standby feedback received from replica
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct HotStandbyFeedback {
pub ts: TimestampTz,
pub xmin: FullTransactionId,
pub catalog_xmin: FullTransactionId,
}
const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
impl HotStandbyFeedback {
pub fn empty() -> HotStandbyFeedback {
HotStandbyFeedback {
ts: 0,
xmin: 0,
catalog_xmin: 0,
}
}
}
/// Standby status update
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyReply {
pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
pub reply_requested: bool,
}
impl StandbyReply {
fn empty() -> Self {
StandbyReply {
write_lsn: Lsn::INVALID,
flush_lsn: Lsn::INVALID,
apply_lsn: Lsn::INVALID,
reply_ts: 0,
reply_requested: false,
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StandbyFeedback {
pub reply: StandbyReply,
pub hs_feedback: HotStandbyFeedback,
}
impl StandbyFeedback {
pub fn empty() -> Self {
StandbyFeedback {
reply: StandbyReply::empty(),
hs_feedback: HotStandbyFeedback::empty(),
}
}
}
/// WalSenders registry. Timeline holds it (wrapped in Arc).
pub struct WalSenders {
mutex: Mutex<WalSendersShared>,
@@ -341,25 +284,6 @@ impl WalSendersShared {
}
}
// Serialized is used only for pretty printing in json.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalSenderState {
ttid: TenantTimelineId,
addr: SocketAddr,
conn_id: ConnectionId,
// postgres application_name
appname: Option<String>,
feedback: ReplicationFeedback,
}
// Receiver is either pageserver or regular standby, which have different
// feedbacks.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
enum ReplicationFeedback {
Pageserver(PageserverFeedback),
Standby(StandbyFeedback),
}
// id of the occupied slot in WalSenders to access it (and save in the
// WalSenderGuard). We could give Arc directly to the slot, but there is not
// much sense in that as values aggregation which is performed on each feedback
@@ -888,6 +812,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
#[cfg(test)]
mod tests {
use safekeeper_api::models::FullTransactionId;
use utils::id::{TenantId, TimelineId};
use super::*;

View File

@@ -5,7 +5,7 @@ use std::{cmp::max, ops::Deref};
use anyhow::{bail, Result};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineTermBumpResponse;
use safekeeper_api::{models::TimelineTermBumpResponse, ServerInfo, Term};
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -14,10 +14,7 @@ use utils::{
use crate::{
control_file,
safekeeper::{
AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory,
UNKNOWN_SERVER_VERSION,
},
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, TermHistory, UNKNOWN_SERVER_VERSION},
timeline::TimelineError,
wal_backup_partial::{self},
};

View File

@@ -4,8 +4,8 @@
use anyhow::{anyhow, bail, Result};
use camino::{Utf8Path, Utf8PathBuf};
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse};
use safekeeper_api::Term;
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
@@ -31,9 +31,7 @@ use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::control_file;
use crate::rate_limit::RateLimiter;
use crate::receive_wal::WalReceivers;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, Term, TermLsn,
};
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, TermLsn};
use crate::send_wal::WalSenders;
use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, TimelineState};
use crate::timeline_guard::ResidenceGuard;
@@ -47,40 +45,17 @@ use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::SafeKeeperConf;
use crate::{debug_dump, timeline_manager, wal_storage};
/// Things safekeeper should know about timeline state on peers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
pub sk_id: NodeId,
pub term: Term,
/// Term of the last entry.
pub last_log_term: Term,
/// LSN of the last record.
pub flush_lsn: Lsn,
pub commit_lsn: Lsn,
/// Since which LSN safekeeper has WAL.
pub local_start_lsn: Lsn,
/// When info was received. Serde annotations are not very useful but make
/// the code compile -- we don't rely on this field externally.
#[serde(skip)]
#[serde(default = "Instant::now")]
ts: Instant,
pub pg_connstr: String,
pub http_connstr: String,
}
impl PeerInfo {
fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id: NodeId(sk_info.safekeeper_id),
term: sk_info.term,
last_log_term: sk_info.last_log_term,
flush_lsn: Lsn(sk_info.flush_lsn),
commit_lsn: Lsn(sk_info.commit_lsn),
local_start_lsn: Lsn(sk_info.local_start_lsn),
pg_connstr: sk_info.safekeeper_connstr.clone(),
http_connstr: sk_info.http_connstr.clone(),
ts,
}
fn peer_info_from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id: NodeId(sk_info.safekeeper_id),
term: sk_info.term,
last_log_term: sk_info.last_log_term,
flush_lsn: Lsn(sk_info.flush_lsn),
commit_lsn: Lsn(sk_info.commit_lsn),
local_start_lsn: Lsn(sk_info.local_start_lsn),
pg_connstr: sk_info.safekeeper_connstr.clone(),
http_connstr: sk_info.http_connstr.clone(),
ts,
}
}
@@ -697,7 +672,7 @@ impl Timeline {
{
let mut shared_state = self.write_shared_state().await;
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
let peer_info = peer_info_from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
}
Ok(())

View File

@@ -14,6 +14,7 @@ use std::{
use futures::channel::oneshot;
use postgres_ffi::XLogSegNo;
use safekeeper_api::{models::PeerInfo, Term};
use serde::{Deserialize, Serialize};
use tokio::{
task::{JoinError, JoinHandle},
@@ -32,10 +33,9 @@ use crate::{
rate_limit::{rand_duration, RateLimiter},
recovery::recovery_main,
remove_wal::calc_horizon_lsn,
safekeeper::Term,
send_wal::WalSenders,
state::TimelineState,
timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline},
timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline},
timeline_guard::{AccessService, GuardId, ResidenceGuard},
timelines_set::{TimelineSetGuard, TimelinesSet},
wal_backup::{self, WalBackupTaskHandle},

View File

@@ -4,7 +4,6 @@
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
use crate::rate_limit::RateLimiter;
use crate::safekeeper::ServerInfo;
use crate::state::TimelinePersistentState;
use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
use crate::timelines_set::TimelinesSet;
@@ -13,6 +12,7 @@ use crate::{control_file, wal_storage, SafeKeeperConf};
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use safekeeper_api::ServerInfo;
use serde::Serialize;
use std::collections::HashMap;
use std::str::FromStr;

View File

@@ -3,6 +3,7 @@ use anyhow::{Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use safekeeper_api::models::PeerInfo;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use utils::backoff;
@@ -30,7 +31,7 @@ use tracing::*;
use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS, WAL_BACKUP_TASKS};
use crate::timeline::{PeerInfo, WalResidentTimeline};
use crate::timeline::WalResidentTimeline;
use crate::timeline_manager::{Manager, StateSnapshot};
use crate::{SafeKeeperConf, WAL_BACKUP_RUNTIME};

View File

@@ -22,6 +22,7 @@
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
@@ -31,7 +32,6 @@ use utils::{id::NodeId, lsn::Lsn};
use crate::{
metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
rate_limit::{rand_duration, RateLimiter},
safekeeper::Term,
timeline::WalResidentTimeline,
timeline_manager::StateSnapshot,
wal_backup::{self},

View File

@@ -4,12 +4,12 @@ use async_stream::try_stream;
use bytes::Bytes;
use futures::Stream;
use postgres_backend::CopyStreamHandlerEnd;
use safekeeper_api::Term;
use std::time::Duration;
use tokio::time::timeout;
use utils::lsn::Lsn;
use crate::{
safekeeper::Term,
send_wal::{EndWatch, WalSenderGuard},
timeline::WalResidentTimeline,
};

View File

@@ -4,6 +4,7 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use safekeeper_api::models::ConnectionId;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
@@ -114,8 +115,6 @@ async fn handle_socket(
.await
}
/// Unique WAL service connection ids are logged in spans for observability.
pub type ConnectionId = u32;
pub type ConnectionCount = u32;
pub fn issue_connection_id(count: &mut ConnectionCount) -> ConnectionId {

View File

@@ -15,12 +15,13 @@ use desim::{
};
use http::Uri;
use safekeeper::{
safekeeper::{ProposerAcceptorMessage, SafeKeeper, ServerInfo, UNKNOWN_SERVER_VERSION},
safekeeper::{ProposerAcceptorMessage, SafeKeeper, UNKNOWN_SERVER_VERSION},
state::{TimelinePersistentState, TimelineState},
timeline::TimelineError,
wal_storage::Storage,
SafeKeeperConf,
};
use safekeeper_api::ServerInfo;
use tracing::{debug, info_span, warn};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},