Compare commits

...

3 Commits

Author SHA1 Message Date
Arthur Petukhovsky
99e3e5ad36 Refactor WIP 2022-08-04 11:24:03 +00:00
Arthur Petukhovsky
f2efe2496e Refactor safekeeper.rs 2022-08-02 16:26:26 +00:00
Arthur Petukhovsky
e2813b4144 Refactor wal_storage.rs 2022-08-02 11:29:23 +00:00
6 changed files with 159 additions and 206 deletions

View File

@@ -1,8 +1,3 @@
use serde::{Deserialize, Serialize};
use utils::zid::{NodeId, ZTimelineId};
#[derive(Serialize, Deserialize)]
pub struct TimelineCreateRequest {
pub timeline_id: ZTimelineId,
pub peer_ids: Vec<NodeId>,
}

View File

@@ -25,8 +25,6 @@ use utils::{
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
};
use super::models::TimelineCreateRequest;
#[derive(Debug, Serialize)]
struct SafekeeperStatus {
id: NodeId,
@@ -122,20 +120,6 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
json_response(StatusCode::OK, status)
}
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
let zttid = ZTenantTimelineId {
tenant_id: parse_request_param(&request, "tenant_id")?,
timeline_id: request_data.timeline_id,
};
check_permission(&request, Some(zttid.tenant_id))?;
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
.map_err(ApiError::from_err)?;
json_response(StatusCode::CREATED, ())
}
/// Deactivates the timeline and removes its data directory.
///
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
@@ -221,8 +205,7 @@ pub fn make_router(
.data(Arc::new(conf))
.data(auth)
.get("/v1/status", status_handler)
// Will be used in the future instead of implicit timeline creation
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
// TODO: update OpenAPI spec
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_status_handler,

View File

@@ -484,8 +484,12 @@ impl AcceptorProposerMessage {
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from etcd peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
/// Note: be careful to set only if we are sure our WAL (term history) matches
@@ -508,19 +512,27 @@ where
CTRL: control_file::Storage,
WAL: wal_storage::Storage,
{
// constructor
/// Accepts a control file storage containing the safekeeper state.
/// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
/// and `server` (`wal_seg_size` inside it) fields.
pub fn new(
ztli: ZTimelineId,
state: CTRL,
mut wal_store: WAL,
node_id: NodeId,
) -> Result<SafeKeeper<CTRL, WAL>> {
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
if state.tenant_id == ZTenantId::from([0u8; 16])
|| state.timeline_id == ZTimelineId::from([0u8; 16])
{
bail!(
"Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
state.tenant_id,
state.timeline_id
);
}
if state.server.wal_seg_size == 0 {
bail!("Calling SafeKeeper::new with empty wal_seg_size");
}
// initialize wal_store, if state is already initialized
wal_store.init_storage(&state)?;
Ok(SafeKeeper {
global_commit_lsn: state.commit_lsn,
@@ -579,7 +591,7 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
/* Check protocol compatibility */
// Check protocol compatibility
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
@@ -587,11 +599,11 @@ where
SK_PROTOCOL_VERSION
);
}
/* Postgres upgrade is not treated as fatal error */
// Postgres upgrade is not treated as fatal error
if msg.pg_version != self.state.server.pg_version
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
info!(
warn!(
"incompatible server version {}, expected {}",
msg.pg_version, self.state.server.pg_version
);
@@ -610,17 +622,25 @@ where
self.state.timeline_id
);
}
// set basic info about server, if not yet
// TODO: verify that it doesn't change afterwards
{
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
self.state.persist(&state)?;
if self.state.server.wal_seg_size != msg.wal_seg_size {
bail!(
"invalid wal_seg_size, got {}, expected {}",
msg.wal_seg_size,
self.state.server.wal_seg_size
);
}
self.wal_store.init_storage(&self.state)?;
// system_id will be updated on mismatch
if self.state.server.system_id != msg.system_id {
warn!(
"unexpected system ID arrived, got {}, expected {}",
msg.system_id, self.state.server.system_id
);
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
self.state.persist(&state)?;
}
info!(
"processed greeting from proposer {:?}, sending term {:?}",
@@ -670,16 +690,6 @@ where
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.state.acceptor_state.term < term {
let mut state = self.state.clone();
state.acceptor_state.term = term;
self.state.persist(&state)?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
@@ -696,7 +706,12 @@ where
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
self.bump_if_higher(msg.term)?;
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
state.acceptor_state.term = msg.term;
self.state.persist(&state)?;
}
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.state.acceptor_state.term > msg.term {
return Ok(None);
@@ -749,7 +764,7 @@ where
}
/// Advance commit_lsn taking into account what we have locally
pub fn update_commit_lsn(&mut self) -> Result<()> {
fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
assert!(commit_lsn >= self.inmem.commit_lsn);
@@ -952,10 +967,6 @@ mod tests {
self.lsn
}
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
Ok(())
}
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())

View File

@@ -557,6 +557,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
conf: Option<SafeKeeperConf>,
}
lazy_static! {
@@ -576,15 +577,16 @@ pub struct TimelineDeleteForceResult {
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>, conf: &SafeKeeperConf) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
assert!(state.conf.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = Some(conf.clone());
}
fn create_internal(
mut state: MutexGuard<GlobalTimelinesState>,
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
@@ -609,15 +611,6 @@ impl GlobalTimelines {
}
}
pub fn create(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
}
/// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
@@ -625,7 +618,7 @@ impl GlobalTimelines {
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("", timeline = %zttid.tenant_id).entered();
let _enter = info_span!("", tenant = %zttid.tenant_id).entered();
let mut state = TIMELINES_STATE.lock().unwrap();

View File

@@ -53,7 +53,7 @@ pub fn wal_backup_launcher_thread_main(
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
}
struct WalBackupTaskHandle {

View File

@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{bail, Context, Result};
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
@@ -16,7 +16,7 @@ use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, PG_TLI,
};
use std::cmp::min;
use std::cmp::{max, min};
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
@@ -86,9 +86,6 @@ pub trait Storage {
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;
/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
@@ -104,7 +101,7 @@ pub trait Storage {
}
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage must be initialized before use.
/// for better performance. Storage is initialized in the constructor.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
@@ -112,16 +109,15 @@ pub trait Storage {
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
/// When storage is created for the first time, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
conf: SafeKeeperConf,
// fields below are filled upon initialization
/// None if uninitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,
/// Size of WAL segment in bytes.
wal_seg_size: usize,
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
@@ -146,25 +142,50 @@ pub struct PhysicalStorage {
}
impl PhysicalStorage {
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage {
pub fn new(
zttid: &ZTenantTimelineId,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
) -> Result<PhysicalStorage> {
let timeline_dir = conf.timeline_dir(zttid);
PhysicalStorage {
if state.server.wal_seg_size == 0 {
bail!("wal_seg_size must be initialized before creating PhysicalStorage");
}
let wal_seg_size = state.server.wal_seg_size as usize;
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
Lsn(find_end_of_wal(&timeline_dir, wal_seg_size, true, state.commit_lsn)?.0)
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
zttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", zttid.timeline_id);
}
Ok(PhysicalStorage {
metrics: WalStorageMetrics::new(zttid),
zttid: *zttid,
timeline_dir,
conf: conf.clone(),
wal_seg_size: None,
write_lsn: Lsn(0),
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
wal_seg_size,
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn),
file: None,
}
}
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
})
}
/// Call fdatasync if config requires so.
@@ -189,9 +210,9 @@ impl PhysicalStorage {
/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
@@ -207,24 +228,18 @@ impl PhysicalStorage {
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, wal_seg_size)?;
write_zeroes(&mut file, self.wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let (mut file, is_partial) = self.open_or_create(segno)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
@@ -232,13 +247,13 @@ impl PhysicalStorage {
file.write_all(buf)?;
if xlogoff + buf.len() == wal_seg_size {
if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
@@ -254,10 +269,6 @@ impl PhysicalStorage {
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
@@ -269,17 +280,17 @@ impl PhysicalStorage {
while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);
let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(self.wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
self.wal_seg_size - xlogoff
} else {
buf.len()
};
self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
@@ -294,53 +305,6 @@ impl Storage for PhysicalStorage {
self.flush_record_lsn
}
/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows to postpone its initialization.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown. This is dead path normally, should
// be used only in tests.
return Ok(());
}
if let Some(wal_seg_size) = self.wal_seg_size {
// physical storage is already initialized
assert_eq!(wal_seg_size, state.server.wal_seg_size as usize);
return Ok(());
}
// initialize physical storage
let wal_seg_size = state.server.wal_seg_size as usize;
self.wal_seg_size = Some(wal_seg_size);
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
self.write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.commit_lsn)?.0)
};
self.write_record_lsn = self.write_lsn;
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
self.update_flush_lsn();
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if self.flush_record_lsn < state.commit_lsn
|| self.flush_record_lsn < state.peer_horizon_lsn
{
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id);
}
Ok(())
}
/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
@@ -404,80 +368,83 @@ impl Storage for PhysicalStorage {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
// but haven't updated flush_lsn yet.
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
bail!(
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
self.write_lsn,
self.flush_record_lsn
);
}
}
// everything is flushed now, let's update flush_lsn
self.update_flush_lsn();
self.flush_record_lsn = self.write_record_lsn;
Ok(())
}
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
if self.write_lsn != Lsn(0) && end_pos >= self.write_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
end_pos
);
}
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}
let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize;
let segno = end_pos.segment_number(self.wal_seg_size);
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;
let (mut file, is_partial) = self.open_or_create(segno)?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better use fs::try_exists which is currently available only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.update_flush_lsn();
self.flush_record_lsn = end_pos;
Ok(())
}
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size.unwrap();
let wal_seg_size = self.wal_seg_size;
Box::new(move |segno_up_to: XLogSegNo| {
remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
})
}
}
/// Remove all WAL segments in timeline_dir <= given segno.
fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
/// Remove all WAL segments in timeline_dir that match the given predicate.
fn remove_segments_from_disk(
timeline_dir: &Path,
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
let mut n_removed = 0;
let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN;
for entry in fs::read_dir(&timeline_dir)? {
let entry = entry?;
let entry_path = entry.path();
@@ -489,19 +456,21 @@ fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if segno <= segno_up_to {
if remove_predicate(segno) {
remove_file(entry_path)?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
}
}
}
let segno_from = segno_up_to - n_removed + 1;
info!(
"removed {} WAL segments [{}; {}]",
n_removed,
XLogFileName(PG_TLI, segno_from, wal_seg_size),
XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
);
if n_removed > 0 {
info!(
"removed {} WAL segments [{}; {}]",
n_removed, min_removed, max_removed
);
}
Ok(())
}
@@ -511,8 +480,10 @@ pub struct WalReader {
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,
enable_remote_read: bool,
// S3 will be used to read WAL if LSN is not available locally
enable_remote_read: bool,
// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
}