diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 43514997d4..97fb3654d2 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -124,7 +124,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L term, start_streaming_at: lsn, term_history: history, - timeline_start_lsn: Lsn(0), + timeline_start_lsn: lsn, }); spg.timeline.get().process_msg(&proposer_elected_request)?; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 11e5b963c9..a6b9de2050 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -13,9 +13,11 @@ use serde::{Deserialize, Serialize}; use std::cmp::min; use std::net::Shutdown; use std::sync::Arc; -use std::thread::sleep; use std::time::Duration; use std::{str, thread}; + +use tokio::sync::watch::Receiver; +use tokio::time::timeout; use tracing::*; use utils::{ bin_ser::BeSer, @@ -191,100 +193,143 @@ impl ReplicationConn { } })?; - let mut wal_seg_size: usize; - loop { - wal_seg_size = spg.timeline.get().get_state().1.server.wal_seg_size as usize; - if wal_seg_size == 0 { - error!("Cannot start replication before connecting to wal_proposer"); - sleep(Duration::from_secs(1)); + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + runtime.block_on(async move { + let (_, persisted_state) = spg.timeline.get().get_state(); + if persisted_state.server.wal_seg_size == 0 + || persisted_state.timeline_start_lsn == Lsn(0) + { + bail!("Cannot start replication before connecting to walproposer"); + } + + let wal_end = spg.timeline.get().get_end_of_wal(); + // Walproposer gets special handling: safekeeper must give proposer all + // local WAL till the end, whether committed or not (walproposer will + // hang otherwise). That's because walproposer runs the consensus and + // synchronizes safekeepers on the most advanced one. + // + // There is a small risk of this WAL getting concurrently garbaged if + // another compute rises which collects majority and starts fixing log + // on this safekeeper itself. That's ok as (old) proposer will never be + // able to commit such WAL. + let stop_pos: Option = if spg.appname == Some("wal_proposer_recovery".to_string()) + { + Some(wal_end) } else { + None + }; + + info!("Start replication from {:?} till {:?}", start_pos, stop_pos); + + // switch to copy + pgb.write_message(&BeMessage::CopyBothResponse)?; + + let mut end_pos = Lsn(0); + + let mut wal_reader = WalReader::new( + spg.conf.timeline_dir(&spg.timeline.get().zttid), + &persisted_state, + start_pos, + spg.conf.wal_backup_enabled, + )?; + + // buffer for wal sending, limited by MAX_SEND_SIZE + let mut send_buf = vec![0u8; MAX_SEND_SIZE]; + + // watcher for commit_lsn updates + let mut commit_lsn_watch_rx = spg.timeline.get().get_commit_lsn_watch_rx(); + + loop { + if let Some(stop_pos) = stop_pos { + if start_pos >= stop_pos { + break; /* recovery finished */ + } + end_pos = stop_pos; + } else { + /* Wait until we have some data to stream */ + let lsn = wait_for_lsn(&mut commit_lsn_watch_rx, start_pos).await?; + + if let Some(lsn) = lsn { + end_pos = lsn; + } else { + // TODO: also check once in a while whether we are walsender + // to right pageserver. + if spg.timeline.get().stop_walsender(replica_id)? { + // Shut down, timeline is suspended. 
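+                        // Note: per stop_walsender() in timeline.rs (changed below), this branch
+                        // is reached only when no compute is attached and either no WAL has been
+                        // committed yet or this replica's remote_consistent_lsn has caught up
+                        // with commit_lsn, so there is nothing left to stream to the pageserver.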
+ // TODO create proper error type for this + bail!("end streaming to {:?}", spg.appname); + } + + // timeout expired: request pageserver status + pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { + sent_ptr: end_pos.0, + timestamp: get_current_timestamp(), + request_reply: true, + })) + .context("Failed to send KeepAlive message")?; + continue; + } + } + + let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; + let send_size = min(send_size, send_buf.len()); + + let send_buf = &mut send_buf[..send_size]; + + // read wal into buffer + let send_size = wal_reader.read(send_buf).await?; + let send_buf = &send_buf[..send_size]; + + // Write some data to the network socket. + pgb.write_message(&BeMessage::XLogData(XLogDataBody { + wal_start: start_pos.0, + wal_end: end_pos.0, + timestamp: get_current_timestamp(), + data: send_buf, + })) + .context("Failed to send XLogData")?; + + start_pos += send_size as u64; + trace!("sent WAL up to {}", start_pos); + } + + Ok(()) + }) + } +} + +const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); + +// Wait until we have commit_lsn > lsn or timeout expires. Returns latest commit_lsn. +async fn wait_for_lsn(rx: &mut Receiver, lsn: Lsn) -> Result> { + let commit_lsn: Lsn = *rx.borrow(); + if commit_lsn > lsn { + return Ok(Some(commit_lsn)); + } + + let res = timeout(POLL_STATE_TIMEOUT, async move { + let mut commit_lsn; + loop { + rx.changed().await?; + commit_lsn = *rx.borrow(); + if commit_lsn > lsn { break; } } - let wal_end = spg.timeline.get().get_end_of_wal(); - // Walproposer gets special handling: safekeeper must give proposer all - // local WAL till the end, whether committed or not (walproposer will - // hang otherwise). That's because walproposer runs the consensus and - // synchronizes safekeepers on the most advanced one. - // - // There is a small risk of this WAL getting concurrently garbaged if - // another compute rises which collects majority and starts fixing log - // on this safekeeper itself. That's ok as (old) proposer will never be - // able to commit such WAL. - let stop_pos: Option = if spg.appname == Some("wal_proposer_recovery".to_string()) { - Some(wal_end) - } else { - None - }; - info!("Start replication from {:?} till {:?}", start_pos, stop_pos); - // switch to copy - pgb.write_message(&BeMessage::CopyBothResponse)?; + Ok(commit_lsn) + }) + .await; - let mut end_pos = Lsn(0); - - let mut wal_reader = WalReader::new( - spg.conf.timeline_dir(&spg.timeline.get().zttid), - wal_seg_size, - start_pos, - ); - - // buffer for wal sending, limited by MAX_SEND_SIZE - let mut send_buf = vec![0u8; MAX_SEND_SIZE]; - - loop { - if let Some(stop_pos) = stop_pos { - if start_pos >= stop_pos { - break; /* recovery finished */ - } - end_pos = stop_pos; - } else { - /* Wait until we have some data to stream */ - let lsn = spg.timeline.get().wait_for_lsn(start_pos); - - if let Some(lsn) = lsn { - end_pos = lsn; - } else { - // TODO: also check once in a while whether we are walsender - // to right pageserver. - if spg.timeline.get().stop_walsender(replica_id)? { - // Shut down, timeline is suspended. 
- // TODO create proper error type for this - bail!("end streaming to {:?}", spg.appname); - } - - // timeout expired: request pageserver status - pgb.write_message(&BeMessage::KeepAlive(WalSndKeepAlive { - sent_ptr: end_pos.0, - timestamp: get_current_timestamp(), - request_reply: true, - })) - .context("Failed to send KeepAlive message")?; - continue; - } - } - - let send_size = end_pos.checked_sub(start_pos).unwrap().0 as usize; - let send_size = min(send_size, send_buf.len()); - - let send_buf = &mut send_buf[..send_size]; - - // read wal into buffer - let send_size = wal_reader.read(send_buf)?; - let send_buf = &send_buf[..send_size]; - - // Write some data to the network socket. - pgb.write_message(&BeMessage::XLogData(XLogDataBody { - wal_start: start_pos.0, - wal_end: end_pos.0, - timestamp: get_current_timestamp(), - data: send_buf, - })) - .context("Failed to send XLogData")?; - - start_pos += send_size as u64; - trace!("sent WAL up to {}", start_pos); - } - Ok(()) + match res { + // success + Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)), + // error inside closure + Ok(Err(err)) => Err(err), + // timeout + Err(_) => Ok(None), } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 39f2593dbc..2e415a53d0 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -14,8 +14,8 @@ use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self}; -use std::sync::{Arc, Condvar, Mutex, MutexGuard}; -use std::time::Duration; +use std::sync::{Arc, Mutex, MutexGuard}; + use tokio::sync::mpsc::Sender; use tracing::*; @@ -37,8 +37,6 @@ use crate::wal_storage; use crate::wal_storage::Storage as wal_storage_iface; use crate::SafeKeeperConf; -const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1); - /// Replica status update + hot standby feedback #[derive(Debug, Clone, Copy)] pub struct ReplicaState { @@ -77,9 +75,6 @@ impl ReplicaState { struct SharedState { /// Safekeeper object sk: SafeKeeper, - /// For receiving-sending wal cooperation - /// quorum commit LSN we've notified walsenders about - notified_commit_lsn: Lsn, /// State of replicas replicas: Vec>, /// True when WAL backup launcher oversees the timeline, making sure WAL is @@ -112,7 +107,6 @@ impl SharedState { let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?; Ok(Self { - notified_commit_lsn: Lsn(0), sk, replicas: Vec::new(), wal_backup_active: false, @@ -131,7 +125,6 @@ impl SharedState { info!("timeline {} restored", zttid.timeline_id); Ok(Self { - notified_commit_lsn: Lsn(0), sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?, replicas: Vec::new(), wal_backup_active: false, @@ -271,8 +264,6 @@ pub struct Timeline { /// For breeding receivers. commit_lsn_watch_rx: watch::Receiver, mutex: Mutex, - /// conditional variable used to notify wal senders - cond: Condvar, } impl Timeline { @@ -289,7 +280,6 @@ impl Timeline { commit_lsn_watch_tx, commit_lsn_watch_rx, mutex: Mutex::new(shared_state), - cond: Condvar::new(), } } @@ -333,7 +323,7 @@ impl Timeline { let mut shared_state = self.mutex.lock().unwrap(); if shared_state.num_computes == 0 { let replica_state = shared_state.replicas[replica_id].unwrap(); - let stop = shared_state.notified_commit_lsn == Lsn(0) || // no data at all yet + let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. 
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); if stop { @@ -405,39 +395,6 @@ impl Timeline { }) } - /// Timed wait for an LSN to be committed. - /// - /// Returns the last committed LSN, which will be at least - /// as high as the LSN waited for, or None if timeout expired. - /// - pub fn wait_for_lsn(&self, lsn: Lsn) -> Option { - let mut shared_state = self.mutex.lock().unwrap(); - loop { - let commit_lsn = shared_state.notified_commit_lsn; - // This must be `>`, not `>=`. - if commit_lsn > lsn { - return Some(commit_lsn); - } - let result = self - .cond - .wait_timeout(shared_state, POLL_STATE_TIMEOUT) - .unwrap(); - if result.1.timed_out() { - return None; - } - shared_state = result.0 - } - } - - // Notify caught-up WAL senders about new WAL data received - // TODO: replace-unify it with commit_lsn_watch. - fn notify_wal_senders(&self, shared_state: &mut MutexGuard) { - if shared_state.notified_commit_lsn < shared_state.sk.inmem.commit_lsn { - shared_state.notified_commit_lsn = shared_state.sk.inmem.commit_lsn; - self.cond.notify_all(); - } - } - pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver { self.commit_lsn_watch_rx.clone() } @@ -462,8 +419,6 @@ impl Timeline { } } - // Ping wal sender that new data might be available. - self.notify_wal_senders(&mut shared_state); commit_lsn = shared_state.sk.inmem.commit_lsn; } self.commit_lsn_watch_tx.send(commit_lsn)?; @@ -524,7 +479,6 @@ impl Timeline { return Ok(()); } shared_state.sk.record_safekeeper_info(sk_info)?; - self.notify_wal_senders(&mut shared_state); is_wal_backup_action_pending = shared_state.update_status(self.zttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 1d7c8de3b8..8fada70e8b 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -2,6 +2,7 @@ use anyhow::{Context, Result}; use etcd_broker::subscription_key::{ NodeKind, OperationKind, SkOperationKind, SubscriptionKey, SubscriptionKind, }; +use tokio::io::AsyncRead; use tokio::task::JoinHandle; use std::cmp::min; @@ -10,7 +11,9 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use postgres_ffi::xlog_utils::{XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, PG_TLI}; +use postgres_ffi::xlog_utils::{ + XLogFileName, XLogSegNo, XLogSegNoOffsetToRecPtr, MAX_SEND_SIZE, PG_TLI, +}; use remote_storage::{GenericRemoteStorage, RemoteStorage}; use tokio::fs::File; use tokio::runtime::Builder; @@ -445,3 +448,49 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> { Ok(()) } + +pub async fn read_object( + file_path: PathBuf, + offset: u64, +) -> (impl AsyncRead, JoinHandle>) { + let storage = REMOTE_STORAGE.get().expect("failed to get remote storage"); + + let (mut pipe_writer, pipe_reader) = tokio::io::duplex(MAX_SEND_SIZE); + + let copy_result = tokio::spawn(async move { + let res = match storage.as_ref().unwrap() { + GenericRemoteStorage::Local(local_storage) => { + let source = local_storage.remote_object_id(&file_path)?; + + info!( + "local download about to start from {} at offset {}", + source.display(), + offset + ); + local_storage + .download_byte_range(&source, offset, None, &mut pipe_writer) + .await + } + GenericRemoteStorage::S3(s3_storage) => { + let s3key = s3_storage.remote_object_id(&file_path)?; + + info!( + "S3 download about to start from {:?} at offset {}", + s3key, offset + ); + s3_storage + .download_byte_range(&s3key, offset, None, &mut pipe_writer) + .await + } + }; 
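+        // A failed download is logged here and also reported through the returned
+        // JoinHandle; the pipe reader itself only observes EOF, so callers that need
+        // to distinguish a complete segment from an aborted transfer must check the
+        // handle.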
+ + if let Err(e) = res { + error!("failed to download WAL segment from remote storage: {}", e); + Err(e) + } else { + Ok(()) + } + }); + + (pipe_reader, copy_result) +} diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index e3f1ce7333..5cfc96c84b 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -8,7 +8,9 @@ //! Note that last file has `.partial` suffix, that's different from postgres. use anyhow::{anyhow, bail, Context, Result}; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{self, Seek, SeekFrom}; +use std::pin::Pin; +use tokio::io::AsyncRead; use lazy_static::lazy_static; use postgres_ffi::xlog_utils::{ @@ -26,6 +28,7 @@ use utils::{lsn::Lsn, zid::ZTenantTimelineId}; use crate::safekeeper::SafeKeeperState; +use crate::wal_backup::read_object; use crate::SafeKeeperConf; use postgres_ffi::xlog_utils::{XLogFileName, XLOG_BLCKSZ}; @@ -33,6 +36,8 @@ use postgres_ffi::waldecoder::WalStreamDecoder; use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; + lazy_static! { // The prometheus crate does not support u64 yet, i64 only (see `IntGauge`). // i64 is faster than f64, so update to u64 when available. @@ -504,69 +509,125 @@ pub struct WalReader { timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn, - file: Option, + wal_segment: Option>>, + + enable_remote_read: bool, + // S3 will be used to read WAL if LSN is not available locally + local_start_lsn: Lsn, } impl WalReader { - pub fn new(timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn) -> Self { - Self { - timeline_dir, - wal_seg_size, - pos, - file: None, + pub fn new( + timeline_dir: PathBuf, + state: &SafeKeeperState, + start_pos: Lsn, + enable_remote_read: bool, + ) -> Result { + if start_pos < state.timeline_start_lsn { + bail!( + "Requested streaming from {}, which is before the start of the timeline {}", + start_pos, + state.timeline_start_lsn + ); } + + if state.server.wal_seg_size == 0 + || state.timeline_start_lsn == Lsn(0) + || state.local_start_lsn == Lsn(0) + { + bail!("state uninitialized, no data to read"); + } + + Ok(Self { + timeline_dir, + wal_seg_size: state.server.wal_seg_size as usize, + pos: start_pos, + wal_segment: None, + enable_remote_read, + local_start_lsn: state.local_start_lsn, + }) } - pub fn read(&mut self, buf: &mut [u8]) -> Result { - // Take the `File` from `wal_file`, or open a new file. - let mut file = match self.file.take() { - Some(file) => file, - None => { - // Open a new file. - let segno = self.pos.segment_number(self.wal_seg_size); - let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size); - let wal_file_path = self.timeline_dir.join(wal_file_name); - Self::open_wal_file(&wal_file_path)? - } + pub async fn read(&mut self, buf: &mut [u8]) -> Result { + let mut wal_segment = match self.wal_segment.take() { + Some(reader) => reader, + None => self.open_segment().await?, }; - let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize; - // How much to read and send in message? We cannot cross the WAL file // boundary, and we don't want send more than provided buffer. + let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize; let send_size = min(buf.len(), self.wal_seg_size - xlogoff); // Read some data from the file. 
let buf = &mut buf[0..send_size]; - file.seek(SeekFrom::Start(xlogoff as u64)) - .and_then(|_| file.read_exact(buf)) - .context("Failed to read data from WAL file")?; - + let send_size = wal_segment.read_exact(buf).await?; self.pos += send_size as u64; - // Decide whether to reuse this file. If we don't set wal_file here - // a new file will be opened next time. + // Decide whether to reuse this file. If we don't set wal_segment here + // a new reader will be opened next time. if self.pos.segment_offset(self.wal_seg_size) != 0 { - self.file = Some(file); + self.wal_segment = Some(wal_segment); } Ok(send_size) } + /// Open WAL segment at the current position of the reader. + async fn open_segment(&self) -> Result>> { + let xlogoff = self.pos.segment_offset(self.wal_seg_size) as usize; + let segno = self.pos.segment_number(self.wal_seg_size); + let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size); + let wal_file_path = self.timeline_dir.join(wal_file_name); + + // Try to open local file, if we may have WAL locally + if self.pos >= self.local_start_lsn { + let res = Self::open_wal_file(&wal_file_path).await; + match res { + Ok(mut file) => { + file.seek(SeekFrom::Start(xlogoff as u64)).await?; + return Ok(Box::pin(file)); + } + Err(e) => { + let is_not_found = e.chain().any(|e| { + if let Some(e) = e.downcast_ref::() { + e.kind() == io::ErrorKind::NotFound + } else { + false + } + }); + if !is_not_found { + return Err(e); + } + // NotFound is expected, fall through to remote read + } + }; + } + + // Try to open remote file, if remote reads are enabled + if self.enable_remote_read { + let (reader, _) = read_object(wal_file_path, xlogoff as u64).await; + return Ok(Box::pin(reader)); + } + + bail!("WAL segment is not found") + } + /// Helper function for opening a wal file. - fn open_wal_file(wal_file_path: &Path) -> Result { + async fn open_wal_file(wal_file_path: &Path) -> Result { // First try to open the .partial file. let mut partial_path = wal_file_path.to_owned(); partial_path.set_extension("partial"); - if let Ok(opened_file) = File::open(&partial_path) { + if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await { return Ok(opened_file); } // If that failed, try it without the .partial extension. 
- File::open(&wal_file_path) + tokio::fs::File::open(&wal_file_path) + .await .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path)) .map_err(|e| { - error!("{}", e); + warn!("{}", e); e }) } diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index e4970272d4..05827baf86 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -2,6 +2,7 @@ import pytest import random import time import os +import shutil import signal import subprocess import sys @@ -353,7 +354,7 @@ def test_broker(neon_env_builder: NeonEnvBuilder): @pytest.mark.parametrize('auth_enabled', [False, True]) def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): neon_env_builder.num_safekeepers = 2 - # to advance remote_consistent_llsn + # to advance remote_consistent_lsn neon_env_builder.enable_local_fs_remote_storage() neon_env_builder.auth_enabled = auth_enabled env = neon_env_builder.init_start() @@ -437,6 +438,26 @@ def wait_segment_offload(tenant_id, timeline_id, live_sk, seg_end): time.sleep(0.5) +def wait_wal_trim(tenant_id, timeline_id, sk, target_size): + started_at = time.time() + http_cli = sk.http_client() + while True: + tli_status = http_cli.timeline_status(tenant_id, timeline_id) + sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), tenant_id, + timeline_id)) / 1024 / 1024 + log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size:.2f}MB status={tli_status}") + + if sk_wal_size <= target_size: + break + + elapsed = time.time() - started_at + if elapsed > 20: + raise RuntimeError( + f"timed out waiting {elapsed:.0f}s for sk_id={sk.id} to trim WAL to {target_size:.2f}MB, current size is {sk_wal_size:.2f}MB" + ) + time.sleep(0.5) + + @pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs']) def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str): neon_env_builder.num_safekeepers = 3 @@ -485,6 +506,81 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str): wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000') +@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs']) +def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str): + neon_env_builder.num_safekeepers = 3 + if storage_type == 'local_fs': + neon_env_builder.enable_local_fs_remote_storage() + elif storage_type == 'mock_s3': + neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay') + else: + raise RuntimeError(f'Unknown storage type: {storage_type}') + neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER + + env = neon_env_builder.init_start() + env.neon_cli.create_branch('test_s3_wal_replay') + + env.pageserver.stop() + pageserver_tenants_dir = os.path.join(env.repo_dir, 'tenants') + pageserver_fresh_copy = os.path.join(env.repo_dir, 'tenants_fresh') + log.info(f"Creating a copy of pageserver in a fresh state at {pageserver_fresh_copy}") + shutil.copytree(pageserver_tenants_dir, pageserver_fresh_copy) + env.pageserver.start() + + pg = env.postgres.create_start('test_s3_wal_replay') + + # learn neon timeline from compute + tenant_id = pg.safe_psql("show neon.tenant_id")[0][0] + timeline_id = pg.safe_psql("show neon.timeline_id")[0][0] + + expected_sum = 0 + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute("create table t(key int, value text)") + cur.execute("insert into t values (1, 'payload')") + expected_sum += 1 + + offloaded_seg_end = ['0/3000000'] + 
for seg_end in offloaded_seg_end: + # roughly fills two segments + cur.execute("insert into t select generate_series(1,500000), 'payload'") + expected_sum += 500000 * 500001 // 2 + + cur.execute("select sum(key) from t") + assert cur.fetchone()[0] == expected_sum + + for sk in env.safekeepers: + wait_segment_offload(tenant_id, timeline_id, sk, seg_end) + + # advance remote_consistent_lsn to trigger WAL trimming + # this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates + env.safekeepers[0].http_client().record_safekeeper_info( + tenant_id, timeline_id, {'remote_consistent_lsn': offloaded_seg_end[-1]}) + + for sk in env.safekeepers: + # require WAL to be trimmed, so no more than one segment is left on disk + wait_wal_trim(tenant_id, timeline_id, sk, 16 * 1.5) + + # replace pageserver with a fresh copy + pg.stop_and_destroy() + env.pageserver.stop() + + log.info(f'Removing current pageserver state at {pageserver_tenants_dir}') + shutil.rmtree(pageserver_tenants_dir) + log.info(f'Copying fresh pageserver state from {pageserver_fresh_copy}') + shutil.move(pageserver_fresh_copy, pageserver_tenants_dir) + + # start everything, verify data + env.pageserver.start() + pg.create_start('test_s3_wal_replay') + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute("select sum(key) from t") + assert cur.fetchone()[0] == expected_sum + + class ProposerPostgres(PgProtocol): """Object for running postgres without NeonEnv""" def __init__(self,