Mirror of https://github.com/neondatabase/neon.git
Download WAL from S3 if it's not available in safekeeper dir (#1932)
`send_wal.rs` and `WalReader` are now async. `test_s3_wal_replay` checks that WAL can be replayed after it has been offloaded.
committed by GitHub
parent 36ee182d26
commit 699f46cd84
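At its core, the change teaches the safekeeper's WAL reader to fall back to remote storage when a requested segment has already been offloaded and is no longer present in the local timeline directory. The sketch below shows that fallback shape using only std and tokio types; `open_local_or_remote` and `fetch_remote` are illustrative names, with `fetch_remote` standing in for the `wal_backup::read_object` helper added by this commit.

```rust
// Minimal sketch of the local-then-remote fallback, under the assumptions above.
use std::io::{self, SeekFrom};
use std::path::Path;
use std::pin::Pin;
use tokio::io::{AsyncRead, AsyncSeekExt};

async fn open_local_or_remote(
    path: &Path,
    offset: u64,
    remote_read_enabled: bool,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> {
    match tokio::fs::File::open(path).await {
        Ok(mut file) => {
            // The segment is still on local disk: seek to the offset and stream it.
            file.seek(SeekFrom::Start(offset)).await?;
            Ok(Box::pin(file))
        }
        // Only a missing file triggers the remote fallback; other I/O errors propagate.
        Err(e) if e.kind() == io::ErrorKind::NotFound && remote_read_enabled => {
            fetch_remote(path, offset).await
        }
        Err(e) => Err(e.into()),
    }
}

// Hypothetical remote fetch; the real code streams the object from remote
// storage (S3 or local_fs) via wal_backup::read_object.
async fn fetch_remote(
    _path: &Path,
    _offset: u64,
) -> anyhow::Result<Pin<Box<dyn AsyncRead + Send>>> {
    unimplemented!("stand-in for a remote storage download")
}
```

In the actual `WalReader::open_segment` below, the local attempt is additionally gated on `pos >= local_start_lsn`, since anything older than `local_start_lsn` can only live in remote storage.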
@@ -124,7 +124,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L
term,
start_streaming_at: lsn,
term_history: history,
timeline_start_lsn: Lsn(0),
timeline_start_lsn: lsn,
});

spg.timeline.get().process_msg(&proposer_elected_request)?;

@@ -13,9 +13,11 @@ use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::net::Shutdown;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::{str, thread};

use tokio::sync::watch::Receiver;
use tokio::time::timeout;
use tracing::*;
use utils::{
bin_ser::BeSer,

@@ -191,100 +193,143 @@ impl ReplicationConn {
}
})?;

let mut wal_seg_size: usize;
loop {
wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize;
if wal_seg_size == 0 {
error!("Cannot start replication before connecting to wal_proposer");
sleep(Duration::from_secs(1));
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;

runtime.block_on(async move {
let (_, persisted_state) = spg.timeline.get().get_state();
if persisted_state.server.wal_seg_size == 0
|| persisted_state.timeline_start_lsn == Lsn(0)
{
bail!("Cannot start replication before connecting to walproposer");
}

let wal_end = spg.timeline.get().get_end_of_wal();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
// synchronizes safekeepers on the most advanced one.
//
// There is a small risk of this WAL getting concurrently garbaged if
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string())
{
Some(wal_end)
} else {
None
};

info!("Start replication from {:?} till {:?}", start_pos, stop_pos);

// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;

let mut end_pos = Lsn(0);

let mut wal_reader = WalReader::new(
spg.conf.timeline_dir(&spg.timeline.get().zttid),
&persisted_state,
start_pos,
spg.conf.wal_backup_enabled,
)?;

// buffer for wal sending, limited by MAX_SEND_SIZE
let mut send_buf = vec![0u8; MAX_SEND_SIZE];

// watcher for commit_lsn updates
let mut commit_lsn_watch_rx = spg.timeline.get().get_commit_lsn_watch_rx();

loop {
if let Some(stop_pos) = stop_pos {
if start_pos >= stop_pos {
break; /* recovery finished */
}
end_pos = stop_pos;
} else {
/* Wait until we have some data to stream */
let lsn = wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await?;

if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().stop_walsender(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);
}

// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
}

let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
let send_size = min(send_size, send_buf.len());

let send_buf = &mut send_buf[..send_size];

// read wal into buffer
let send_size = wal_reader.read(send_buf).await?;
let send_buf = &send_buf[..send_size];

// Write some data to the network socket.
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: start_pos.0,
wal_end: end_pos.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.context("Failed to send XLogData")?;

start_pos += send_size as u64;
trace!("sent WAL up to {}", start_pos);
}

Ok(())
})
}
}

const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);

// Wait until we have commit_lsn > lsn or timeout expires. Returns latest commit_lsn.
async fn wait_for_lsn(rx: &mut Receiver<Lsn>, lsn: Lsn) -> Result<Option<Lsn>> {
let commit_lsn: Lsn = *rx.borrow();
if commit_lsn > lsn {
return Ok(Some(commit_lsn));
}

let res = timeout(POLL_STATE_TIMEOUT, async move {
let mut commit_lsn;
loop {
rx.changed().await?;
commit_lsn = *rx.borrow();
if commit_lsn > lsn {
break;
}
}
let wal_end = spg.timeline.get().get_end_of_wal();
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
// hang otherwise). That's because walproposer runs the consensus and
// synchronizes safekeepers on the most advanced one.
//
// There is a small risk of this WAL getting concurrently garbaged if
// another compute rises which collects majority and starts fixing log
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if spg.appname == Some("wal_proposer_recovery".to_string()) {
Some(wal_end)
} else {
None
};
info!("Start replication from {:?} till {:?}", start_pos, stop_pos);

// switch to copy
pgb.write_message(&BeMessage::CopyBothResponse)?;
Ok(commit_lsn)
})
.await;

let mut end_pos = Lsn(0);

let mut wal_reader = WalReader::new(
spg.conf.timeline_dir(&spg.timeline.get().zttid),
wal_seg_size,
start_pos,
);

// buffer for wal sending, limited by MAX_SEND_SIZE
let mut send_buf = vec![0u8; MAX_SEND_SIZE];

loop {
if let Some(stop_pos) = stop_pos {
if start_pos >= stop_pos {
break; /* recovery finished */
}
end_pos = stop_pos;
} else {
/* Wait until we have some data to stream */
let lsn = spg.timeline.get().wait_for_lsn(start_pos);

if let Some(lsn) = lsn {
end_pos = lsn;
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().stop_walsender(replica_id)? {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);
}

// timeout expired: request pageserver status
pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
sent_ptr: end_pos.0,
timestamp: get_current_timestamp(),
request_reply: true,
}))
.context("Failed to send KeepAlive message")?;
continue;
}
}

let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize;
let send_size = min(send_size, send_buf.len());

let send_buf = &mut send_buf[..send_size];

// read wal into buffer
let send_size = wal_reader.read(send_buf)?;
let send_buf = &send_buf[..send_size];

// Write some data to the network socket.
pgb.write_message(&BeMessage::XLogData(XLogDataBody {
wal_start: start_pos.0,
wal_end: end_pos.0,
timestamp: get_current_timestamp(),
data: send_buf,
}))
.context("Failed to send XLogData")?;

start_pos += send_size as u64;
trace!("sent WAL up to {}", start_pos);
}
Ok(())
match res {
// success
Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
// error inside closure
Ok(Err(err)) => Err(err),
// timeout
Err(_) => Ok(None),
}
}
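The `wait_for_lsn` helper above replaces the timeline's Condvar-based `wait_for_lsn` method (removed in the `Timeline` hunks that follow) with a wait on the commit_lsn watch channel, bounded by a timeout. A self-contained sketch of that wait pattern, with `u64` standing in for the crate's `Lsn` type:

```rust
// Sketch of the watch-channel wait used by the new async wait_for_lsn:
// return Some(value) as soon as the watched value exceeds the threshold,
// or None if the timeout expires first.
use anyhow::Result;
use std::time::Duration;
use tokio::sync::watch::Receiver;
use tokio::time::timeout;

const POLL_TIMEOUT: Duration = Duration::from_secs(1);

async fn wait_above(rx: &mut Receiver<u64>, threshold: u64) -> Result<Option<u64>> {
    // Fast path: the current value may already be past the threshold.
    let current = *rx.borrow();
    if current > threshold {
        return Ok(Some(current));
    }

    let res = timeout(POLL_TIMEOUT, async {
        loop {
            // changed() errors out if the sender side is dropped.
            rx.changed().await?;
            let v = *rx.borrow();
            if v > threshold {
                return Ok::<u64, anyhow::Error>(v);
            }
        }
    })
    .await;

    match res {
        Ok(Ok(v)) => Ok(Some(v)), // value observed before the timeout
        Ok(Err(e)) => Err(e),     // channel closed
        Err(_) => Ok(None),       // timeout expired
    }
}
```

One practical effect is that the waiter no longer holds the timeline mutex while blocked; it simply awaits the channel.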

@@ -14,8 +14,8 @@ use std::cmp::{max, min};
use std::collections::HashMap;
use std::fs::{self};

use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::time::Duration;
use std::sync::{Arc, Mutex, MutexGuard};

use tokio::sync::mpsc::Sender;
use tracing::*;

@@ -37,8 +37,6 @@ use crate::wal_storage;
use crate::wal_storage::Storage as wal_storage_iface;
use crate::SafeKeeperConf;

const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);

/// Replica status update + hot standby feedback
#[derive(Debug, Clone, Copy)]
pub struct ReplicaState {

@@ -77,9 +75,6 @@ impl ReplicaState {
struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// For receiving-sending wal cooperation
/// quorum commit LSN we've notified walsenders about
notified_commit_lsn: Lsn,
/// State of replicas
replicas: Vec<Option<ReplicaState>>,
/// True when WAL backup launcher oversees the timeline, making sure WAL is

@@ -112,7 +107,6 @@ impl SharedState {
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;

Ok(Self {
notified_commit_lsn: Lsn(0),
sk,
replicas: Vec::new(),
wal_backup_active: false,

@@ -131,7 +125,6 @@ impl SharedState {
info!("timeline {} restored", zttid.timeline_id);

Ok(Self {
notified_commit_lsn: Lsn(0),
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
replicas: Vec::new(),
wal_backup_active: false,

@@ -271,8 +264,6 @@ pub struct Timeline {
/// For breeding receivers.
commit_lsn_watch_rx: watch::Receiver<Lsn>,
mutex: Mutex<SharedState>,
/// conditional variable used to notify wal senders
cond: Condvar,
}

impl Timeline {

@@ -289,7 +280,6 @@ impl Timeline {
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(shared_state),
cond: Condvar::new(),
}
}

@@ -333,7 +323,7 @@ impl Timeline {
let mut shared_state = self.mutex.lock().unwrap();
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let stop = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
(replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet.
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {

@@ -405,39 +395,6 @@ impl Timeline {
})
}

/// Timed wait for an LSN to be committed.
///
/// Returns the last committed LSN, which will be at least
/// as high as the LSN waited for, or None if timeout expired.
///
pub fn wait_for_lsn(&self, lsn: Lsn) -> Option<Lsn> {
let mut shared_state = self.mutex.lock().unwrap();
loop {
let commit_lsn = shared_state.notified_commit_lsn;
// This must be `>`, not `>=`.
if commit_lsn > lsn {
return Some(commit_lsn);
}
let result = self
.cond
.wait_timeout(shared_state, POLL_STATE_TIMEOUT)
.unwrap();
if result.1.timed_out() {
return None;
}
shared_state = result.0
}
}

// Notify caught-up WAL senders about new WAL data received
// TODO: replace-unify it with commit_lsn_watch.
fn notify_wal_senders(&self, shared_state: &mut MutexGuard<SharedState>) {
if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn {
shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn;
self.cond.notify_all();
}
}

pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
}

@@ -462,8 +419,6 @@ impl Timeline {
}
}

// Ping wal sender that new data might be available.
self.notify_wal_senders(&mut shared_state);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}
self.commit_lsn_watch_tx.send(commit_lsn)?;

@@ -524,7 +479,6 @@ impl Timeline {
return Ok(());
}
shared_state.sk.record_safekeeper_info(sk_info)?;
self.notify_wal_senders(&mut shared_state);
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
}

@@ -2,6 +2,7 @@ use anyhow::{Context, Result};
use etcd_broker::subscription_key::{
NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind,
};
use tokio::io::AsyncRead;
use tokio::task::JoinHandle;

use std::cmp::min;

@@ -10,7 +11,9 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI};
use postgres_ffi::xlog_utils::{
XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI,
};
use remote_storage::{GenericRemoteStorage, RemoteStorage};
use tokio::fs::File;
use tokio::runtime::Builder;

@@ -445,3 +448,49 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {

Ok(())
}

pub async fn read_object(
file_path: PathBuf,
offset: u64,
) -> (impl AsyncRead, JoinHandle<Result<()>>) {
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");

let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE);

let copy_result = tokio::spawn(async move {
let res = match storage.as_ref().unwrap() {
GenericRemoteStorage::Local(local_storage) => {
let source = local_storage.remote_object_id(&file_path)?;

info!(
"local download about to start from {} at offset {}",
source.display(),
offset
);
local_storage
.download_byte_range(&source, offset, None, &mut pipe_writer)
.await
}
GenericRemoteStorage::S3(s3_storage) => {
let s3key = s3_storage.remote_object_id(&file_path)?;

info!(
"S3 download about to start from {:?} at offset {}",
s3key, offset
);
s3_storage
.download_byte_range(&s3key, offset, None, &mut pipe_writer)
.await
}
};

if let Err(e) = res {
error!("failed to download WAL segment from remote storage: {}", e);
Err(e)
} else {
Ok(())
}
});

(pipe_reader, copy_result)
}
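`read_object` above streams a WAL segment from remote storage through an in-memory duplex pipe: the download runs in a spawned task that writes into one end, and the caller receives the other end as an `AsyncRead` together with a `JoinHandle` for the download result. A rough, self-contained sketch of that shape; `stream_object` and `download_into` are illustrative names, with `download_into` standing in for the backend-specific `download_byte_range` call:

```rust
// Sketch of the duplex-pipe streaming pattern used by read_object.
use anyhow::Result;
use tokio::io::{AsyncRead, AsyncWriteExt, DuplexStream};
use tokio::task::JoinHandle;

const BUF_SIZE: usize = 128 * 1024; // stand-in for MAX_SEND_SIZE

async fn stream_object(key: String, offset: u64) -> (impl AsyncRead, JoinHandle<Result<()>>) {
    let (mut writer, reader) = tokio::io::duplex(BUF_SIZE);

    // The writer half moves into the background task; the reader half is handed
    // back to the caller, which can start consuming bytes before the download ends.
    let handle = tokio::spawn(async move {
        download_into(&key, offset, &mut writer).await?;
        writer.shutdown().await?; // signal EOF to the reader
        Ok(())
    });

    (reader, handle)
}

// Hypothetical downloader; the real code dispatches to the local_fs or S3
// backend and calls its download_byte_range method.
async fn download_into(_key: &str, _offset: u64, _writer: &mut DuplexStream) -> Result<()> {
    unimplemented!("stand-in for a remote storage byte-range download")
}
```

Handing back a reader instead of a fully buffered segment lets `WalReader` begin serving bytes while the rest of the object is still being downloaded.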

@@ -8,7 +8,9 @@
//! Note that last file has `.partial` suffix, that's different from postgres.

use anyhow::{anyhow, bail, Context, Result};
use std::io::{Read, Seek, SeekFrom};
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;

use lazy_static::lazy_static;
use postgres_ffi::xlog_utils::{

@@ -26,6 +28,7 @@ use utils::{lsn::Lsn, zid::ZTenantTimelineId};

use crate::safekeeper::SafeKeeperState;

use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ};

@@ -33,6 +36,8 @@ use postgres_ffi::waldecoder::WalStreamDecoder;

use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};

use tokio::io::{AsyncReadExt, AsyncSeekExt};

lazy_static! {
// The prometheus crate does not support u64 yet, i64 only (see `IntGauge`).
// i64 is faster than f64, so update to u64 when available.

@@ -504,69 +509,125 @@ pub struct WalReader {
timeline_dir: PathBuf,
wal_seg_size: usize,
pos: Lsn,
file: Option<File>,
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,

enable_remote_read: bool,
// S3 will be used to read WAL if LSN is not available locally
local_start_lsn: Lsn,
}

impl WalReader {
pub fn new(timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn) -> Self {
Self {
timeline_dir,
wal_seg_size,
pos,
file: None,
pub fn new(
timeline_dir: PathBuf,
state: &SafeKeeperState,
start_pos: Lsn,
enable_remote_read: bool,
) -> Result<Self> {
if start_pos < state.timeline_start_lsn {
bail!(
"Requested streaming from {}, which is before the start of the timeline {}",
start_pos,
state.timeline_start_lsn
);
}

if state.server.wal_seg_size == 0
|| state.timeline_start_lsn == Lsn(0)
|| state.local_start_lsn == Lsn(0)
{
bail!("state uninitialized, no data to read");
}

Ok(Self {
timeline_dir,
wal_seg_size: state.server.wal_seg_size as usize,
pos: start_pos,
wal_segment: None,
enable_remote_read,
local_start_lsn: state.local_start_lsn,
})
}

pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// Take the `File` from `wal_file`, or open a new file.
let mut file = match self.file.take() {
Some(file) => file,
None => {
// Open a new file.
let segno = self.pos.segment_number(self.wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name);
Self::open_wal_file(&wal_file_path)?
}
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let mut wal_segment = match self.wal_segment.take() {
Some(reader) => reader,
None => self.open_segment().await?,
};

let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;

// How much to read and send in message? We cannot cross the WAL file
// boundary, and we don't want send more than provided buffer.
let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
let send_size = min(buf.len(), self.wal_seg_size - xlogoff);

// Read some data from the file.
let buf = &mut buf[0..send_size];
file.seek(SeekFrom::Start(xlogoff as u64))
.and_then(|_| file.read_exact(buf))
.context("Failed to read data from WAL file")?;

let send_size = wal_segment.read_exact(buf).await?;
self.pos += send_size as u64;

// Decide whether to reuse this file. If we don't set wal_file here
// a new file will be opened next time.
// Decide whether to reuse this file. If we don't set wal_segment here
// a new reader will be opened next time.
if self.pos.segment_offset(self.wal_seg_size) != 0 {
self.file = Some(file);
self.wal_segment = Some(wal_segment);
}

Ok(send_size)
}

/// Open WAL segment at the current position of the reader.
async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead>>> {
let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize;
let segno = self.pos.segment_number(self.wal_seg_size);
let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name);

// Try to open local file, if we may have WAL locally
if self.pos >= self.local_start_lsn {
let res = Self::open_wal_file(&wal_file_path).await;
match res {
Ok(mut file) => {
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
return Ok(Box::pin(file));
}
Err(e) => {
let is_not_found = e.chain().any(|e| {
if let Some(e) = e.downcast_ref::<io::Error>() {
e.kind() == io::ErrorKind::NotFound
} else {
false
}
});
if !is_not_found {
return Err(e);
}
// NotFound is expected, fall through to remote read
}
};
}

// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
let (reader, _) = read_object(wal_file_path, xlogoff as u64).await;
return Ok(Box::pin(reader));
}

bail!("WAL segment is not found")
}

/// Helper function for opening a wal file.
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
async fn open_wal_file(wal_file_path: &Path) -> Result<tokio::fs::File> {
// First try to open the .partial file.
let mut partial_path = wal_file_path.to_owned();
partial_path.set_extension("partial");
if let Ok(opened_file) = File::open(&partial_path) {
if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
return Ok(opened_file);
}

// If that failed, try it without the .partial extension.
File::open(&wal_file_path)
tokio::fs::File::open(&wal_file_path)
.await
.with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
.map_err(|e| {
error!("{}", e);
warn!("{}", e);
e
})
}
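In `open_segment` above, only a genuinely missing local segment should fall through to the remote read; any other I/O failure must be returned to the caller. The closure walks the whole `anyhow` error chain looking for an `io::Error` with kind `NotFound`, roughly equivalent to this stand-alone helper (illustrative name):

```rust
use std::io;

/// Returns true if any error in the chain is a file-not-found I/O error.
fn is_not_found(err: &anyhow::Error) -> bool {
    err.chain()
        .filter_map(|cause| cause.downcast_ref::<io::Error>())
        .any(|ioe| ioe.kind() == io::ErrorKind::NotFound)
}
```

That is presumably also why `open_wal_file` now logs the failure with `warn!` instead of `error!`: once segments can be offloaded, a missing local file is an expected condition rather than a fault.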

@@ -2,6 +2,7 @@ import pytest
import random
import time
import os
import shutil
import signal
import subprocess
import sys

@@ -353,7 +354,7 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize('auth_enabled', [False, True])
def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
neon_env_builder.num_safekeepers = 2
# to advance remote_consistent_llsn
# to advance remote_consistent_lsn
neon_env_builder.enable_local_fs_remote_storage()
neon_env_builder.auth_enabled = auth_enabled
env = neon_env_builder.init_start()

@@ -437,6 +438,26 @@ def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end):
time.sleep(0.5)


def wait_wal_trim(tenant_id, timeline_id, sk, target_size):
started_at = time.time()
http_cli = sk.http_client()
while True:
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), tenant_id,
timeline_id)) / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size:.2f}MB status={tli_status}")

if sk_wal_size <= target_size:
break

elapsed = time.time() - started_at
if elapsed > 20:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size:.2f}MB, current size is {sk_wal_size:.2f}MB"
)
time.sleep(0.5)


@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
neon_env_builder.num_safekeepers = 3

@@ -485,6 +506,81 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')


@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER

env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_s3_wal_replay')

env.pageserver.stop()
pageserver_tenants_dir = os.path.join(env.repo_dir, 'tenants')
pageserver_fresh_copy = os.path.join(env.repo_dir, 'tenants_fresh')
log.info(f"Creating a copy of pageserver in a fresh state at {pageserver_fresh_copy}")
shutil.copytree(pageserver_tenants_dir, pageserver_fresh_copy)
env.pageserver.start()

pg = env.postgres.create_start('test_s3_wal_replay')

# learn neon timeline from compute
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]

expected_sum = 0

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("create table t(key int, value text)")
cur.execute("insert into t values (1, 'payload')")
expected_sum += 1

offloaded_seg_end = ['0/3000000']
for seg_end in offloaded_seg_end:
# roughly fills two segments
cur.execute("insert into t select generate_series(1,500000), 'payload'")
expected_sum += 500000 * 500001 // 2

cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum

for sk in env.safekeepers:
wait_segment_offload(tenant_id, timeline_id, sk, seg_end)

# advance remote_consistent_lsn to trigger WAL trimming
# this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates
env.safekeepers[0].http_client().record_safekeeper_info(
tenant_id, timeline_id, {'remote_consistent_lsn': offloaded_seg_end[-1]})

for sk in env.safekeepers:
# require WAL to be trimmed, so no more than one segment is left on disk
wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5)

# replace pageserver with a fresh copy
pg.stop_and_destroy()
env.pageserver.stop()

log.info(f'Removing current pageserver state at {pageserver_tenants_dir}')
shutil.rmtree(pageserver_tenants_dir)
log.info(f'Copying fresh pageserver state from {pageserver_fresh_copy}')
shutil.move(pageserver_fresh_copy, pageserver_tenants_dir)

# start everything, verify data
env.pageserver.start()
pg.create_start('test_s3_wal_replay')

with closing(pg.connect()) as conn:
with conn.cursor() as cur:
cur.execute("select sum(key) from t")
assert cur.fetchone()[0] == expected_sum


class ProposerPostgres(PgProtocol):
"""Object for running postgres without NeonEnv"""
def __init__(self,