diff --git a/Cargo.lock b/Cargo.lock index 1c8a8b0c0f..5eac648fd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5158,6 +5158,7 @@ dependencies = [ "tokio-io-timeout", "tokio-postgres", "tokio-stream", + "tokio-tar", "tokio-util", "toml_edit", "tracing", diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index c8b732fee1..a650d5e207 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -46,6 +46,7 @@ tokio = { workspace = true, features = ["fs"] } tokio-util = { workspace = true } tokio-io-timeout.workspace = true tokio-postgres.workspace = true +tokio-tar.workspace = true toml_edit.workspace = true tracing.workspace = true url.workspace = true diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index e9bb5202da..9d65187350 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -23,7 +23,7 @@ pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_FORMAT_VERSION: u32 = 8; // contains persistent metadata for safekeeper -const CONTROL_FILE_NAME: &str = "safekeeper.control"; +pub const CONTROL_FILE_NAME: &str = "safekeeper.control"; // needed to atomically update the state using `rename` const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial"; pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 1e29b21fac..40ac2c105d 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,38 +1,25 @@ use hyper::{Body, Request, Response, StatusCode, Uri}; - use once_cell::sync::Lazy; -use postgres_ffi::WAL_SEGMENT_SIZE; -use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::fmt; +use std::io::Write as _; use std::str::FromStr; use std::sync::Arc; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; -use tokio::fs::File; -use tokio::io::AsyncReadExt; +use tokio::sync::mpsc; +use tokio::task; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; +use tracing::{info_span, Instrument}; use utils::failpoint_support::failpoints_handler; +use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter}; use utils::http::request::parse_query_param; -use std::io::Write as _; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tracing::{info_span, Instrument}; -use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter}; - -use crate::debug_dump::TimelineDigestRequest; -use crate::receive_wal::WalReceiverState; -use crate::safekeeper::Term; -use crate::safekeeper::{ServerInfo, TermLsn}; -use crate::send_wal::WalSenderState; -use crate::timeline::PeerInfo; -use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline}; - -use crate::timelines_global_map::TimelineDeleteForceResult; -use crate::GlobalTimelines; -use crate::SafeKeeperConf; +use postgres_ffi::WAL_SEGMENT_SIZE; +use safekeeper_api::models::TimelineCreateRequest; +use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest}; use utils::{ auth::SwappableJwtAuth, http::{ @@ -46,7 +33,16 @@ use utils::{ lsn::Lsn, }; -use super::models::TimelineCreateRequest; +use crate::debug_dump::TimelineDigestRequest; +use crate::receive_wal::WalReceiverState; +use crate::safekeeper::Term; +use crate::safekeeper::{ServerInfo, TermLsn}; +use crate::send_wal::WalSenderState; +use crate::timeline::PeerInfo; 
+use crate::timelines_global_map::TimelineDeleteForceResult;
+use crate::GlobalTimelines;
+use crate::SafeKeeperConf;
+use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
 
 #[derive(Debug, Serialize)]
 struct SafekeeperStatus {
@@ -206,6 +202,42 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+    check_permission(&request, Some(ttid.tenant_id))?;
+
+    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+    // Note: for evicted timelines it would be better to serve the snapshot
+    // directly rather than de-evict them and then stream; start_snapshot could
+    // probably copy the partial s3 file to the destination path and stream the
+    // control file, or return FullAccessTimeline if the timeline is not evicted.
+    let tli = tli
+        .full_access_guard()
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    // To stream the body use wrap_stream, which wants a Stream of Result<Bytes>,
+    // so create the chan and write to it in another task.
+    let (tx, rx) = mpsc::channel(1);
+
+    task::spawn(pull_timeline::stream_snapshot(tli, tx));
+
+    let rx_stream = ReceiverStream::new(rx);
+    let body = Body::wrap_stream(rx_stream);
+
+    let response = Response::builder()
+        .status(200)
+        .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
+        .body(body)
+        .unwrap();
+
+    Ok(response)
+}
+
 async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, None)?;
 
@@ -260,41 +292,6 @@ async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
-async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
-    let ttid = TenantTimelineId::new(
-        parse_request_param(&request, "tenant_id")?,
-        parse_request_param(&request, "timeline_id")?,
-    );
-    check_permission(&request, Some(ttid.tenant_id))?;
-
-    let filename: String = parse_request_param(&request, "filename")?;
-
-    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
-    let tli = tli
-        .full_access_guard()
-        .await
-        .map_err(ApiError::InternalServerError)?;
-
-    let filepath = tli.get_timeline_dir().join(filename);
-    let mut file = File::open(&filepath)
-        .await
-        .map_err(|e| ApiError::InternalServerError(e.into()))?;
-
-    let mut content = Vec::new();
-    // TODO: don't store files in memory
-    file.read_to_end(&mut content)
-        .await
-        .map_err(|e| ApiError::InternalServerError(e.into()))?;
-
-    Response::builder()
-        .status(StatusCode::OK)
-        .header("Content-Type", "application/octet-stream")
-        .body(Body::from(content))
-        .map_err(|e| ApiError::InternalServerError(e.into()))
-}
-
 /// Force persist control file.
 async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, None)?;
@@ -566,13 +563,13 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
         .delete("/v1/tenant/:tenant_id", |r| {
             request_span(r, tenant_delete_handler)
         })
+        .get(
+            "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot",
+            |r| request_span(r, timeline_snapshot_handler),
+        )
         .post("/v1/pull_timeline", |r| {
             request_span(r, timeline_pull_handler)
         })
-        .get(
-            "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
-            |r| request_span(r, timeline_files_handler),
-        )
         .post(
             "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
            |r| request_span(r, timeline_copy_handler),
diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs
index 7b41c98cb8..4099a324f9 100644
--- a/safekeeper/src/pull_timeline.rs
+++ b/safekeeper/src/pull_timeline.rs
@@ -1,28 +1,223 @@
-use std::sync::Arc;
-
+use anyhow::{anyhow, bail, Context, Result};
+use bytes::Bytes;
 use camino::Utf8PathBuf;
 use camino_tempfile::Utf8TempDir;
 use chrono::{DateTime, Utc};
+use futures::{SinkExt, StreamExt, TryStreamExt};
+use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
 use serde::{Deserialize, Serialize};
+use std::{
+    cmp::min,
+    io::{self, ErrorKind},
+    sync::Arc,
+};
+use tokio::{
+    fs::{File, OpenOptions},
+    io::AsyncWrite,
+    sync::mpsc,
+};
+use tokio_tar::{Archive, Builder};
+use tokio_util::{
+    io::{CopyToBytes, SinkWriter},
+    sync::PollSender,
+};
+use tracing::{error, info, instrument};
 
-use anyhow::{bail, Context, Result};
-use tokio::io::AsyncWriteExt;
-use tracing::info;
+use crate::{
+    control_file::{self, CONTROL_FILE_NAME},
+    debug_dump,
+    http::routes::TimelineStatus,
+    safekeeper::Term,
+    timeline::{get_tenant_dir, get_timeline_dir, FullAccessTimeline, Timeline, TimelineError},
+    wal_storage::{self, open_wal_file, Storage},
+    GlobalTimelines, SafeKeeperConf,
+};
 use utils::{
+    crashsafe::{durable_rename, fsync_async_opt},
     id::{TenantId, TenantTimelineId, TimelineId},
     lsn::Lsn,
     pausable_failpoint,
 };
 
-use crate::{
-    control_file, debug_dump,
-    http::routes::TimelineStatus,
-    timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError},
-    wal_storage::{self, Storage},
-    GlobalTimelines, SafeKeeperConf,
-};
+/// Stream tar archive of timeline to tx.
+#[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))]
+pub async fn stream_snapshot(tli: FullAccessTimeline, tx: mpsc::Sender<Result<Bytes>>) {
+    if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await {
+        // Error type/contents don't matter as they won't reach the client
+        // (hyper likely doesn't do anything with it), but the http stream will
+        // be prematurely terminated. It would be nice to try to send the error
+        // in trailers though.
+        tx.send(Err(anyhow!("snapshot failed"))).await.ok();
+        error!("snapshot failed: {:#}", e);
+    }
+}
+
-/// Info about timeline on safekeeper ready for reporting.
+/// State needed while streaming the snapshot.
+pub struct SnapshotContext {
+    pub from_segno: XLogSegNo, // inclusive
+    pub upto_segno: XLogSegNo, // inclusive
+    pub term: Term,
+    pub last_log_term: Term,
+    pub flush_lsn: Lsn,
+    pub wal_seg_size: usize,
+    // used to remove WAL hold off in Drop.
+    pub tli: FullAccessTimeline,
+}
+
+impl Drop for SnapshotContext {
+    fn drop(&mut self) {
+        // todo: spawn task removing WAL gc hold off
+    }
+}
+
+pub async fn stream_snapshot_guts(
+    tli: FullAccessTimeline,
+    tx: mpsc::Sender<Result<Bytes>>,
+) -> Result<()> {
+    // tokio-tar wants an AsyncWrite implementor, but we have an mpsc tx of
+    // Result<Bytes>; use SinkWriter as the AsyncWrite impl. That is,
+    // - create a Sink from the tx. It returns PollSendError if the chan is closed.
+    let sink = PollSender::new(tx);
+    // - SinkWriter needs the sink error to be an io error, so map it.
+    let sink_io_err = sink.sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
+    // - SinkWriter wants the sink item type to be just Bytes, not Result<Bytes>,
+    //   so map it with with(). Note that with() accepts an async function, which
+    //   we don't need, and allows the map to fail, which we don't need either;
+    //   hence the two Oks.
+    let oksink = sink_io_err.with(|b: Bytes| async { io::Result::Ok(Result::Ok(b)) });
+    // - SinkWriter (not surprisingly) wants a sink of &[u8], not Bytes, so wrap
+    //   it into CopyToBytes. This is a data copy.
+    let copy_to_bytes = CopyToBytes::new(oksink);
+    let mut writer = SinkWriter::new(copy_to_bytes);
+    let pinned_writer = std::pin::pin!(writer);
+
+    // Note that tokio_tar append_* funcs use tokio::io::copy with an 8KB buffer,
+    // which is also likely suboptimal.
+    let mut ar = Builder::new_non_terminated(pinned_writer);
+
+    let bctx = tli.start_snapshot(&mut ar).await?;
+    pausable_failpoint!("sk-snapshot-after-list-pausable");
+
+    let tli_dir = tli.get_timeline_dir();
+    info!(
+        "sending {} segments [{:#X}-{:#X}], term={}, last_log_term={}, flush_lsn={}",
+        bctx.upto_segno - bctx.from_segno + 1,
+        bctx.from_segno,
+        bctx.upto_segno,
+        bctx.term,
+        bctx.last_log_term,
+        bctx.flush_lsn,
+    );
+    for segno in bctx.from_segno..=bctx.upto_segno {
+        let (mut sf, is_partial) = open_wal_file(&tli_dir, segno, bctx.wal_seg_size).await?;
+        let mut wal_file_name = XLogFileName(PG_TLI, segno, bctx.wal_seg_size);
+        if is_partial {
+            wal_file_name.push_str(".partial");
+        }
+        ar.append_file(&wal_file_name, &mut sf).await?;
+    }
+
+    // Do the term check before ar.finish so that the archive ends up corrupted
+    // if the term changed. Clients shouldn't ignore an abrupt stream end anyway,
+    // but this makes sure of it.
+    tli.finish_snapshot(&bctx).await?;
+
+    ar.finish().await?;
+
+    Ok(())
+}
+
+impl FullAccessTimeline {
+    /// Start streaming tar archive with timeline:
+    /// 1) stream control file under lock;
+    /// 2) hold off WAL removal;
+    /// 3) collect SnapshotContext to understand which WAL segments should be
+    ///    streamed.
+    ///
+    /// Snapshot streams data up to flush_lsn. To make this safe, we must check
+    /// that the term doesn't change during the procedure, or we risk sending a
+    /// mix of WAL from different histories. The term is remembered in the
+    /// SnapshotContext and checked in finish_snapshot. Note that in the last
+    /// segment some WAL higher than the flush_lsn set here might be streamed;
+    /// that's fine as long as the term doesn't change.
+    ///
+    /// Alternatively, we could send only up to commit_lsn to get some valid
+    /// state which would later be recovered by compute; in that case the term
+    /// check is not needed, but we likely don't want that, as there might be no
+    /// compute which could perform the recovery.
+    ///
+    /// When the returned SnapshotContext is dropped, the WAL hold is removed.
+    async fn start_snapshot<W: AsyncWrite + Unpin + Send>(
+        &self,
+        ar: &mut tokio_tar::Builder<W>,
+    ) -> Result<SnapshotContext> {
+        let shared_state = self.read_shared_state().await;
+
+        let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME);
+        let mut cf = File::open(cf_path).await?;
+        ar.append_file(CONTROL_FILE_NAME, &mut cf).await?;
+
+        // We need to stream WAL starting from the oldest segment that someone
+        // (s3 or pageserver) still needs. This duplicates the calc_horizon_lsn
+        // logic.
+        let from_lsn = min(
+            shared_state.sk.state.remote_consistent_lsn,
+            shared_state.sk.state.backup_lsn,
+        );
+        if from_lsn == Lsn::INVALID {
+            // this is possible if snapshot is called before handling the first
+            // elected message
+            bail!("snapshot is called on uninitialized timeline");
+        }
+        let from_segno = from_lsn.segment_number(shared_state.get_wal_seg_size());
+        let term = shared_state.sk.get_term();
+        let last_log_term = shared_state.sk.get_last_log_term();
+        let flush_lsn = shared_state.sk.flush_lsn();
+        let upto_segno = flush_lsn.segment_number(shared_state.get_wal_seg_size());
+        // have some limit on the max number of segments as a sanity check
+        const MAX_ALLOWED_SEGS: u64 = 1000;
+        let num_segs = upto_segno - from_segno + 1;
+        if num_segs > MAX_ALLOWED_SEGS {
+            bail!(
+                "snapshot is called on timeline with {} segments, but the limit is {}",
+                num_segs,
+                MAX_ALLOWED_SEGS
+            );
+        }
+
+        // TODO: set WAL hold off.
+
+        let bctx = SnapshotContext {
+            from_segno,
+            upto_segno,
+            term,
+            last_log_term,
+            flush_lsn,
+            wal_seg_size: shared_state.get_wal_seg_size(),
+            tli: self.clone(),
+        };
+
+        Ok(bctx)
+    }
+
+    /// Finish snapshotting: check that the term (and the last log term) hasn't
+    /// changed.
+    ///
+    /// Note that the WAL gc hold off is removed in SnapshotContext's Drop so
+    /// that it isn't forgotten if snapshotting fails midway.
+    pub async fn finish_snapshot(&self, bctx: &SnapshotContext) -> Result<()> {
+        let shared_state = self.read_shared_state().await;
+        let term = shared_state.sk.get_term();
+        let last_log_term = shared_state.sk.get_last_log_term();
+        // This check could be relaxed in some cases (e.g. last_log_term may
+        // change, as long as the older history is strictly a prefix of the new
+        // one), but there is no need to do so.
+        if bctx.term != term || bctx.last_log_term != last_log_term {
+            bail!("term(s) changed during snapshot: were term={}, last_log_term={}, now term={}, last_log_term={}",
+                bctx.term, bctx.last_log_term, term, last_log_term);
+        }
+        Ok(())
+    }
+}
+
+/// pull_timeline request body.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Request {
     pub tenant_id: TenantId,
@@ -72,13 +267,15 @@ pub async fn handle_request(request: Request) -> Result<Response> {
     let mut statuses = Vec::new();
     for (i, response) in responses.into_iter().enumerate() {
-        let response = response.context(format!("Failed to get status from {}", http_hosts[i]))?;
+        let response = response.context(format!("fetching status from {}", http_hosts[i]))?;
+        response
+            .error_for_status_ref()
+            .context(format!("checking status from {}", http_hosts[i]))?;
         let status: crate::http::routes::TimelineStatus = response.json().await?;
         statuses.push((status, i));
     }
 
     // Find the most advanced safekeeper
-    // TODO: current logic may be wrong, fix it later
     let (status, i) = statuses
         .into_iter()
         .max_by_key(|(status, _)| {
@@ -111,95 +308,59 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
     let conf = &GlobalTimelines::get_global_config();
 
-    let client = reqwest::Client::new();
-    // TODO: don't use debug dump, it should be used only in tests.
- // This is a proof of concept, we should figure out a way - // to use scp without implementing it manually. + let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; - // Implementing our own scp over HTTP. - // At first, we need to fetch list of files from safekeeper. - let dump: DebugDumpResponse = client + let client = reqwest::Client::new(); + + // Request stream with basebackup archive. + let bb_resp = client .get(format!( - "{}/v1/debug_dump?dump_all=true&tenant_id={}&timeline_id={}", + "{}/v1/tenant/{}/timeline/{}/snapshot", host, status.tenant_id, status.timeline_id )) .send() - .await? - .json() .await?; + bb_resp.error_for_status_ref()?; - if dump.timelines.len() != 1 { - bail!( - "expected to fetch single timeline, got {} timelines", - dump.timelines.len() - ); - } + // Make Stream of Bytes from it... + let bb_stream = bb_resp.bytes_stream().map_err(std::io::Error::other); + // and turn it into StreamReader implementing AsyncRead. + let bb_reader = tokio_util::io::StreamReader::new(bb_stream); - let timeline = dump.timelines.into_iter().next().unwrap(); - let disk_content = timeline.disk_content.ok_or(anyhow::anyhow!( - "timeline {} doesn't have disk content", - ttid - ))?; - - let mut filenames = disk_content - .files - .iter() - .map(|file| file.name.clone()) - .collect::>(); - - // Sort filenames to make sure we pull files in correct order - // After sorting, we should have: - // - 000000010000000000000001 - // - ... - // - 000000010000000000000002.partial - // - safekeeper.control - filenames.sort(); - - // safekeeper.control should be the first file, so we need to move it to the beginning - let control_file_index = filenames - .iter() - .position(|name| name == "safekeeper.control") - .ok_or(anyhow::anyhow!("safekeeper.control not found"))?; - filenames.remove(control_file_index); - filenames.insert(0, "safekeeper.control".to_string()); - - pausable_failpoint!("sk-pull-timeline-after-list-pausable"); - - info!( - "downloading {} files from safekeeper {}", - filenames.len(), - host - ); - - let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?; - - // Note: some time happens between fetching list of files and fetching files themselves. - // It's possible that some files will be removed from safekeeper and we will fail to fetch them. - // This function will fail in this case, should be retried by the caller. - for filename in filenames { - let file_path = tli_dir_path.join(&filename); - // /v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename - let http_url = format!( - "{}/v1/tenant/{}/timeline/{}/file/{}", - host, status.tenant_id, status.timeline_id, filename - ); - - let mut file = tokio::fs::File::create(&file_path).await?; - let mut response = client.get(&http_url).send().await?; - if response.status() != reqwest::StatusCode::OK { - bail!( - "pulling file {} failed: status is {}", - filename, - response.status() - ); - } - while let Some(chunk) = response.chunk().await? { - file.write_all(&chunk).await?; - file.flush().await?; + // Extract it on the fly to the disk. We don't use simple unpack() to fsync + // files. 
+ let mut entries = Archive::new(bb_reader).entries()?; + while let Some(base_tar_entry) = entries.next().await { + let mut entry = base_tar_entry?; + let header = entry.header(); + let file_path = header.path()?.into_owned(); + match header.entry_type() { + tokio_tar::EntryType::Regular => { + let utf8_file_path = + Utf8PathBuf::from_path_buf(file_path).expect("non-Unicode path"); + let dst_path = tli_dir_path.join(utf8_file_path); + let mut f = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&dst_path) + .await?; + tokio::io::copy(&mut entry, &mut f).await?; + // fsync the file + f.sync_all().await?; + } + _ => { + bail!( + "entry {} in backup tar archive is of unexpected type: {:?}", + file_path.display(), + header.entry_type() + ); + } } } - // TODO: fsync? + // fsync temp timeline directory to remember its contents. + fsync_async_opt(&tli_dir_path, !conf.no_sync).await?; // Let's create timeline from temp directory and verify that it's correct let (commit_lsn, flush_lsn) = validate_temp_timeline(conf, ttid, &tli_dir_path).await?; @@ -290,7 +451,9 @@ pub async fn load_temp_timeline( ttid, tmp_path, timeline_path ); tokio::fs::create_dir_all(get_tenant_dir(conf, &ttid.tenant_id)).await?; - tokio::fs::rename(tmp_path, &timeline_path).await?; + // fsync tenant dir creation + fsync_async_opt(&conf.workdir, !conf.no_sync).await?; + durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?; let tli = GlobalTimelines::load_timeline(&guard, ttid) .await diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 563dbbe315..ae230960ae 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -780,6 +780,9 @@ where // Initializing backup_lsn is useful to avoid making backup think it should upload 0 segment. 
     state.backup_lsn = max(state.backup_lsn, state.timeline_start_lsn);
+    // similar for remote_consistent_lsn
+    state.remote_consistent_lsn =
+        max(state.remote_consistent_lsn, state.timeline_start_lsn);
 
     state.acceptor_state.term_history = msg.term_history.clone();
     self.state.finish_change(&state).await?;
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 148a7e90bd..e510a05a32 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -4,7 +4,7 @@ use anyhow::{anyhow, bail, Result};
 use camino::Utf8PathBuf;
 use serde::{Deserialize, Serialize};
-use tokio::fs;
+use tokio::fs::{self};
 use tokio_util::sync::CancellationToken;
 use utils::id::TenantId;
@@ -225,7 +225,7 @@ impl SharedState {
         })
     }
 
-    fn get_wal_seg_size(&self) -> usize {
+    pub(crate) fn get_wal_seg_size(&self) -> usize {
         self.sk.state.server.wal_seg_size as usize
     }
 
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 45e27e1951..0c1731937c 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -684,13 +684,12 @@ impl WalReader {
         let xlogoff = self.pos.segment_offset(self.wal_seg_size);
         let segno = self.pos.segment_number(self.wal_seg_size);
         let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
-        let wal_file_path = self.timeline_dir.join(&wal_file_name);
 
         // Try to open local file, if we may have WAL locally
         if self.pos >= self.local_start_lsn {
-            let res = Self::open_wal_file(&wal_file_path).await;
+            let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
             match res {
-                Ok(mut file) => {
+                Ok((mut file, _)) => {
                     file.seek(SeekFrom::Start(xlogoff as u64)).await?;
                     return Ok(Box::pin(file));
                 }
@@ -718,25 +717,6 @@ impl WalReader {
 
         bail!("WAL segment is not found")
     }
-
-    /// Helper function for opening a wal file.
-    async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
-        // First try to open the .partial file.
-        let mut partial_path = wal_file_path.to_owned();
-        partial_path.set_extension("partial");
-        if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
-            return Ok(opened_file);
-        }
-
-        // If that failed, try it without the .partial extension.
-        tokio::fs::File::open(&wal_file_path)
-            .await
-            .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
-            .map_err(|e| {
-                warn!("{}", e);
-                e
-            })
-    }
 }
 
 /// Zero block for filling created WAL segments.
@@ -758,6 +738,32 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
     Ok(())
 }
 
+/// Helper function for opening WAL segment `segno` in `timeline_dir`. Returns
+/// the file and whether it is a .partial one.
+pub(crate) async fn open_wal_file(
+    timeline_dir: &Utf8Path,
+    segno: XLogSegNo,
+    wal_seg_size: usize,
+) -> Result<(tokio::fs::File, bool)> {
+    let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size)?;
+
+    // First try to open the .partial file.
+    if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
+        return Ok((opened_file, true));
+    }
+
+    // If that failed, try it without the .partial extension.
+    let pf = tokio::fs::File::open(&wal_file_path)
+        .await
+        .with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
+        .map_err(|e| {
+            warn!("{}", e);
+            e
+        })?;
+
+    Ok((pf, false))
+}
+
 /// Helper returning full path to WAL segment file and its .partial brother.
pub fn wal_file_paths( timeline_dir: &Utf8Path, diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index dce30f5388..11aeb8f182 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -317,9 +317,9 @@ def test_broker(neon_env_builder: NeonEnvBuilder): time.sleep(1) # Ensure that safekeepers don't lose remote_consistent_lsn on restart. - # Control file is persisted each 5s. TODO: do that on shutdown and remove sleep. - time.sleep(6) for sk in env.safekeepers: + # force persist cfile + sk.http_client().checkpoint(tenant_id, timeline_id) sk.stop() sk.start() stat_after_restart = [cli.timeline_status(tenant_id, timeline_id) for cli in clients] @@ -1749,11 +1749,11 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 4 env = neon_env_builder.init_start() tenant_id = env.initial_tenant - timeline_id = env.neon_cli.create_branch("test_pull_timeline") + timeline_id = env.initial_timeline log.info("Use only first 3 safekeepers") env.safekeepers[3].stop() - endpoint = env.endpoints.create("test_pull_timeline") + endpoint = env.endpoints.create("main") endpoint.active_safekeepers = [1, 2, 3] endpoint.start() @@ -1787,7 +1787,7 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder): show_statuses(env.safekeepers, tenant_id, timeline_id) log.info("Restarting compute with new config to verify that it works") - endpoint.stop_and_destroy().create("test_pull_timeline") + endpoint.stop_and_destroy().create("main") endpoint.active_safekeepers = [1, 3, 4] endpoint.start() @@ -1836,14 +1836,14 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id) log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}") - dst_http = dst_sk.http_client() + src_http = src_sk.http_client() # run pull_timeline which will halt before downloading files - dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause")) + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) pt_handle.start() - dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable") + src_sk.wait_until_paused("sk-snapshot-after-list-pausable") # ensure segment exists endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'") @@ -1854,7 +1854,7 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001" log.info(f"first segment exist={os.path.exists(first_segment_p)}") - dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off")) + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off")) pt_handle.join() timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id) @@ -1883,7 +1883,6 @@ def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder): # enough, so it won't be affected by term change anymore. # # Expected to fail while term check is not implemented. 
-@pytest.mark.xfail def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): neon_env_builder.num_safekeepers = 3 neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) @@ -1900,14 +1899,14 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): ep.safe_psql("create table t(key int, value text)") ep.safe_psql("insert into t select generate_series(1, 1000), 'pear'") - dst_http = dst_sk.http_client() + src_http = src_sk.http_client() # run pull_timeline which will halt before downloading files - dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause")) + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "pause")) pt_handle = PropagatingThread( target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id) ) pt_handle.start() - dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable") + src_sk.wait_until_paused("sk-snapshot-after-list-pausable") src_http = src_sk.http_client() term_before = src_http.timeline_status(tenant_id, timeline_id).term @@ -1922,7 +1921,7 @@ def test_pull_timeline_term_change(neon_env_builder: NeonEnvBuilder): term_after = src_http.timeline_status(tenant_id, timeline_id).term assert term_after > term_before, f"term_after={term_after}, term_before={term_before}" - dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off")) + src_http.configure_failpoints(("sk-snapshot-after-list-pausable", "off")) with pytest.raises(requests.exceptions.HTTPError): pt_handle.join()
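
For reference, a minimal self-contained sketch of the PollSender -> with() -> CopyToBytes -> SinkWriter chain that stream_snapshot_guts above uses to turn an mpsc::Sender<Result<Bytes>> into an AsyncWrite. This is not part of the patch: it assumes the bytes, futures, anyhow, tokio and tokio-util ("io-util", "sync" features) crates, and the main()/println scaffolding is illustrative only; only the adapter chain mirrors the code in pull_timeline.rs.

use bytes::Bytes;
use futures::SinkExt;
use std::io::{self, ErrorKind};
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio_util::io::{CopyToBytes, SinkWriter};
use tokio_util::sync::PollSender;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Channel of Result<Bytes>, like the one the snapshot handler passes to stream_snapshot.
    let (tx, mut rx) = mpsc::channel::<anyhow::Result<Bytes>>(1);

    let writer_task = tokio::spawn(async move {
        // PollSender turns the mpsc Sender into a Sink; a send on a closed
        // channel surfaces as a BrokenPipe io::Error.
        let sink = PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe));
        // The channel item is Result<Bytes> but SinkWriter wants plain Bytes,
        // hence the double Ok wrapping in with().
        let sink = sink.with(|b: Bytes| async { io::Result::Ok(anyhow::Ok(b)) });
        // CopyToBytes adapts Sink<Bytes> to Sink<&[u8]> (one data copy), and
        // SinkWriter exposes that sink as an AsyncWrite.
        let writer = SinkWriter::new(CopyToBytes::new(sink));
        let mut writer = std::pin::pin!(writer); // the adapter chain is !Unpin
        writer.write_all(b"hello, snapshot stream").await?;
        writer.shutdown().await?;
        io::Result::Ok(())
    });

    // The receiving end sees whatever was written as Bytes messages, which is
    // what Body::wrap_stream consumes in the snapshot handler.
    while let Some(chunk) = rx.recv().await {
        println!("received {} bytes", chunk.expect("writer only sends Ok").len());
    }
    writer_task.await??;
    Ok(())
}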