diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 14aef1ee5e..1c9e5bade5 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -1,5 +1,6 @@ use std::cmp::min; use std::io::{self, ErrorKind}; +use std::ops::RangeInclusive; use std::sync::Arc; use anyhow::{Context, Result, anyhow, bail}; @@ -34,7 +35,7 @@ use crate::control_file::CONTROL_FILE_NAME; use crate::state::{EvictionState, TimelinePersistentState}; use crate::timeline::{Timeline, TimelineError, WalResidentTimeline}; use crate::timelines_global_map::{create_temp_timeline_dir, validate_temp_timeline}; -use crate::wal_storage::open_wal_file; +use crate::wal_storage::{open_wal_file, wal_file_paths}; use crate::{GlobalTimelines, debug_dump, wal_backup}; /// Stream tar archive of timeline to tx. @@ -95,8 +96,8 @@ pub async fn stream_snapshot( /// State needed while streaming the snapshot. pub struct SnapshotContext { - pub from_segno: XLogSegNo, // including - pub upto_segno: XLogSegNo, // including + /// The interval of segment numbers. If None, the timeline hasn't had writes yet, so only send the control file + pub from_to_segno: Option>, pub term: Term, pub last_log_term: Term, pub flush_lsn: Lsn, @@ -174,23 +175,35 @@ pub async fn stream_snapshot_resident_guts( .await?; pausable_failpoint!("sk-snapshot-after-list-pausable"); - let tli_dir = tli.get_timeline_dir(); - info!( - "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}", - bctx.upto_segno - bctx.from_segno + 1, - bctx.from_segno, - bctx.upto_segno, - bctx.term, - bctx.last_log_term, - bctx.flush_lsn, - ); - for segno in bctx.from_segno..=bctx.upto_segno { - let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?; - let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size); - if is_partial { - wal_file_name.push_str(".partial"); + if let Some(from_to_segno) = &bctx.from_to_segno { + let tli_dir = tli.get_timeline_dir(); + info!( + "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}", + from_to_segno.end() - from_to_segno.start() + 1, + from_to_segno.start(), + from_to_segno.end(), + bctx.term, + bctx.last_log_term, + bctx.flush_lsn, + ); + for segno in from_to_segno.clone() { + let Some((mut sf, is_partial)) = + open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await? + else { + // File is not found + let (wal_file_path, _wal_file_partial_path) = + wal_file_paths(&tli_dir, segno, bctx.wal_seg_size); + tracing::warn!("couldn't find WAL segment file {wal_file_path}"); + bail!("couldn't find WAL segment file {wal_file_path}") + }; + let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size); + if is_partial { + wal_file_name.push_str(".partial"); + } + ar.append_file(&wal_file_name, &mut sf).await?; } - ar.append_file(&wal_file_name, &mut sf).await?; + } else { + info!("Not including any segments into the snapshot"); } // Do the term check before ar.finish to make archive corrupted in case of @@ -338,19 +351,26 @@ impl WalResidentTimeline { // removed further than `backup_lsn`. Since we're holding shared_state // lock and setting `wal_removal_on_hold` later, it guarantees that WAL // won't be removed until we're done. + let timeline_state = shared_state.sk.state(); let from_lsn = min( - shared_state.sk.state().remote_consistent_lsn, - shared_state.sk.state().backup_lsn, + timeline_state.remote_consistent_lsn, + timeline_state.backup_lsn, + ); + let flush_lsn = shared_state.sk.flush_lsn(); + let (send_segments, msg) = if from_lsn == Lsn::INVALID { + (false, "snapshot is called on uninitialized timeline") + } else { + (true, "timeline is initialized") + }; + tracing::info!( + remote_consistent_lsn=%timeline_state.remote_consistent_lsn, + backup_lsn=%timeline_state.backup_lsn, + %flush_lsn, + "{msg}" ); - if from_lsn == Lsn::INVALID { - // this is possible if snapshot is called before handling first - // elected message - bail!("snapshot is called on uninitialized timeline"); - } let from_segno = from_lsn.segment_number(wal_seg_size); let term = shared_state.sk.state().acceptor_state.term; let last_log_term = shared_state.sk.last_log_term(); - let flush_lsn = shared_state.sk.flush_lsn(); let upto_segno = flush_lsn.segment_number(wal_seg_size); // have some limit on max number of segments as a sanity check const MAX_ALLOWED_SEGS: u64 = 1000; @@ -376,9 +396,9 @@ impl WalResidentTimeline { drop(shared_state); let tli_copy = self.wal_residence_guard().await?; + let from_to_segno = send_segments.then_some(from_segno..=upto_segno); let bctx = SnapshotContext { - from_segno, - upto_segno, + from_to_segno, term, last_log_term, flush_lsn, diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index da00df2dd7..33310706be 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -9,7 +9,7 @@ use std::cmp::{max, min}; use std::future::Future; -use std::io::{self, SeekFrom}; +use std::io::{ErrorKind, SeekFrom}; use std::pin::Pin; use anyhow::{Context, Result, bail}; @@ -794,26 +794,13 @@ impl WalReader { // Try to open local file, if we may have WAL locally if self.pos >= self.local_start_lsn { - let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await; - match res { - Ok((mut file, _)) => { - file.seek(SeekFrom::Start(xlogoff as u64)).await?; - return Ok(Box::pin(file)); - } - Err(e) => { - let is_not_found = e.chain().any(|e| { - if let Some(e) = e.downcast_ref::() { - e.kind() == io::ErrorKind::NotFound - } else { - false - } - }); - if !is_not_found { - return Err(e); - } - // NotFound is expected, fall through to remote read - } - }; + let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await?; + if let Some((mut file, _)) = res { + file.seek(SeekFrom::Start(xlogoff as u64)).await?; + return Ok(Box::pin(file)); + } else { + // NotFound is expected, fall through to remote read + } } // Try to open remote file, if remote reads are enabled @@ -832,26 +819,31 @@ pub(crate) async fn open_wal_file( timeline_dir: &Utf8Path, segno: XLogSegNo, wal_seg_size: usize, -) -> Result<(tokio::fs::File, bool)> { +) -> Result> { let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size); // First try to open the .partial file. let mut partial_path = wal_file_path.to_owned(); partial_path.set_extension("partial"); if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await { - return Ok((opened_file, true)); + return Ok(Some((opened_file, true))); } // If that failed, try it without the .partial extension. - let pf = tokio::fs::File::open(&wal_file_path) - .await + let pf_res = tokio::fs::File::open(&wal_file_path).await; + if let Err(e) = &pf_res { + if e.kind() == ErrorKind::NotFound { + return Ok(None); + } + } + let pf = pf_res .with_context(|| format!("failed to open WAL file {wal_file_path:#}")) .map_err(|e| { - warn!("{}", e); + warn!("{e}"); e })?; - Ok((pf, false)) + Ok(Some((pf, false))) } /// Helper returning full path to WAL segment file and its .partial brother. diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 70772766d7..290ebe456b 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -4168,13 +4168,20 @@ class DeletionSubject(Enum): TENANT = "tenant" +class EmptyTimeline(Enum): + EMPTY = "empty" + NONEMPTY = "nonempty" + + @run_only_on_default_postgres("PG version is not interesting here") @pytest.mark.parametrize("restart_storcon", [RestartStorcon.RESTART, RestartStorcon.ONLINE]) @pytest.mark.parametrize("deletetion_subject", [DeletionSubject.TENANT, DeletionSubject.TIMELINE]) +@pytest.mark.parametrize("empty_timeline", [EmptyTimeline.EMPTY, EmptyTimeline.NONEMPTY]) def test_storcon_create_delete_sk_down( neon_env_builder: NeonEnvBuilder, restart_storcon: RestartStorcon, deletetion_subject: DeletionSubject, + empty_timeline: EmptyTimeline, ): """ Test that the storcon can create and delete tenants and timelines with a safekeeper being down. @@ -4226,10 +4233,11 @@ def test_storcon_create_delete_sk_down( ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") - with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep: - # endpoint should start. - ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) - ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") + if empty_timeline == EmptyTimeline.NONEMPTY: + with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep: + # endpoint should start. + ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3]) + ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)") env.storage_controller.assert_log_contains("writing pending op for sk id 1") env.safekeepers[0].start()