From 3797566c36c767155648ddde6c21ece4a24827cd Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Mon, 20 May 2024 16:21:57 +0300
Subject: [PATCH] safekeeper: test pull_timeline with WAL gc.

Do pull_timeline while WAL is being removed. To this end
- extract pausable_failpoint to utils, sprinkle pull_timeline with it
- add 'checkpoint' sk http endpoint to force WAL removal.

Now that the status code of pulled files is checked, the test fails,
which is expected for now.
---
 libs/utils/src/failpoint_support.rs           | 27 +++++++
 pageserver/src/tenant.rs                      | 27 +------
 pageserver/src/tenant/delete.rs               |  2 +-
 .../src/tenant/remote_timeline_client.rs      |  1 +
 .../tenant/remote_timeline_client/upload.rs   |  2 +-
 pageserver/src/tenant/tasks.rs                |  2 +-
 pageserver/src/tenant/timeline.rs             |  2 +-
 pageserver/src/tenant/timeline/delete.rs      |  2 +-
 safekeeper/src/http/routes.rs                 | 24 ++++++
 safekeeper/src/pull_timeline.rs               | 10 +++
 safekeeper/src/remove_wal.rs                  |  2 +-
 safekeeper/src/safekeeper.rs                  |  4 +-
 safekeeper/src/timeline.rs                    |  4 +-
 safekeeper/src/timeline_manager.rs            |  2 +-
 test_runner/fixtures/common_types.py          |  5 ++
 test_runner/fixtures/neon_fixtures.py         | 67 +++++++++++++++-
 test_runner/fixtures/safekeeper/http.py       |  7 ++
 test_runner/fixtures/utils.py                 | 22 ++++++
 test_runner/regress/test_wal_acceptor.py      | 79 ++++++++++++++++---
 19 files changed, 241 insertions(+), 50 deletions(-)

diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs
index 8704b72921..870684b399 100644
--- a/libs/utils/src/failpoint_support.rs
+++ b/libs/utils/src/failpoint_support.rs
@@ -9,6 +9,33 @@ use serde::{Deserialize, Serialize};
 use tokio_util::sync::CancellationToken;
 use tracing::*;
 
+/// Declare a failpoint that can use the `pause` failpoint action.
+/// We don't want to block the executor thread, hence, spawn_blocking + await.
+#[macro_export]
+macro_rules! pausable_failpoint {
+    ($name:literal) => {
+        if cfg!(feature = "testing") {
+            tokio::task::spawn_blocking({
+                let current = tracing::Span::current();
+                move || {
+                    let _entered = current.entered();
+                    tracing::info!("at failpoint {}", $name);
+                    fail::fail_point!($name);
+                }
+            })
+            .await
+            .expect("spawn_blocking");
+        }
+    };
+    ($name:literal, $cond:expr) => {
+        if cfg!(feature = "testing") {
+            if $cond {
+                pausable_failpoint!($name)
+            }
+        }
+    };
+}
+
 /// use with fail::cfg("$name", "return(2000)")
 ///
 /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
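[Reviewer note, not part of the patch] A hedged sketch of how a test drives such a pausable failpoint, using the safekeeper fixture API that the new test at the end of this patch relies on (requires a testing-enabled build; while paused, only a spawn_blocking thread is parked, the async executor keeps running):

    # Illustration only; "safekeeper" is a started Safekeeper fixture.
    http = safekeeper.http_client()
    # Suspend any task that reaches the failpoint.
    http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
    # ... do work while the task is parked at the failpoint ...
    # Switch the action off to let the task resume.
    http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))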
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 540eb10ed2..e6bfd57a44 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -42,6 +42,7 @@ use utils::completion;
 use utils::crashsafe::path_with_suffix_extension;
 use utils::failpoint_support;
 use utils::fs_ext;
+use utils::pausable_failpoint;
 use utils::sync::gate::Gate;
 use utils::sync::gate::GateGuard;
 use utils::timeout::timeout_cancellable;
@@ -122,32 +123,6 @@ use utils::{
     lsn::{Lsn, RecordLsn},
 };
 
-/// Declare a failpoint that can use the `pause` failpoint action.
-/// We don't want to block the executor thread, hence, spawn_blocking + await.
-macro_rules! pausable_failpoint {
-    ($name:literal) => {
-        if cfg!(feature = "testing") {
-            tokio::task::spawn_blocking({
-                let current = tracing::Span::current();
-                move || {
-                    let _entered = current.entered();
-                    tracing::info!("at failpoint {}", $name);
-                    fail::fail_point!($name);
-                }
-            })
-            .await
-            .expect("spawn_blocking");
-        }
-    };
-    ($name:literal, $cond:expr) => {
-        if cfg!(feature = "testing") {
-            if $cond {
-                pausable_failpoint!($name)
-            }
-        }
-    };
-}
-
 pub mod blob_io;
 pub mod block_io;
 pub mod vectored_blob_io;
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 3173a33dad..7c6640eaac 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -8,7 +8,7 @@ use tokio::sync::OwnedMutexGuard;
 use tokio_util::sync::CancellationToken;
 use tracing::{error, instrument, Instrument};
 
-use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId};
+use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
 
 use crate::{
     config::PageServerConf,
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 23904b9da4..73438a790f 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -197,6 +197,7 @@ pub(crate) use upload::upload_initdb_dir;
 use utils::backoff::{
     self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
 };
+use utils::pausable_failpoint;
 
 use std::collections::{HashMap, VecDeque};
 use std::sync::atomic::{AtomicU32, Ordering};
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index caa843316f..e8e824f415 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -9,7 +9,7 @@ use std::time::SystemTime;
 use tokio::fs::{self, File};
 use tokio::io::AsyncSeekExt;
 use tokio_util::sync::CancellationToken;
-use utils::backoff;
+use utils::{backoff, pausable_failpoint};
 
 use super::Generation;
 use crate::tenant::remote_timeline_client::{
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index ba2b8afd03..bf2d8a47b4 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -17,7 +17,7 @@ use crate::tenant::{Tenant, TenantState};
 use rand::Rng;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
-use utils::{backoff, completion};
+use utils::{backoff, completion, pausable_failpoint};
 
 static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<usize> =
     once_cell::sync::Lazy::new(|| {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 1bdbddd95f..d4f6e25843 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -41,7 +41,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::{
     bin_ser::BeSer,
-    fs_ext,
+    fs_ext, pausable_failpoint,
     sync::gate::{Gate, GateGuard},
     vec_map::VecMap,
 };
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index b5dfc86e77..5ca8544d49 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -7,7 +7,7 @@ use anyhow::Context;
 use pageserver_api::{models::TimelineState, shard::TenantShardId};
 use tokio::sync::OwnedMutexGuard;
 use tracing::{error, info, instrument, Instrument};
-use utils::{crashsafe, fs_ext, id::TimelineId};
+use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
 
 use crate::{
     config::PageServerConf,
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 808bb1e490..4aacd3421d 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -287,6 +287,26 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError>
         .map_err(|e| ApiError::InternalServerError(e.into()))
 }
 
+/// Force persist control file and remove old WAL.
+async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permission(&request, None)?;
+
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+
+    let tli = GlobalTimelines::get(ttid)?;
+    tli.maybe_persist_control_file(true)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+    tli.remove_old_wal()
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, ())
+}
+
 /// Deactivates the timeline and removes its data directory.
 async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let ttid = TenantTimelineId::new(
@@ -553,6 +573,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
             "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
             |r| request_span(r, patch_control_file_handler),
         )
+        .post(
+            "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
+            |r| request_span(r, timeline_checkpoint_handler),
+        )
         // for tests
         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
             request_span(r, record_safekeeper_info)
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
index 93b51f32c0..f7cc40f58a 100644
--- a/safekeeper/src/pull_timeline.rs
+++ b/safekeeper/src/pull_timeline.rs
@@ -11,6 +11,7 @@ use tracing::info;
 use utils::{
     id::{TenantId, TenantTimelineId, TimelineId},
     lsn::Lsn,
+    pausable_failpoint,
 };
 
 use crate::{
@@ -162,6 +163,8 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
     filenames.remove(control_file_index);
     filenames.insert(0, "safekeeper.control".to_string());
 
+    pausable_failpoint!("sk-pull-timeline-after-list-pausable");
+
     info!(
         "downloading {} files from safekeeper {}",
         filenames.len(),
@@ -183,6 +186,13 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
         let mut file = tokio::fs::File::create(&file_path).await?;
 
         let mut response = client.get(&http_url).send().await?;
+        if response.status() != reqwest::StatusCode::OK {
+            bail!(
+                "pulling file {} failed: status is {}",
+                filename,
+                response.status()
+            );
+        }
         while let Some(chunk) = response.chunk().await? {
             file.write_all(&chunk).await?;
             file.flush().await?;
diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs
index 98ce671182..3400eee9b7 100644
--- a/safekeeper/src/remove_wal.rs
+++ b/safekeeper/src/remove_wal.rs
@@ -15,7 +15,7 @@ pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
         for tli in &tlis {
             let ttid = tli.ttid;
             async {
-                if let Err(e) = tli.maybe_persist_control_file().await {
+                if let Err(e) = tli.maybe_persist_control_file(false).await {
                     warn!("failed to persist control file: {e}");
                 }
                 if let Err(e) = tli.remove_old_wal().await {
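[Reviewer note, not part of the patch] As a usage sketch, the checkpoint endpoint added above can be exercised directly over HTTP; host, port, and IDs below are placeholders, and the SafekeeperHttpClient.checkpoint() fixture added later in this patch wraps exactly this request:

    import requests

    # Force control file persistence and removal of old WAL on a safekeeper.
    url = f"http://localhost:7676/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
    r = requests.post(url, json={})
    r.raise_for_status()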
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 4b1481a397..2a620f5fef 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -827,9 +827,9 @@ where
     /// Persist control file if there is something to save and enough time
     /// passed after the last save.
-    pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<bool> {
+    pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result<bool> {
         const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
-        if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
+        if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
             return Ok(false);
         }
         let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 0cc6153373..f30c503382 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -821,9 +821,9 @@ impl Timeline {
     /// passed after the last save. This helps to keep remote_consistent_lsn up
     /// to date so that storage nodes restart doesn't cause many pageserver ->
     /// safekeeper reconnections.
-    pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
+    pub async fn maybe_persist_control_file(self: &Arc<Self>, force: bool) -> Result<()> {
         let mut guard = self.write_shared_state().await;
-        let changed = guard.sk.maybe_persist_inmem_control_file().await?;
+        let changed = guard.sk.maybe_persist_inmem_control_file(force).await?;
         guard.skip_update = !changed;
         Ok(())
     }
diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs
index e74ba37ad8..ed544352f9 100644
--- a/safekeeper/src/timeline_manager.rs
+++ b/safekeeper/src/timeline_manager.rs
@@ -106,7 +106,7 @@ pub async fn main_task(
 
         if !is_active {
             // TODO: maybe use tokio::spawn?
-            if let Err(e) = tli.maybe_persist_control_file().await {
+            if let Err(e) = tli.maybe_persist_control_file(false).await {
                 warn!("control file save in update_status failed: {:?}", e);
             }
         }
diff --git a/test_runner/fixtures/common_types.py b/test_runner/fixtures/common_types.py
index b5458b5c26..e9be765669 100644
--- a/test_runner/fixtures/common_types.py
+++ b/test_runner/fixtures/common_types.py
@@ -5,6 +5,8 @@ from typing import Any, Type, TypeVar, Union
 
 T = TypeVar("T", bound="Id")
 
+DEFAULT_WAL_SEG_SIZE = 16 * 1024 * 1024
+
 
 @total_ordering
 class Lsn:
@@ -67,6 +69,9 @@ class Lsn:
     def as_int(self) -> int:
         return self.lsn_int
 
+    def segment_lsn(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> "Lsn":
+        return Lsn(self.lsn_int - (self.lsn_int % seg_sz))
+
 
 @dataclass(frozen=True)
 class Key:
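[Reviewer note, not part of the patch] To illustrate the new helper: segment_lsn() truncates an LSN down to the start of its WAL segment (16 MiB = 0x1000000 by default); values below are chosen for illustration:

    # 0x2000123 rounds down to the segment boundary 0x2000000.
    assert Lsn("0/2000123").segment_lsn() == Lsn("0/2000000")
    # 0x3FFFFFF is the last byte of its segment, which starts at 0x3000000.
    assert Lsn("0/3FFFFFF").segment_lsn() == Lsn("0/3000000")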
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 36aa18f1f9..c9d0acb967 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -3771,7 +3771,7 @@ class SafekeeperPort:
 
 
 @dataclass
-class Safekeeper:
+class Safekeeper(LogUtils):
     """An object representing a running safekeeper daemon."""
 
     env: NeonEnv
@@ -3779,6 +3779,13 @@
     id: int
     running: bool = False
 
+    def __init__(self, env: NeonEnv, port: SafekeeperPort, id: int, running: bool = False):
+        self.env = env
+        self.port = port
+        self.id = id
+        self.running = running
+        self.logfile = Path(self.data_dir) / f"safekeeper-{id}.log"
+
     def start(self, extra_opts: Optional[List[str]] = None) -> "Safekeeper":
         assert self.running is False
         self.env.neon_cli.safekeeper_start(self.id, extra_opts=extra_opts)
@@ -3839,11 +3846,38 @@
             port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
         )
 
+    def get_timeline_start_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
+        timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
+        timeline_start_lsn = timeline_status.timeline_start_lsn
+        log.info(f"sk {self.id} timeline start LSN: {timeline_start_lsn}")
+        return timeline_start_lsn
+
+    def get_flush_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
+        timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
+        flush_lsn = timeline_status.flush_lsn
+        log.info(f"sk {self.id} flush LSN: {flush_lsn}")
+        return flush_lsn
+
+    def pull_timeline(
+        self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
+    ) -> Dict[str, Any]:
+        """
+        pull_timeline from srcs to self.
+        """
+        src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
+        res = self.http_client().pull_timeline(
+            {"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
+        )
+        src_ids = [sk.id for sk in srcs]
+        log.info(f"finished pulling timeline from {src_ids} to {self.id}")
+        return res
+
+    @property
     def data_dir(self) -> str:
         return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
 
     def timeline_dir(self, tenant_id, timeline_id) -> str:
-        return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
+        return os.path.join(self.data_dir, str(tenant_id), str(timeline_id))
 
     def list_segments(self, tenant_id, timeline_id) -> List[str]:
         """
@@ -3856,6 +3890,35 @@
         segments.sort()
         return segments
 
+    def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
+        """
+        Assuming pageserver(s) uploaded to s3 up to `lsn`,
+        1) wait for remote_consistent_lsn and backup_lsn on the safekeeper to reach it.
+        2) checkpoint the timeline on the safekeeper, which should remove WAL before this LSN.
+        """
+        cli = self.http_client()
+
+        def are_lsns_advanced():
+            stat = cli.timeline_status(tenant_id, timeline_id)
+            log.info(
+                f"waiting for remote_consistent_lsn and backup_lsn on sk {self.id} to reach {lsn}, currently remote_consistent_lsn={stat.remote_consistent_lsn}, backup_lsn={stat.backup_lsn}"
+            )
+            assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn()
+
+        # XXX: max wait is long because we might be waiting for reconnection from
+        # pageserver to this safekeeper
+        wait_until(30, 1, are_lsns_advanced)
+        cli.checkpoint(tenant_id, timeline_id)
+
+    def wait_until_paused(self, failpoint: str):
+        msg = f"at failpoint {failpoint}"
+
+        def paused():
+            log.info(f"waiting for hitting failpoint {failpoint}")
+            self.assert_log_contains(msg)
+
+        wait_until(20, 0.5, paused)
+
 
 class S3Scrubber:
     def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py
index 82148d0556..a5480f557f 100644
--- a/test_runner/fixtures/safekeeper/http.py
+++ b/test_runner/fixtures/safekeeper/http.py
@@ -177,6 +177,13 @@ class SafekeeperHttpClient(requests.Session):
         )
         res.raise_for_status()
 
+    def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
+            json={},
+        )
+        res.raise_for_status()
+
     # only_local doesn't remove segments in the remote storage.
     def timeline_delete(
         self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index 89e116df28..c05cb3e744 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -560,3 +560,25 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: Set[str
 
     elapsed = time.time() - started_at
     log.info(f"assert_pageserver_backups_equal completed in {elapsed}s")
+
+
+class PropagatingThread(threading.Thread):
+    """
+    Simple Thread wrapper with join() propagating the possible exception in the thread.
+    """
+
+    _target: Any
+    _args: Any
+    _kwargs: Any
+
+    def run(self):
+        self.exc = None
+        try:
+            self.ret = self._target(*self._args, **self._kwargs)
+        except BaseException as e:
+            self.exc = e
+
+    def join(self, timeout=None):
+        super(PropagatingThread, self).join(timeout)
+        if self.exc:
+            raise self.exc
+        return self.ret
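[Reviewer note, not part of the patch] A brief usage sketch of PropagatingThread (the failing target function is illustrative): unlike a bare threading.Thread, join() re-raises whatever the thread body raised, so a failure inside a background pull_timeline call fails the test instead of being silently swallowed:

    def boom():
        raise RuntimeError("failed in background thread")

    t = PropagatingThread(target=boom)
    t.start()
    t.join()  # raises RuntimeError here, and returns the target's result otherwise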
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index ea66eeff63..0c37711f7a 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -23,7 +23,6 @@ from fixtures.log_helper import log
 from fixtures.metrics import parse_metrics
 from fixtures.neon_fixtures import (
     Endpoint,
-    NeonEnv,
     NeonEnvBuilder,
     NeonPageserver,
     PgBin,
@@ -48,7 +47,7 @@ from fixtures.remote_storage import (
 )
 from fixtures.safekeeper.http import SafekeeperHttpClient
 from fixtures.safekeeper.utils import are_walreceivers_absent
-from fixtures.utils import get_dir_size, query_scalar, start_in_background
+from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
 
 
 def wait_lsn_force_checkpoint(
@@ -360,7 +359,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
 
     # We will wait for first segment removal. Make sure they exist for starter.
     first_segments = [
-        os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id), "000000010000000000000001")
+        os.path.join(sk.data_dir, str(tenant_id), str(timeline_id), "000000010000000000000001")
         for sk in env.safekeepers
     ]
     assert all(os.path.exists(p) for p in first_segments)
@@ -445,7 +444,7 @@ def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: Tim
 def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
     http_cli = sk.http_client()
     tli_status = http_cli.timeline_status(tenant_id, timeline_id)
-    sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
+    sk_wal_size = get_dir_size(os.path.join(sk.data_dir, str(tenant_id), str(timeline_id)))
     sk_wal_size_mb = sk_wal_size / 1024 / 1024
     log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
     return sk_wal_size_mb <= target_size_mb
@@ -591,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
 
     # save the last (partial) file to put it back after recreation; others will be fetched from s3
     sk = env.safekeepers[0]
-    tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
+    tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
     f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
     f_partial_path = tli_dir / f_partial
-    f_partial_saved = Path(sk.data_dir()) / f_partial.name
+    f_partial_saved = Path(sk.data_dir) / f_partial.name
     f_partial_path.rename(f_partial_saved)
 
     pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
@@ -616,7 +615,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
             cli = sk.http_client()
             cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
             f_partial_path = (
-                Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
+                Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
             )
             shutil.copy(f_partial_saved, f_partial_path)
@@ -1631,7 +1630,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
         with conn.cursor() as cur:
             cur.execute("CREATE TABLE t(key int primary key)")
     sk = env.safekeepers[0]
-    sk_data_dir = Path(sk.data_dir())
+    sk_data_dir = Path(sk.data_dir)
     if not auth_enabled:
         sk_http = sk.http_client()
         sk_http_other = sk_http
@@ -1724,9 +1723,6 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
 
 
 def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
-    def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
-        return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
-
     def execute_payload(endpoint: Endpoint):
         with closing(endpoint.connect()) as conn:
             with conn.cursor() as cur:
@@ -1812,6 +1808,67 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
     show_statuses(env.safekeepers, tenant_id, timeline_id)
 
 
+# Test pull_timeline while WAL is concurrently gc'ed on the safekeeper:
+# 1) Start pull_timeline, listing the files to fetch.
+# 2) Write another segment, do gc.
+# 3) Finish pull_timeline.
+# 4) Do some writes, verify integrity with timeline_digest.
+# Expected to fail while holding off WAL gc (plus fetching the WAL segment
+# containing commit_lsn) during pull_timeline is not implemented.
+@pytest.mark.xfail
+def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
+    env = neon_env_builder.init_start()
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
+
+    log.info("use only first 2 safekeepers, 3rd will be seeded")
+    endpoint = env.endpoints.create("main")
+    endpoint.active_safekeepers = [1, 2]
+    endpoint.start()
+    endpoint.safe_psql("create table t(key int, value text)")
+    endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
+
+    src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
+    log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
+
+    dst_http = dst_sk.http_client()
+    # run pull_timeline which will halt before downloading files
+    dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
+    pt_handle = PropagatingThread(
+        target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
+    )
+    pt_handle.start()
+    dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
+
+    # ensure segment exists
+    endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
+    lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
+    assert lsn > Lsn("0/2000000")
+    # Checkpoint timeline beyond lsn.
+    src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
+    first_segment_p = os.path.join(
+        src_sk.timeline_dir(tenant_id, timeline_id), "000000010000000000000001"
+    )
+    log.info(f"first segment exists={os.path.exists(first_segment_p)}")
+
+    dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
+    pt_handle.join()
+
+    timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
+    dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
+    log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
+    assert dst_flush_lsn >= src_flush_lsn
+    digests = [
+        sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, dst_flush_lsn)
+        for sk in [src_sk, dst_sk]
+    ]
+    assert digests[0] == digests[1], f"digest on src is {digests[0]} but on dst is {digests[1]}"
+
+
 # In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
 # when compute is active, but there are no writes to the timeline. In that case
 # pageserver should maintain a single connection to safekeeper and don't attempt