From 80512e2779f40af7602fe3221ccc7eaa0499e61e Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Fri, 30 Aug 2024 12:35:41 +0300
Subject: [PATCH] safekeeper: add endpoint resetting uploaded partial segment state.

The endpoint implementation sends a message to the timeline manager requesting
the reset. The manager stops the current partial backup upload task, if there
is one, and performs the reset.

Also slightly tweak the eviction condition: all full segments before flush_lsn
must be uploaded (and committed), and there must be only one segment left on
disk (the partial one). This allows evicting timelines that did not start on
the first segment and never filled a whole segment (the previous condition
didn't cover them because last_removed_segno was 0).

ref https://github.com/neondatabase/neon/issues/8759
---
 safekeeper/src/http/routes.rs            |  23 ++++++
 safekeeper/src/timeline.rs               |   4 +
 safekeeper/src/timeline_eviction.rs      |  21 +++--
 safekeeper/src/timeline_manager.rs       |  88 ++++++++++++++++++--
 safekeeper/src/wal_backup.rs             |   6 +-
 safekeeper/src/wal_backup_partial.rs     | 101 +++++++++++++++++------
 test_runner/fixtures/neon_fixtures.py    |   6 +-
 test_runner/fixtures/safekeeper/http.py  |  24 ++++++
 test_runner/regress/test_wal_acceptor.py |  99 +++++++++++++++++++++-
 9 files changed, 325 insertions(+), 47 deletions(-)

diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 91ffa95c21..9b7424a818 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -389,6 +389,25 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+    check_permission(&request, Some(ttid.tenant_id))?;
+
+    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+
+    let response = tli
+        .backup_partial_reset()
+        .await
+        .map_err(ApiError::InternalServerError)?;
+    json_response(StatusCode::OK, response)
+}
+
 /// Used only in tests to hand craft required data.
 async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let ttid = TenantTimelineId::new(
@@ -607,6 +626,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
         .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
             request_span(r, timeline_digest_handler)
         })
+        .post(
+            "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
+            |r| request_span(r, timeline_backup_partial_reset),
+        )
         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
             request_span(r, record_safekeeper_info)
         })
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index f7c96d4f02..95ee925e1a 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -908,6 +908,10 @@ impl Timeline {
         Ok(WalResidentTimeline::new(self.clone(), guard))
     }
+
+    pub async fn backup_partial_reset(self: &Arc<Self>) -> Result<Vec<String>> {
+        self.manager_ctl.backup_partial_reset().await
+    }
 }
 
 /// This is a guard that allows to read/write disk timeline state.
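For reference, the new endpoint can be exercised directly over the safekeeper HTTP API; the same call is wrapped by `SafekeeperHttpClient.backup_partial_reset()` in the test fixtures below. A minimal sketch (not part of the patch) using Python `requests`; the listen address and ids are placeholders, and authentication (if enabled on the safekeeper) is omitted:

```python
# Minimal sketch: trigger a partial backup reset over the safekeeper HTTP API.
# The address and ids are placeholders, not values taken from the patch.
import requests

SAFEKEEPER_HTTP = "http://127.0.0.1:7676"  # assumed safekeeper HTTP listen address
tenant_id = "<tenant_id in hex>"
timeline_id = "<timeline_id in hex>"

resp = requests.post(
    f"{SAFEKEEPER_HTTP}/v1/tenant/{tenant_id}/timeline/{timeline_id}/backup_partial_reset",
    json={},
)
resp.raise_for_status()
# The handler replies with the list of partial segment names that were dropped
# from remote storage; an empty list means there was nothing to reset.
print(resp.json())
```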
diff --git a/safekeeper/src/timeline_eviction.rs b/safekeeper/src/timeline_eviction.rs index 2ccb058720..5d0567575c 100644 --- a/safekeeper/src/timeline_eviction.rs +++ b/safekeeper/src/timeline_eviction.rs @@ -28,28 +28,38 @@ impl Manager { /// - control file is flushed (no next event scheduled) /// - no WAL residence guards /// - no pushes to the broker - /// - partial WAL backup is uploaded + /// - last partial WAL segment is uploaded + /// - all local segments before the uploaded partial are committed and uploaded pub(crate) fn ready_for_eviction( &self, next_event: &Option, state: &StateSnapshot, ) -> bool { - self.backup_task.is_none() + let ready = self.backup_task.is_none() && self.recovery_task.is_none() && self.wal_removal_task.is_none() && self.partial_backup_task.is_none() - && self.partial_backup_uploaded.is_some() && next_event.is_none() && self.access_service.is_empty() && !self.tli_broker_active.get() + // Partial segment of current flush_lsn is uploaded up to this flush_lsn. && !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded) + // And it is the next one after the last removed. Given that local + // WAL is removed only after it is uploaded to s3 (and pageserver + // advancing remote_consistent_lsn) which happens only after WAL is + // committed, true means all this is done. + // + // This also works for the first segment despite last_removed_segno + // being 0 on init because this 0 triggers run of wal_removal_task + // on success of which manager updates the horizon. && self .partial_backup_uploaded .as_ref() .unwrap() .flush_lsn .segment_number(self.wal_seg_size) - == self.last_removed_segno + 1 + == self.last_removed_segno + 1; + ready } /// Evict the timeline to remote storage. @@ -83,7 +93,8 @@ impl Manager { info!("successfully evicted timeline"); } - /// Restore evicted timeline from remote storage. + /// Attempt to restore evicted timeline from remote storage; it must be + /// offloaded. #[instrument(name = "unevict_timeline", skip_all)] pub(crate) async fn unevict_timeline(&mut self) { assert!(self.is_offloaded); diff --git a/safekeeper/src/timeline_manager.rs b/safekeeper/src/timeline_manager.rs index 482614fac7..f997f48454 100644 --- a/safekeeper/src/timeline_manager.rs +++ b/safekeeper/src/timeline_manager.rs @@ -11,12 +11,14 @@ use std::{ time::Duration, }; +use futures::channel::oneshot; use postgres_ffi::XLogSegNo; use serde::{Deserialize, Serialize}; use tokio::{ task::{JoinError, JoinHandle}, time::Instant, }; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, instrument, warn, Instrument}; use utils::lsn::Lsn; @@ -33,7 +35,7 @@ use crate::{ timeline_guard::{AccessService, GuardId, ResidenceGuard}, timelines_set::{TimelineSetGuard, TimelinesSet}, wal_backup::{self, WalBackupTaskHandle}, - wal_backup_partial::{self, PartialRemoteSegment}, + wal_backup_partial::{self, PartialBackup, PartialRemoteSegment}, SafeKeeperConf, }; @@ -96,6 +98,8 @@ pub enum ManagerCtlMessage { GuardRequest(tokio::sync::oneshot::Sender>), /// Request to drop the guard. GuardDrop(GuardId), + /// Request to reset uploaded partial backup state. 
+ BackupPartialReset(oneshot::Sender>>), } impl std::fmt::Debug for ManagerCtlMessage { @@ -103,6 +107,7 @@ impl std::fmt::Debug for ManagerCtlMessage { match self { ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"), ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id), + ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"), } } } @@ -143,6 +148,19 @@ impl ManagerCtl { .and_then(std::convert::identity) } + /// Request timeline manager to reset uploaded partial segment state and + /// wait for the result. + pub async fn backup_partial_reset(&self) -> anyhow::Result> { + let (tx, rx) = oneshot::channel(); + self.manager_tx + .send(ManagerCtlMessage::BackupPartialReset(tx)) + .expect("manager task is not running"); + match rx.await { + Ok(res) => res, + Err(_) => anyhow::bail!("timeline manager is gone"), + } + } + /// Must be called exactly once to bootstrap the manager. pub fn bootstrap_manager( &self, @@ -181,7 +199,8 @@ pub(crate) struct Manager { pub(crate) wal_removal_task: Option>>, // partial backup - pub(crate) partial_backup_task: Option>>, + pub(crate) partial_backup_task: + Option<(JoinHandle>, CancellationToken)>, pub(crate) partial_backup_uploaded: Option, // misc @@ -302,12 +321,12 @@ pub async fn main_task( _ = sleep_until(&next_event) => { // we were waiting for some event (e.g. cfile save) } - res = await_task_finish(&mut mgr.wal_removal_task) => { + res = await_task_finish(mgr.wal_removal_task.as_mut()) => { // WAL removal task finished mgr.wal_removal_task = None; mgr.update_wal_removal_end(res); } - res = await_task_finish(&mut mgr.partial_backup_task) => { + res = await_task_finish(mgr.partial_backup_task.as_mut().map(|(handle, _)| handle)) => { // partial backup task finished mgr.partial_backup_task = None; mgr.update_partial_backup_end(res); @@ -335,8 +354,9 @@ pub async fn main_task( } } - if let Some(partial_backup_task) = &mut mgr.partial_backup_task { - if let Err(e) = partial_backup_task.await { + if let Some((handle, cancel)) = &mut mgr.partial_backup_task { + cancel.cancel(); + if let Err(e) = handle.await { warn!("partial backup task failed: {:?}", e); } } @@ -560,11 +580,14 @@ impl Manager { } // Get WalResidentTimeline and start partial backup task. - self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task( + let cancel = CancellationToken::new(); + let handle = tokio::spawn(wal_backup_partial::main_task( self.wal_resident_timeline(), self.conf.clone(), self.global_rate_limiter.clone(), - ))); + cancel.clone(), + )); + self.partial_backup_task = Some((handle, cancel)); } /// Update the state after partial WAL backup task finished. @@ -579,6 +602,39 @@ impl Manager { } } + /// Reset partial backup state and remove its remote storage data. Since it + /// might concurrently uploading something, cancel the task first. + async fn backup_partial_reset(&mut self) -> anyhow::Result> { + info!("resetting partial backup state"); + // Force unevict timeline if it is evicted before erasing partial backup + // state. The intended use of this function is to drop corrupted remote + // state; we haven't enabled local files deletion yet anywhere, + // so direct switch is safe. 
+ if self.is_offloaded { + self.tli.switch_to_present().await?; + // switch manager state as soon as possible + self.is_offloaded = false; + } + + if let Some((handle, cancel)) = &mut self.partial_backup_task { + cancel.cancel(); + info!("cancelled partial backup task, awaiting it"); + // we're going to reset .partial_backup_uploaded to None anyway, so ignore the result + handle.await.ok(); + self.partial_backup_task = None; + } + + let tli = self.wal_resident_timeline(); + let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await; + // Reset might fail e.g. when cfile is already reset but s3 removal + // failed, so set manager state to None beforehand. In any case caller + // is expected to retry until success. + self.partial_backup_uploaded = None; + let res = partial_backup.reset().await?; + info!("reset is done"); + Ok(res) + } + /// Handle message arrived from ManagerCtl. async fn handle_message(&mut self, msg: Option) { debug!("received manager message: {:?}", msg); @@ -602,6 +658,16 @@ impl Manager { Some(ManagerCtlMessage::GuardDrop(guard_id)) => { self.access_service.drop_guard(guard_id); } + Some(ManagerCtlMessage::BackupPartialReset(tx)) => { + info!("resetting uploaded partial backup state"); + let res = self.backup_partial_reset().await; + if let Err(ref e) = res { + warn!("failed to reset partial backup state: {:?}", e); + } + if tx.send(res).is_err() { + warn!("failed to send partial backup reset result, receiver dropped"); + } + } None => { // can't happen, we're holding the sender unreachable!(); @@ -619,7 +685,11 @@ async fn sleep_until(option: &Option) { } } -async fn await_task_finish(option: &mut Option>) -> Result { +/// Future that resolves when the task is finished or never if the task is None. +/// +/// Note: it accepts Option<&mut> instead of &mut Option<> because mapping the +/// option to get the latter is hard. 
+async fn await_task_finish(option: Option<&mut JoinHandle>) -> Result { if let Some(task) = option { task.await } else { diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 1c9ec5c007..95012bb004 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -328,11 +328,7 @@ async fn backup_lsn_range( loop { let added_task = match iter.next() { Some(s) => { - uploads.push_back(backup_single_segment( - s, - timeline_dir, - &remote_timeline_path, - )); + uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path)); true } None => false, diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index 4022c9409b..4f320f43f8 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -22,6 +22,7 @@ use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use remote_storage::RemotePath; use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, instrument, warn}; use utils::{id::NodeId, lsn::Lsn}; @@ -145,7 +146,7 @@ impl State { } } -struct PartialBackup { +pub struct PartialBackup { wal_seg_size: usize, tli: WalResidentTimeline, conf: SafeKeeperConf, @@ -155,8 +156,25 @@ struct PartialBackup { state: State, } -// Read-only methods for getting segment names impl PartialBackup { + pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup { + let (_, persistent_state) = tli.get_state().await; + let wal_seg_size = tli.get_wal_seg_size().await; + + let local_prefix = tli.get_timeline_dir(); + let remote_timeline_path = tli.remote_path.clone(); + + PartialBackup { + wal_seg_size, + tli, + state: persistent_state.partial_backup, + conf, + local_prefix, + remote_timeline_path, + } + } + + // Read-only methods for getting segment names fn segno(&self, lsn: Lsn) -> XLogSegNo { lsn.segment_number(self.wal_seg_size) } @@ -297,6 +315,18 @@ impl PartialBackup { Ok(()) } + // Prepend to the given segments remote prefix and delete them from the + // remote storage. + async fn delete_segments(&self, segments_to_delete: &Vec) -> anyhow::Result<()> { + info!("deleting objects: {:?}", segments_to_delete); + let mut objects_to_delete = vec![]; + for seg in segments_to_delete.iter() { + let remote_path = self.remote_timeline_path.join(seg); + objects_to_delete.push(remote_path); + } + wal_backup::delete_objects(&objects_to_delete).await + } + /// Delete all non-Uploaded segments from the remote storage. There should be only one /// Uploaded segment at a time. #[instrument(name = "gc", skip_all)] @@ -329,15 +359,8 @@ impl PartialBackup { ); } - info!("deleting objects: {:?}", segments_to_delete); - let mut objects_to_delete = vec![]; - for seg in segments_to_delete.iter() { - let remote_path = self.remote_timeline_path.join(seg); - objects_to_delete.push(remote_path); - } - - // removing segments from remote storage - wal_backup::delete_objects(&objects_to_delete).await?; + // execute the deletion + self.delete_segments(&segments_to_delete).await?; // now we can update the state on disk let new_state = { @@ -349,6 +372,27 @@ impl PartialBackup { Ok(()) } + + /// Remove uploaded segment(s) from the state and remote storage. Aimed for + /// manual intervention, not normally needed. + /// Returns list of segments which potentially existed in the remote storage. 
+ pub async fn reset(&mut self) -> anyhow::Result> { + let segments_to_delete = self + .state + .segments + .iter() + .map(|seg| seg.name.clone()) + .collect(); + + // First reset cfile state, and only then objects themselves. If the + // later fails we might leave some garbage behind; that's ok for this + // single time usage. + let new_state = State { segments: vec![] }; + self.commit_state(new_state).await?; + + self.delete_segments(&segments_to_delete).await?; + Ok(segments_to_delete) + } } /// Check if everything is uploaded and partial backup task doesn't need to run. @@ -377,27 +421,16 @@ pub async fn main_task( tli: WalResidentTimeline, conf: SafeKeeperConf, limiter: RateLimiter, + cancel: CancellationToken, ) -> Option { debug!("started"); let await_duration = conf.partial_backup_timeout; let mut first_iteration = true; - let (_, persistent_state) = tli.get_state().await; let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx(); let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx(); - let wal_seg_size = tli.get_wal_seg_size().await; - let local_prefix = tli.get_timeline_dir(); - let remote_timeline_path = tli.remote_path.clone(); - - let mut backup = PartialBackup { - wal_seg_size, - tli, - state: persistent_state.partial_backup, - conf, - local_prefix, - remote_timeline_path, - }; + let mut backup = PartialBackup::new(tli, conf).await; debug!("state: {:?}", backup.state); @@ -427,6 +460,10 @@ pub async fn main_task( && flush_lsn_rx.borrow().term == seg.term { // we have nothing to do, the last segment is already uploaded + debug!( + "exiting, uploaded up to term={} flush_lsn={} commit_lsn={}", + seg.term, seg.flush_lsn, seg.commit_lsn + ); return Some(seg.clone()); } } @@ -438,6 +475,10 @@ pub async fn main_task( info!("timeline canceled"); return None; } + _ = cancel.cancelled() => { + info!("task canceled"); + return None; + } _ = flush_lsn_rx.changed() => {} } } @@ -464,6 +505,10 @@ pub async fn main_task( info!("timeline canceled"); return None; } + _ = cancel.cancelled() => { + info!("task canceled"); + return None; + } _ = commit_lsn_rx.changed() => {} _ = flush_lsn_rx.changed() => { let segno = backup.segno(flush_lsn_rx.borrow().lsn); @@ -486,7 +531,13 @@ pub async fn main_task( } // limit concurrent uploads - let _upload_permit = limiter.acquire_partial_backup().await; + let _upload_permit = tokio::select! { + acq = limiter.acquire_partial_backup() => acq, + _ = cancel.cancelled() => { + info!("task canceled"); + return None; + } + }; let prepared = backup.prepare_upload().await; if let Some(seg) = &uploaded_segment { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 0cbab71cc3..8c99408cfb 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4553,6 +4553,8 @@ class Safekeeper(LogUtils): def timeline_dir(self, tenant_id, timeline_id) -> Path: return self.data_dir / str(tenant_id) / str(timeline_id) + # List partial uploaded segments of this safekeeper. Works only for + # RemoteStorageKind.LOCAL_FS. 
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId): tline_path = ( self.env.repo_dir @@ -4562,9 +4564,11 @@ class Safekeeper(LogUtils): / str(timeline_id) ) assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage) - return self._list_segments_in_dir( + segs = self._list_segments_in_dir( tline_path, lambda name: ".metadata" not in name and ".___temp" not in name ) + mysegs = [s for s in segs if f"sk{self.id}" in s] + return mysegs def list_segments(self, tenant_id, timeline_id) -> List[str]: """ diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 05b43cfb72..9bf03554e7 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -174,6 +174,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): assert isinstance(res_json, dict) return res_json + def debug_dump_timeline( + self, timeline_id: TimelineId, params: Optional[Dict[str, str]] = None + ) -> Any: + params = params or {} + params["timeline_id"] = str(timeline_id) + dump = self.debug_dump(params) + return dump["timelines"][0] + + def get_partial_backup(self, timeline_id: TimelineId) -> Any: + dump = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"}) + return dump["control_file"]["partial_backup"] + + def get_eviction_state(self, timeline_id: TimelineId) -> Any: + dump = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"}) + return dump["control_file"]["eviction_state"] + def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]: res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body) res.raise_for_status() @@ -228,6 +244,14 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter): assert isinstance(res_json, dict) return res_json + def backup_partial_reset(self, tenant_id: TenantId, timeline_id: TimelineId): + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/backup_partial_reset", + json={}, + ) + res.raise_for_status() + return res.json() + def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body): res = self.post( f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}", diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index 19df834b81..3785651aed 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -72,6 +72,17 @@ def wait_lsn_force_checkpoint( wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options) +def wait_lsn_force_checkpoint_at_sk( + safekeeper: Safekeeper, + tenant_id: TenantId, + timeline_id: TimelineId, + ps: NeonPageserver, + pageserver_conn_options=None, +): + sk_flush_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id) + wait_lsn_force_checkpoint_at(sk_flush_lsn, tenant_id, timeline_id, ps, pageserver_conn_options) + + def wait_lsn_force_checkpoint_at( lsn: Lsn, tenant_id: TenantId, @@ -79,6 +90,10 @@ def wait_lsn_force_checkpoint_at( ps: NeonPageserver, pageserver_conn_options=None, ): + """ + Wait until pageserver receives given lsn, force checkpoint and wait for + upload, i.e. remote_consistent_lsn advancement. + """ pageserver_conn_options = pageserver_conn_options or {} auth_token = None @@ -2330,6 +2345,77 @@ def test_s3_eviction( assert event_metrics_seen +# Test resetting uploaded partial segment state. 
+def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 1 + neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) + # We want to upload/evict quickly, but not too quickly to check that s3 is + # empty before next round of upload happens. + # Note: this test fails with --delete-offloaded-wal, this is expected. + neon_env_builder.safekeeper_extra_opts = [ + "--enable-offload", + "--partial-backup-timeout", + "1s", + "--control-file-save-interval", + "1s", + "--eviction-min-resident=1s", + ] + # XXX: pageserver currently connects to safekeeper as long as connection + # manager doesn't remove its entry (default lagging_wal_timeout is 10s), + # causing uneviction. It should be fixed to not reconnect if last + # remote_consistent_lsn is communicated and there is nothing to fetch. Make + # value lower to speed up the test. + initial_tenant_conf = { + "lagging_wal_timeout": "1s", + } + env = neon_env_builder.init_start(initial_tenant_conf) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + endpoint = env.endpoints.create("main") + endpoint.start() + endpoint.safe_psql("create table t(key int, value text)") + endpoint.stop() + sk = env.safekeepers[0] + # eviction won't happen until remote_consistent_lsn catches up. + wait_lsn_force_checkpoint_at_sk(sk, tenant_id, timeline_id, env.pageserver) + + http_cli = env.safekeepers[0].http_client() + + # wait until eviction happens + def evicted(): + eviction_state = http_cli.get_eviction_state(timeline_id) + log.info(f"eviction_state: {eviction_state}") + if isinstance(eviction_state, str) and eviction_state == "Present": + raise Exception("eviction didn't happen yet") + + wait_until(30, 1, evicted) + # it must have uploaded something + uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id) + log.info(f"uploaded segments before reset: {uploaded_segs}") + assert len(uploaded_segs) > 0 + + reset_res = http_cli.backup_partial_reset(tenant_id, timeline_id) + log.info(f"reset res: {reset_res}") + + # Backup_partial_reset must have reset the state and dropped s3 segment. + # + # Note: if listing takes more than --partial-backup-timeout test becomes + # flaky because file might be reuploaded. With local fs it shouldn't be an + # issue, but can add retry if this appears. + uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id) + log.info(f"uploaded segments after reset: {uploaded_segs}") + assert len(uploaded_segs) == 0 + + # calling second time should be ok + http_cli.backup_partial_reset(tenant_id, timeline_id) + + # inserting data should be ok + endpoint.start() + endpoint.safe_psql("insert into t values(1, 'hehe')") + + def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder): """ Verify that pulling timeline from a SK with an uploaded partial segment @@ -2357,7 +2443,16 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde "--eviction-min-resident=500ms", ] - env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"}) + # XXX: pageserver currently connects to safekeeper as long as connection + # manager doesn't remove its entry (default lagging_wal_timeout is 10s), + # causing uneviction. It should be fixed to not reconnect if last + # remote_consistent_lsn is communicated and there is nothing to fetch. Until + # this is fixed make value lower to speed up the test. 
+    initial_tenant_conf = {
+        "lagging_wal_timeout": "1s",
+        "checkpoint_timeout": "100ms",
+    }
+    env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
 
     tenant_id = env.initial_tenant
     timeline_id = env.initial_timeline
@@ -2421,7 +2516,7 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
     endpoint.start(safekeepers=[2, 3])
 
     def new_partial_segment_uploaded():
-        segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
+        segs = dst_sk.list_uploaded_segments(tenant_id, timeline_id)
         for seg in segs:
            if "partial" in seg and "sk3" in seg:
                return seg
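The reset-and-verify sequence used in test_backup_partial_reset above can also be written as a small helper built only on the fixtures added in this change. A sketch (not part of the patch); the helper name, the import paths, and the assumption that the dumped partial_backup state exposes its uploaded segments under a "segments" key are mine:

```python
# Hypothetical helper (not part of the patch): reset the uploaded partial
# segment state on one safekeeper and wait until both the control file and
# remote storage report that nothing is uploaded anymore.
from fixtures.neon_fixtures import Safekeeper
from fixtures.utils import wait_until  # assumed import path for wait_until


def backup_partial_reset_and_verify(sk: Safekeeper, tenant_id, timeline_id):
    http_cli = sk.http_client()
    # returns the list of segment names dropped from remote storage
    dropped = http_cli.backup_partial_reset(tenant_id, timeline_id)

    def no_partial_state():
        # assumption: partial_backup in the control file dump serializes
        # wal_backup_partial::State, i.e. {"segments": [...]}
        assert http_cli.get_partial_backup(timeline_id)["segments"] == []
        # remote listing works only with RemoteStorageKind.LOCAL_FS,
        # like list_uploaded_segments itself
        assert sk.list_uploaded_segments(tenant_id, timeline_id) == []

    # a few retries to be safe; with LOCAL_FS this normally passes immediately
    wait_until(10, 1, no_partial_state)
    return dropped
```

As the comments in test_backup_partial_reset note, the partial backup task re-uploads again after --partial-backup-timeout, so such a check has to run promptly after the reset.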