Mirror of https://github.com/neondatabase/neon.git
Upload partial segments (#6530)
Add support for backing up partial segments to remote storage. Disabled
by default, can be enabled with `--partial-backup-enabled`.
Each safekeeper timeline has a background task which is subscribed to
`commit_lsn` and `flush_lsn` updates. After the partial segment is
updated (`flush_lsn` has changed), the segment is uploaded to S3 in
about 15 minutes (the delay is controlled by `--partial-backup-timeout`,
default `15m`).
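As a rough illustration of this debounce behaviour, here is a minimal, self-contained sketch using a tokio watch channel. The timeout value and the idea of waiting on `flush_lsn` updates come from the description above; the function itself and everything in it are hypothetical and far simpler than the real task:

```rust
use std::time::Duration;
use tokio::sync::watch;

// Hypothetical debounce loop: react to flush_lsn updates, but only
// "upload" after the configured timeout has passed since the change.
async fn debounce_partial_uploads(mut flush_lsn_rx: watch::Receiver<u64>, timeout: Duration) {
    loop {
        // Wait until the partial segment changes at all (flush_lsn advances).
        if flush_lsn_rx.changed().await.is_err() {
            return; // sender dropped: the timeline is gone
        }
        // Then wait out the backup timeout so every flush does not trigger an upload.
        tokio::time::sleep(timeout).await;
        let flush_lsn = *flush_lsn_rx.borrow();
        println!("would upload the partial segment up to {flush_lsn}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(0u64);
    tokio::spawn(debounce_partial_uploads(rx, Duration::from_millis(100)));
    tx.send(42).unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;
}
```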
The filename format for partial segments is
`Segment_Term_Flush_Commit_skNN.partial`, where:
- `Segment` – the segment name, like `000000010000000000000001`
- `Term` – current term
- `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
- `Commit` – commit_lsn in the same hex format
- `NN` – safekeeper_id, like `1`
The full object name example:
`000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
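For illustration, a minimal standalone sketch of how such an object name can be assembled. The helper function and its inputs are hypothetical, but the format string matches the scheme described above (note that `Flush` precedes `Commit` in the name):

```rust
// Hypothetical helper that mirrors the naming scheme described above.
fn partial_object_name(
    segment_name: &str,
    term: u64,
    flush_lsn: u64,
    commit_lsn: u64,
    safekeeper_id: u64,
) -> String {
    format!(
        "{}_{}_{:016X}_{:016X}_sk{}.partial",
        segment_name, term, flush_lsn, commit_lsn, safekeeper_id
    )
}

fn main() {
    // Reproduces the example object name from the description.
    assert_eq!(
        partial_object_name("000000010000000000000002", 2, 0x2534868, 0x2534410, 1),
        "000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial"
    );
}
```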
Each safekeeper keeps information about remote partial segments in its
control file. The code updates the control file state before performing
any S3 operation, so the control file describes all potentially existing
remote partial segments and older versions can be cleaned up after a
newer one is uploaded.
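A hedged sketch of that ordering, with a made-up `ControlFile` type and upload/delete callbacks standing in for the real control file and S3 client: the new object's name is persisted before the upload, so a crash can never leave a remote object the control file does not know about.

```rust
// All names here are hypothetical; only the ordering (persist intent,
// then upload, then clean up older versions) follows the description above.
struct ControlFile {
    remote_partial_segments: Vec<String>,
}

impl ControlFile {
    fn persist(&self) -> std::io::Result<()> {
        // Placeholder for an fsync'ed control file write.
        Ok(())
    }
}

fn upload_new_partial(
    cf: &mut ControlFile,
    new_name: String,
    upload: impl Fn(&str) -> std::io::Result<()>,
    delete: impl Fn(&str) -> std::io::Result<()>,
) -> std::io::Result<()> {
    // 1. Record the new object name first, then persist the control file.
    cf.remote_partial_segments.push(new_name.clone());
    cf.persist()?;

    // 2. Only now touch remote storage.
    upload(&new_name)?;

    // 3. Older versions are known from the control file and can be cleaned up.
    for old in cf.remote_partial_segments.clone() {
        if old != new_name {
            delete(&old)?;
        }
    }
    cf.remote_partial_segments.retain(|n| n == &new_name);
    cf.persist()
}

fn main() -> std::io::Result<()> {
    let mut cf = ControlFile {
        remote_partial_segments: vec!["old.partial".to_string()],
    };
    upload_new_partial(
        &mut cf,
        "new.partial".to_string(),
        |name| {
            println!("uploading {name}");
            Ok(())
        },
        |name| {
            println!("deleting {name}");
            Ok(())
        },
    )
}
```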
Closes #6336
Committed by GitHub
Parent: 8b10407be4
Commit: 3f77f26aa2
@@ -565,6 +565,16 @@ impl GenericRemoteStorage {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct StorageMetadata(HashMap<String, String>);

+impl<const N: usize> From<[(&str, &str); N]> for StorageMetadata {
+    fn from(arr: [(&str, &str); N]) -> Self {
+        let map: HashMap<String, String> = arr
+            .iter()
+            .map(|(k, v)| (k.to_string(), v.to_string()))
+            .collect();
+        Self(map)
+    }
+}
+
 /// External backup storage configuration, enough for creating a client for that storage.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct RemoteStorageConfig {
@@ -33,6 +33,7 @@ once_cell.workspace = true
 parking_lot.workspace = true
 postgres.workspace = true
 postgres-protocol.workspace = true
+rand.workspace = true
 regex.workspace = true
 scopeguard.workspace = true
 reqwest = { workspace = true, features = ["json"] }
@@ -28,7 +28,7 @@ use utils::pid_file;
 use metrics::set_build_info_metric;
 use safekeeper::defaults::{
     DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
-    DEFAULT_PG_LISTEN_ADDR,
+    DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR,
 };
 use safekeeper::wal_service;
 use safekeeper::GlobalTimelines;
@@ -170,6 +170,13 @@ struct Args {
     /// still needed for existing replication connection.
     #[arg(long)]
     walsenders_keep_horizon: bool,
+    /// Enable partial backup. If disabled, safekeeper will not upload partial
+    /// segments to remote storage.
+    #[arg(long)]
+    partial_backup_enabled: bool,
+    /// Controls how long backup will wait until uploading the partial segment.
+    #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_PARTIAL_BACKUP_TIMEOUT, verbatim_doc_comment)]
+    partial_backup_timeout: Duration,
 }

 // Like PathBufValueParser, but allows empty string.
@@ -300,6 +307,8 @@ async fn main() -> anyhow::Result<()> {
         http_auth,
         current_thread_runtime: args.current_thread_runtime,
         walsenders_keep_horizon: args.walsenders_keep_horizon,
+        partial_backup_enabled: args.partial_backup_enabled,
+        partial_backup_timeout: args.partial_backup_timeout,
     };

     // initialize sentry if SENTRY_DSN is provided
@@ -365,6 +374,8 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {

     let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);

+    wal_backup::init_remote_storage(&conf);
+
     // Keep handles to main tasks to die if any of them disappears.
     let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
         FuturesUnordered::new();
@@ -20,7 +20,7 @@ use utils::{bin_ser::LeSer, id::TenantTimelineId};
 use crate::SafeKeeperConf;

 pub const SK_MAGIC: u32 = 0xcafeceefu32;
-pub const SK_FORMAT_VERSION: u32 = 7;
+pub const SK_FORMAT_VERSION: u32 = 8;

 // contains persistent metadata for safekeeper
 const CONTROL_FILE_NAME: &str = "safekeeper.control";
@@ -2,6 +2,7 @@
 use crate::{
     safekeeper::{AcceptorState, PgUuid, ServerInfo, Term, TermHistory, TermLsn},
     state::{PersistedPeers, TimelinePersistentState},
+    wal_backup_partial,
 };
 use anyhow::{bail, Result};
 use pq_proto::SystemId;
@@ -138,6 +139,50 @@ pub struct SafeKeeperStateV4 {
     pub peers: PersistedPeers,
 }

+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct SafeKeeperStateV7 {
+    #[serde(with = "hex")]
+    pub tenant_id: TenantId,
+    #[serde(with = "hex")]
+    pub timeline_id: TimelineId,
+    /// persistent acceptor state
+    pub acceptor_state: AcceptorState,
+    /// information about server
+    pub server: ServerInfo,
+    /// Unique id of the last *elected* proposer we dealt with. Not needed
+    /// for correctness, exists for monitoring purposes.
+    #[serde(with = "hex")]
+    pub proposer_uuid: PgUuid,
+    /// Since which LSN this timeline generally starts. Safekeeper might have
+    /// joined later.
+    pub timeline_start_lsn: Lsn,
+    /// Since which LSN safekeeper has (had) WAL for this timeline.
+    /// All WAL segments next to one containing local_start_lsn are
+    /// filled with data from the beginning.
+    pub local_start_lsn: Lsn,
+    /// Part of WAL acknowledged by quorum *and available locally*. Always points
+    /// to record boundary.
+    pub commit_lsn: Lsn,
+    /// LSN that points to the end of the last backed up segment. Useful to
+    /// persist to avoid finding out offloading progress on boot.
+    pub backup_lsn: Lsn,
+    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
+    /// of last record streamed to everyone). Persisting it helps skipping
+    /// recovery in walproposer, generally we compute it from peers. In
+    /// walproposer proto called 'truncate_lsn'. Updates are currently drived
+    /// only by walproposer.
+    pub peer_horizon_lsn: Lsn,
+    /// LSN of the oldest known checkpoint made by pageserver and successfully
+    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
+    /// informational purposes, we receive it from pageserver (or broker).
+    pub remote_consistent_lsn: Lsn,
+    // Peers and their state as we remember it. Knowing peers themselves is
+    // fundamental; but state is saved here only for informational purposes and
+    // obviously can be stale. (Currently not saved at all, but let's provision
+    // place to have less file version upgrades).
+    pub peers: PersistedPeers,
+}
+
 pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
     // migrate to storing full term history
     if version == 1 {
@@ -167,6 +212,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             peer_horizon_lsn: oldstate.truncate_lsn,
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
+            partial_backup: wal_backup_partial::State::default(),
         });
     // migrate to hexing some ids
     } else if version == 2 {
@@ -190,6 +236,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             peer_horizon_lsn: oldstate.truncate_lsn,
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
+            partial_backup: wal_backup_partial::State::default(),
         });
     // migrate to moving tenant_id/timeline_id to the top and adding some lsns
     } else if version == 3 {
@@ -213,6 +260,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             peer_horizon_lsn: oldstate.truncate_lsn,
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
+            partial_backup: wal_backup_partial::State::default(),
         });
     // migrate to having timeline_start_lsn
     } else if version == 4 {
@@ -236,6 +284,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             peer_horizon_lsn: oldstate.peer_horizon_lsn,
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
+            partial_backup: wal_backup_partial::State::default(),
         });
     } else if version == 5 {
         info!("reading safekeeper control file version {}", version);
@@ -262,7 +311,30 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
         oldstate.server.pg_version = 140005;

         return Ok(oldstate);
+    } else if version == 7 {
+        info!("reading safekeeper control file version {}", version);
+        let oldstate = SafeKeeperStateV7::des(&buf[..buf.len()])?;
+
+        return Ok(TimelinePersistentState {
+            tenant_id: oldstate.tenant_id,
+            timeline_id: oldstate.timeline_id,
+            acceptor_state: oldstate.acceptor_state,
+            server: oldstate.server,
+            proposer_uuid: oldstate.proposer_uuid,
+            timeline_start_lsn: oldstate.timeline_start_lsn,
+            local_start_lsn: oldstate.local_start_lsn,
+            commit_lsn: oldstate.commit_lsn,
+            backup_lsn: oldstate.backup_lsn,
+            peer_horizon_lsn: oldstate.peer_horizon_lsn,
+            remote_consistent_lsn: oldstate.remote_consistent_lsn,
+            peers: oldstate.peers,
+            partial_backup: wal_backup_partial::State::default(),
+        });
     }

     // TODO: persist the file back to the disk after upgrade
     // TODO: think about backward compatibility and rollbacks

     bail!("unsupported safekeeper control file version {}", version)
 }
@@ -32,6 +32,7 @@ pub mod send_wal;
 pub mod state;
 pub mod timeline;
 pub mod wal_backup;
+pub mod wal_backup_partial;
 pub mod wal_service;
 pub mod wal_storage;
@@ -48,6 +49,7 @@ pub mod defaults {

     pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
     pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
+    pub const DEFAULT_PARTIAL_BACKUP_TIMEOUT: &str = "15m";
 }

 #[derive(Debug, Clone)]
@@ -79,6 +81,8 @@ pub struct SafeKeeperConf {
     pub http_auth: Option<Arc<SwappableJwtAuth>>,
     pub current_thread_runtime: bool,
     pub walsenders_keep_horizon: bool,
+    pub partial_backup_enabled: bool,
+    pub partial_backup_timeout: Duration,
 }

 impl SafeKeeperConf {
@@ -123,6 +127,8 @@ impl SafeKeeperConf {
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
             current_thread_runtime: false,
             walsenders_keep_horizon: false,
+            partial_backup_enabled: false,
+            partial_backup_timeout: Duration::from_secs(0),
         }
     }
 }
@@ -147,6 +147,21 @@ pub static RECEIVED_PS_FEEDBACKS: Lazy<IntCounter> = Lazy::new(|| {
     )
     .expect("Failed to register safekeeper_received_ps_feedbacks_total counter")
 });
+pub static PARTIAL_BACKUP_UPLOADS: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
+        "safekeeper_partial_backup_uploads_total",
+        "Number of partial backup uploads to the S3",
+        &["result"]
+    )
+    .expect("Failed to register safekeeper_partial_backup_uploads_total counter")
+});
+pub static PARTIAL_BACKUP_UPLOADED_BYTES: Lazy<IntCounter> = Lazy::new(|| {
+    register_int_counter!(
+        "safekeeper_partial_backup_uploaded_bytes_total",
+        "Number of bytes uploaded to the S3 during partial backup"
+    )
+    .expect("Failed to register safekeeper_partial_backup_uploaded_bytes_total counter")
+});

 pub const LABEL_UNKNOWN: &str = "unknown";
@@ -1221,6 +1221,7 @@ mod tests {
                 commit_lsn: Lsn(1234567600),
             },
         )]),
+        partial_backup: crate::wal_backup_partial::State::default(),
     };

     let ser = state.ser().unwrap();
@@ -1266,6 +1267,8 @@ mod tests {
         0x2a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
         0x70, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
         0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
+        // partial_backup
+        0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
     ];

     assert_eq!(Hex(&ser), Hex(&expected));
@@ -13,6 +13,7 @@ use utils::{
 use crate::{
     control_file,
     safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
+    wal_backup_partial::{self},
 };

 /// Persistent information stored on safekeeper node about timeline.
@@ -54,11 +55,14 @@ pub struct TimelinePersistentState {
     /// pushed to s3. We don't remove WAL beyond it. Persisted only for
     /// informational purposes, we receive it from pageserver (or broker).
     pub remote_consistent_lsn: Lsn,
-    // Peers and their state as we remember it. Knowing peers themselves is
-    // fundamental; but state is saved here only for informational purposes and
-    // obviously can be stale. (Currently not saved at all, but let's provision
-    // place to have less file version upgrades).
+    /// Peers and their state as we remember it. Knowing peers themselves is
+    /// fundamental; but state is saved here only for informational purposes and
+    /// obviously can be stale. (Currently not saved at all, but let's provision
+    /// place to have less file version upgrades).
     pub peers: PersistedPeers,
+    /// Holds names of partial segments uploaded to remote storage. Used to
+    /// clean up old objects without leaving garbage in remote storage.
+    pub partial_backup: wal_backup_partial::State,
 }

 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -93,6 +97,7 @@ impl TimelinePersistentState {
                     .map(|p| (*p, PersistedPeerInfo::new()))
                     .collect(),
             ),
+            partial_backup: wal_backup_partial::State::default(),
         }
     }
@@ -38,7 +38,7 @@ use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};

 use crate::metrics::FullTimelineInfo;
 use crate::wal_storage::Storage as wal_storage_iface;
-use crate::{debug_dump, wal_storage};
+use crate::{debug_dump, wal_backup_partial, wal_storage};
 use crate::{GlobalTimelines, SafeKeeperConf};

 /// Things safekeeper should know about timeline state on peers.
@@ -503,6 +503,9 @@ impl Timeline {
         if conf.peer_recovery_enabled {
             tokio::spawn(recovery_main(self.clone(), conf.clone()));
         }
+        if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
+            tokio::spawn(wal_backup_partial::main_task(self.clone(), conf.clone()));
+        }
     }

     /// Delete timeline from disk completely, by removing timeline directory.
@@ -667,8 +670,8 @@ impl Timeline {
             term_flush_lsn =
                 TermLsn::from((shared_state.sk.get_term(), shared_state.sk.flush_lsn()));
         }
-        self.commit_lsn_watch_tx.send(commit_lsn)?;
         self.term_flush_lsn_watch_tx.send(term_flush_lsn)?;
+        self.commit_lsn_watch_tx.send(commit_lsn)?;
         Ok(rmsg)
     }
@@ -18,7 +18,7 @@ use std::time::Duration;
 use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
 use postgres_ffi::XLogFileName;
 use postgres_ffi::{XLogSegNo, PG_TLI};
-use remote_storage::{GenericRemoteStorage, RemotePath};
+use remote_storage::{GenericRemoteStorage, RemotePath, StorageMetadata};
 use tokio::fs::File;

 use tokio::select;
@@ -180,6 +180,16 @@ fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
     .unwrap()
 }

+pub fn init_remote_storage(conf: &SafeKeeperConf) {
+    // TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
+    // dependencies to all tasks instead.
+    REMOTE_STORAGE.get_or_init(|| {
+        conf.remote_storage
+            .as_ref()
+            .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
+    });
+}
+
 const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;

 /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
@@ -194,14 +204,6 @@ pub async fn wal_backup_launcher_task_main(
         conf.remote_storage
     );

-    let conf_ = conf.clone();
-    REMOTE_STORAGE.get_or_init(|| {
-        conf_
-            .remote_storage
-            .as_ref()
-            .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
-    });
-
     // Presence in this map means launcher is aware s3 offloading is needed for
     // the timeline, but task is started only if it makes sense for to offload
     // from this safekeeper.
@@ -518,6 +520,35 @@ async fn backup_object(
     .await
 }

+pub(crate) async fn backup_partial_segment(
+    source_file: &Utf8Path,
+    target_file: &RemotePath,
+    size: usize,
+) -> Result<()> {
+    let storage = get_configured_remote_storage();
+
+    let file = File::open(&source_file)
+        .await
+        .with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
+
+    // limiting the file to read only the first `size` bytes
+    let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
+
+    let file = tokio_util::io::ReaderStream::with_capacity(limited_file, BUFFER_SIZE);
+
+    let cancel = CancellationToken::new();
+
+    storage
+        .upload(
+            file,
+            size,
+            target_file,
+            Some(StorageMetadata::from([("sk_type", "partial_segment")])),
+            &cancel,
+        )
+        .await
+}
+
 pub async fn read_object(
     file_path: &RemotePath,
     offset: u64,
@@ -604,6 +635,13 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
     Ok(())
 }

+/// Used by wal_backup_partial.
+pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
+    let cancel = CancellationToken::new(); // not really used
+    let storage = get_configured_remote_storage();
+    storage.delete_objects(paths, &cancel).await
+}
+
 /// Copy segments from one timeline to another. Used in copy_timeline.
 pub async fn copy_s3_segments(
     wal_seg_size: usize,
safekeeper/src/wal_backup_partial.rs (new file, 396 lines)
@@ -0,0 +1,396 @@
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
//! was changed), the segment will be uploaded to S3 in about 15 minutes.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
//! - `Segment` – the segment name, like `000000010000000000000001`
//! - `Term` – current term
//! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
//! - `Commit` – commit_lsn in the same hex format
//! - `NN` – safekeeper_id, like `1`
//!
//! The full object name example:
//! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
//!
//! Each safekeeper will keep info about remote partial segments in its control
//! file. Code updates state in the control file before doing any S3 operations.
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.

use std::sync::Arc;

use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use rand::Rng;
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};

use tracing::{debug, error, info, instrument};
use utils::lsn::Lsn;

use crate::{
    metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
    safekeeper::Term,
    timeline::Timeline,
    wal_backup, SafeKeeperConf,
};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress
    InProgress,
    /// Upload is finished
    Uploaded,
    /// Deletion is in progress
    Deleting,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    pub status: UploadStatus,
    pub name: String,
    pub commit_lsn: Lsn,
    pub flush_lsn: Lsn,
    pub term: Term,
}

impl PartialRemoteSegment {
    fn eq_without_status(&self, other: &Self) -> bool {
        self.name == other.name
            && self.commit_lsn == other.commit_lsn
            && self.flush_lsn == other.flush_lsn
            && self.term == other.term
    }
}

// NB: these structures are a part of a control_file, you can't change them without
// changing the control file format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    pub segments: Vec<PartialRemoteSegment>,
}

impl State {
    /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
    fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
        self.segments
            .iter()
            .find(|seg| seg.status == UploadStatus::Uploaded)
            .cloned()
    }
}

struct PartialBackup {
    wal_seg_size: usize,
    tli: Arc<Timeline>,
    conf: SafeKeeperConf,
    local_prefix: Utf8PathBuf,
    remote_prefix: Utf8PathBuf,

    state: State,
}

// Read-only methods for getting segment names
impl PartialBackup {
    fn segno(&self, lsn: Lsn) -> XLogSegNo {
        lsn.segment_number(self.wal_seg_size)
    }

    fn segment_name(&self, segno: u64) -> String {
        XLogFileName(PG_TLI, segno, self.wal_seg_size)
    }

    fn remote_segment_name(
        &self,
        segno: u64,
        term: u64,
        commit_lsn: Lsn,
        flush_lsn: Lsn,
    ) -> String {
        format!(
            "{}_{}_{:016X}_{:016X}_sk{}.partial",
            self.segment_name(segno),
            term,
            flush_lsn.0,
            commit_lsn.0,
            self.conf.my_id.0,
        )
    }

    fn local_segment_name(&self, segno: u64) -> String {
        format!("{}.partial", self.segment_name(segno))
    }
}

impl PartialBackup {
    /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
    async fn prepare_upload(&self) -> PartialRemoteSegment {
        // this operation takes a lock to get the actual state
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        let flush_lsn = Lsn(sk_info.flush_lsn);
        let commit_lsn = Lsn(sk_info.commit_lsn);
        let term = sk_info.term;
        let segno = self.segno(flush_lsn);

        let name = self.remote_segment_name(segno, term, commit_lsn, flush_lsn);

        PartialRemoteSegment {
            status: UploadStatus::InProgress,
            name,
            commit_lsn,
            flush_lsn,
            term,
        }
    }

    /// Reads segment from disk and uploads it to the remote storage.
    async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
        let flush_lsn = prepared.flush_lsn;
        let segno = self.segno(flush_lsn);

        // We're going to backup bytes from the start of the segment up to flush_lsn.
        let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

        let local_path = self.local_prefix.join(self.local_segment_name(segno));
        let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;

        // Upload first `backup_bytes` bytes of the segment to the remote storage.
        wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
        PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);

        // We uploaded the segment, now let's verify that the data is still actual.
        // If the term changed, we cannot guarantee the validity of the uploaded data.
        // If the term is the same, we know the data is not corrupted.
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        if sk_info.term != prepared.term {
            anyhow::bail!("term changed during upload");
        }
        assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
        assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));

        Ok(())
    }

    /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
    async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
        self.tli
            .map_control_file(|cf| {
                if cf.partial_backup != self.state {
                    let memory = self.state.clone();
                    self.state = cf.partial_backup.clone();
                    anyhow::bail!(
                        "partial backup state diverged, memory={:?}, disk={:?}",
                        memory,
                        cf.partial_backup
                    );
                }

                cf.partial_backup = new_state.clone();
                Ok(())
            })
            .await?;
        // update in-memory state
        self.state = new_state;
        Ok(())
    }

    /// Upload the latest version of the partial segment and garbage collect older versions.
    #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
    async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
        info!("starting upload {:?}", prepared);

        let state_0 = self.state.clone();
        let state_1 = {
            let mut state = state_0.clone();
            state.segments.push(prepared.clone());
            state
        };

        // we're going to upload a new segment, let's write it to disk to make GC later
        self.commit_state(state_1).await?;

        self.upload_segment(prepared.clone()).await?;

        let state_2 = {
            let mut state = state_0.clone();
            for seg in state.segments.iter_mut() {
                seg.status = UploadStatus::Deleting;
            }
            let mut actual_remote_segment = prepared.clone();
            actual_remote_segment.status = UploadStatus::Uploaded;
            state.segments.push(actual_remote_segment);
            state
        };

        // we've uploaded new segment, it's actual, all other segments should be GCed
        self.commit_state(state_2).await?;
        self.gc().await?;

        Ok(())
    }

    /// Delete all non-Uploaded segments from the remote storage. There should be only one
    /// Uploaded segment at a time.
    #[instrument(name = "gc", skip_all)]
    async fn gc(&mut self) -> anyhow::Result<()> {
        let mut segments_to_delete = vec![];

        let new_segments: Vec<PartialRemoteSegment> = self
            .state
            .segments
            .iter()
            .filter_map(|seg| {
                if seg.status == UploadStatus::Uploaded {
                    Some(seg.clone())
                } else {
                    segments_to_delete.push(seg.name.clone());
                    None
                }
            })
            .collect();

        info!("deleting objects: {:?}", segments_to_delete);
        let mut objects_to_delete = vec![];
        for seg in segments_to_delete.iter() {
            let remote_path = RemotePath::new(self.remote_prefix.join(seg).as_ref())?;
            objects_to_delete.push(remote_path);
        }

        // removing segments from remote storage
        wal_backup::delete_objects(&objects_to_delete).await?;

        // now we can update the state on disk
        let new_state = {
            let mut state = self.state.clone();
            state.segments = new_segments;
            state
        };
        self.commit_state(new_state).await?;

        Ok(())
    }
}

#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
    debug!("started");
    let await_duration = conf.partial_backup_timeout;

    let mut cancellation_rx = match tli.get_cancellation_rx() {
        Ok(rx) => rx,
        Err(_) => {
            info!("timeline canceled during task start");
            return;
        }
    };

    // sleep for random time to avoid thundering herd
    {
        let randf64 = rand::thread_rng().gen_range(0.0..1.0);
        let sleep_duration = await_duration.mul_f64(randf64);
        tokio::time::sleep(sleep_duration).await;
    }

    let (_, persistent_state) = tli.get_state().await;
    let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
    let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
    let wal_seg_size = tli.get_wal_seg_size().await;

    let local_prefix = tli.timeline_dir.clone();
    let remote_prefix = match tli.timeline_dir.strip_prefix(&conf.workdir) {
        Ok(path) => path.to_owned(),
        Err(e) => {
            error!("failed to strip workspace dir prefix: {:?}", e);
            return;
        }
    };

    let mut backup = PartialBackup {
        wal_seg_size,
        tli,
        state: persistent_state.partial_backup,
        conf,
        local_prefix,
        remote_prefix,
    };

    debug!("state: {:?}", backup.state);

    'outer: loop {
        // wait until we have something to upload
        let uploaded_segment = backup.state.uploaded_segment();
        if let Some(seg) = &uploaded_segment {
            // if we already uploaded something, wait until we have something new
            while flush_lsn_rx.borrow().lsn == seg.flush_lsn
                && *commit_lsn_rx.borrow() == seg.commit_lsn
                && flush_lsn_rx.borrow().term == seg.term
            {
                tokio::select! {
                    _ = cancellation_rx.changed() => {
                        info!("timeline canceled");
                        return;
                    }
                    _ = commit_lsn_rx.changed() => {}
                    _ = flush_lsn_rx.changed() => {}
                }
            }
        }

        // fixing the segno and waiting some time to prevent reuploading the same segment too often
        let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
        let timeout = tokio::time::sleep(await_duration);
        tokio::pin!(timeout);
        let mut timeout_expired = false;

        // waiting until timeout expires OR segno changes
        'inner: loop {
            tokio::select! {
                _ = cancellation_rx.changed() => {
                    info!("timeline canceled");
                    return;
                }
                _ = commit_lsn_rx.changed() => {}
                _ = flush_lsn_rx.changed() => {
                    let segno = backup.segno(flush_lsn_rx.borrow().lsn);
                    if segno != pending_segno {
                        // previous segment is no longer partial, aborting the wait
                        break 'inner;
                    }
                }
                _ = &mut timeout => {
                    // timeout expired, now we are ready for upload
                    timeout_expired = true;
                    break 'inner;
                }
            }
        }

        if !timeout_expired {
            // likely segno has changed, let's try again in the next iteration
            continue 'outer;
        }

        let prepared = backup.prepare_upload().await;
        if let Some(seg) = &uploaded_segment {
            if seg.eq_without_status(&prepared) {
                // we already uploaded this segment, nothing to do
                continue 'outer;
            }
        }

        match backup.do_upload(&prepared).await {
            Ok(()) => {
                debug!(
                    "uploaded {} up to flush_lsn {}",
                    prepared.name, prepared.flush_lsn
                );
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
            }
            Err(e) => {
                info!("failed to upload {}: {:#}", prepared.name, e);
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
            }
        }
    }
}
@@ -176,6 +176,8 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
         http_auth: None,
         current_thread_runtime: false,
         walsenders_keep_horizon: false,
+        partial_backup_enabled: false,
+        partial_backup_timeout: Duration::from_secs(0),
     };

     let mut global = GlobalMap::new(disk, conf.clone())?;
@@ -192,6 +192,9 @@ def test_backward_compatibility(
     assert not breaking_changes_allowed, "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"


+# Forward compatibility is broken due to https://github.com/neondatabase/neon/pull/6530
+# The test is disabled until the next release deployment
+@pytest.mark.xfail
 @check_ondisk_data_compatibility_if_enabled
 @pytest.mark.xdist_group("compatibility")
 @pytest.mark.order(after="test_create_snapshot")
@@ -10,6 +10,7 @@ import pytest
 import toml
 from fixtures.log_helper import getLogger
 from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder, Safekeeper
+from fixtures.remote_storage import RemoteStorageKind
 from fixtures.types import Lsn, TenantId, TimelineId

 log = getLogger("root.safekeeper_async")
@@ -199,7 +200,9 @@ async def run_restarts_under_load(
         # assert that at least one transaction has completed in every worker
         stats.check_progress()

-        victim.start()
+        # testing #6530, temporary here
+        # TODO: remove afer partial backup is enabled by default
+        victim.start(extra_opts=["--partial-backup-enabled", "--partial-backup-timeout=2s"])

     log.info("Iterations are finished, exiting coroutines...")
     stats.running = False
@@ -213,6 +216,7 @@ def test_restarts_under_load(neon_env_builder: NeonEnvBuilder):
 # Restart acceptors one by one, while executing and validating bank transactions
 def test_restarts_under_load(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_safekeepers = 3
+    neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
     env = neon_env_builder.init_start()

     env.neon_cli.create_branch("test_safekeepers_restarts_under_load")