From 05f8e6a050fb7af35950e69b30a23be2cc40e78a Mon Sep 17 00:00:00 2001
From: Dmitry Rodionov
Date: Mon, 25 Apr 2022 16:56:19 +0300
Subject: [PATCH] Use fsync+rename for atomic downloads from remote storage

Use failpoint in test_remote_storage to check the behavior
---
 pageserver/Cargo.toml                         |  6 +-
 pageserver/src/bin/pageserver.rs              |  7 +-
 pageserver/src/http/routes.rs                 | 72 +++++++-------
 pageserver/src/layered_repository.rs          |  2 +-
 pageserver/src/page_service.rs                |  3 +
 pageserver/src/remote_storage.rs              | 18 ++++
 pageserver/src/remote_storage/local_fs.rs     | 13 +--
 pageserver/src/remote_storage/storage_sync.rs | 92 ++++++++++++++---
 .../remote_storage/storage_sync/download.rs   | 99 +++++++++++++++++--
 .../batch_others/test_remote_storage.py       | 38 +++++--
 10 files changed, 274 insertions(+), 76 deletions(-)

diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 5607baf698..23c16dd5be 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -4,8 +4,12 @@ version = "0.1.0"
 edition = "2021"
 
 [features]
-default = []
+# It is simpler infra-wise to have failpoints enabled by default.
+# It shouldn't affect performance in any way because failpoints
+# are not placed in hot code paths.
+default = ["failpoints"]
 profiling = ["pprof"]
+failpoints = ["fail/failpoints"]
 
 [dependencies]
 chrono = "0.4.19"

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 728dcb53de..01fcc1224f 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -27,7 +27,12 @@ use utils::{
 };
 
 fn version() -> String {
-    format!("{} profiling:{}", GIT_VERSION, cfg!(feature = "profiling"))
+    format!(
+        "{} profiling:{} failpoints:{}",
+        GIT_VERSION,
+        cfg!(feature = "profiling"),
+        fail::has_failpoints()
+    )
 }
 
 fn main() -> anyhow::Result<()> {

diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 311ae5adf4..c589813d69 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -179,43 +179,47 @@ async fn timeline_detail_handler(request: Request) -> Result(local_timeline)
+        })
+        .await
+        .ok()
+        .and_then(|r| r.ok())
+        .flatten();
 
-    let (local_timeline_info, span) = tokio::task::spawn_blocking(move || {
-        let entered = span.entered();
-        let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
-        let local_timeline = {
-            repo.get_timeline(timeline_id)
-                .as_ref()
-                .map(|timeline| {
-                    LocalTimelineInfo::from_repo_timeline(
-                        tenant_id,
-                        timeline_id,
-                        timeline,
-                        include_non_incremental_logical_size,
-                    )
+    let remote_timeline_info = {
+        let remote_index_read = get_state(&request).remote_index.read().await;
+        remote_index_read
+            .timeline_entry(&ZTenantTimelineId {
+                tenant_id,
+                timeline_id,
+            })
+            .map(|remote_entry| RemoteTimelineInfo {
+                remote_consistent_lsn: remote_entry.metadata.disk_consistent_lsn(),
+                awaits_download: remote_entry.awaits_download,
+            })
-                .transpose()?
-        };
-        Ok::<_, anyhow::Error>((local_timeline, entered.exit()))
-    })
-    .await
-    .map_err(ApiError::from_err)??;
-
-    let remote_timeline_info = {
-        let remote_index_read = get_state(&request).remote_index.read().await;
-        remote_index_read
-            .timeline_entry(&ZTenantTimelineId {
-                tenant_id,
-                timeline_id,
-            })
-            .map(|remote_entry| RemoteTimelineInfo {
-                remote_consistent_lsn: remote_entry.metadata.disk_consistent_lsn(),
-                awaits_download: remote_entry.awaits_download,
-            })
-    };
-
-    let _enter = span.entered();
+        (local_timeline_info, remote_timeline_info)
+    }
+    .instrument(info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id))
+    .await;
 
     if local_timeline_info.is_none() && remote_timeline_info.is_none() {
         return Err(ApiError::NotFound(

diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 116fbf03a2..bbeb245f0a 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -721,7 +721,7 @@ impl LayeredRepository {
     }
 
     /// Save timeline metadata to file
-    fn save_metadata(
+    pub fn save_metadata(
         conf: &'static PageServerConf,
         timelineid: ZTimelineId,
         tenantid: ZTenantId,

diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 8adbdc5d9d..ec08a840b0 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -667,7 +667,10 @@ impl postgres_backend::Handler for PageServerHandler {
             // on connect
             pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
         } else if query_string.starts_with("failpoints ") {
+            ensure!(fail::has_failpoints(), "Cannot manage failpoints because pageserver was compiled without failpoints support");
+
             let (_, failpoints) = query_string.split_at("failpoints ".len());
+
             for failpoint in failpoints.split(';') {
                 if let Some((name, actions)) = failpoint.split_once('=') {
                     info!("cfg failpoint: {} {}", name, actions);

diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index 39595b7167..cfa09dce14 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -101,6 +101,7 @@ use anyhow::{bail, Context};
 use tokio::io;
 use tracing::{debug, error, info};
 
+use self::storage_sync::TEMP_DOWNLOAD_EXTENSION;
 pub use self::{
     local_fs::LocalFs,
     s3_bucket::S3Bucket,
@@ -304,12 +305,29 @@ fn collect_timeline_files(
             } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) {
                 debug!("skipping ephemeral file {}", entry_path.display());
                 continue;
+            } else if entry_path.extension().and_then(ffi::OsStr::to_str)
+                == Some(TEMP_DOWNLOAD_EXTENSION)
+            {
+                info!("removing temp download file at {}", entry_path.display());
+                fs::remove_file(&entry_path).with_context(|| {
+                    format!(
+                        "failed to remove temp download file at {}",
+                        entry_path.display()
+                    )
+                })?;
             } else {
                 timeline_files.insert(entry_path);
             }
         }
    }
 
+    // FIXME (rodionov) if the attach call succeeded, and then the pageserver is restarted before the download is completed,
+    // then the attach is lost. There would be no retries for that,
+    // and the initial collect will fail because there is no metadata.
+    // We either need to start the download if we see an empty dir after restart, or the attach caller should
+    // be aware of that and retry attach if awaits_download for the timeline switched from true to false
+    // but the timeline didn't appear locally.
+    // Check what happens with the remote index in that case.
     let timeline_metadata_path = match timeline_metadata_path {
         Some(path) => path,
         None => bail!("No metadata file found in the timeline directory"),

diff --git a/pageserver/src/remote_storage/local_fs.rs b/pageserver/src/remote_storage/local_fs.rs
index 952b2e69fe..6772a4fbd6 100644
--- a/pageserver/src/remote_storage/local_fs.rs
+++ b/pageserver/src/remote_storage/local_fs.rs
@@ -17,6 +17,8 @@ use tokio::{
 };
 use tracing::*;
 
+use crate::remote_storage::storage_sync::path_with_suffix_extension;
+
 use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
 
 pub struct LocalFs {
@@ -114,7 +116,7 @@ impl RemoteStorage for LocalFs {
         // We need this dance with sort of durable rename (without fsyncs)
         // to prevent partial uploads. This was really hit when pageserver shutdown
         // cancelled the upload and partial file was left on the fs
-        let temp_file_path = path_with_suffix_extension(&target_file_path, ".temp");
+        let temp_file_path = path_with_suffix_extension(&target_file_path, "temp");
         let mut destination = io::BufWriter::new(
             fs::OpenOptions::new()
                 .write(true)
@@ -299,15 +301,8 @@ impl RemoteStorage for LocalFs {
     }
 }
 
-fn path_with_suffix_extension(original_path: &Path, suffix: &str) -> PathBuf {
-    let mut extension_with_suffix = original_path.extension().unwrap_or_default().to_os_string();
-    extension_with_suffix.push(suffix);
-
-    original_path.with_extension(extension_with_suffix)
-}
-
 fn storage_metadata_path(original_path: &Path) -> PathBuf {
-    path_with_suffix_extension(original_path, ".metadata")
+    path_with_suffix_extension(original_path, "metadata")
 }
 
 fn get_all_files<'a, P>(

diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs
index 20012f32d7..2d3416cd32 100644
--- a/pageserver/src/remote_storage/storage_sync.rs
+++ b/pageserver/src/remote_storage/storage_sync.rs
@@ -62,7 +62,9 @@ pub mod index;
 mod upload;
 
 use std::{
+    borrow::Cow,
     collections::{HashMap, HashSet, VecDeque},
+    ffi::OsStr,
     fmt::Debug,
     num::{NonZeroU32, NonZeroUsize},
     ops::ControlFlow,
@@ -89,7 +91,10 @@ use self::{
 use super::{LocalTimelineInitStatus, LocalTimelineInitStatuses, RemoteStorage, SyncStartupData};
 use crate::{
     config::PageServerConf,
-    layered_repository::metadata::{metadata_path, TimelineMetadata},
+    layered_repository::{
+        metadata::{metadata_path, TimelineMetadata},
+        LayeredRepository,
+    },
     repository::TimelineSyncStatusUpdate,
     tenant_mgr::apply_timeline_sync_status_updates,
     thread_mgr,
@@ -103,6 +108,7 @@ use metrics::{
 use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
 
 pub use self::download::download_index_part;
+pub use self::download::TEMP_DOWNLOAD_EXTENSION;
 
 lazy_static! {
     static ref REMAINING_SYNC_ITEMS: IntGauge = register_int_gauge!(
@@ -782,8 +788,14 @@ where
     P: Debug + Send + Sync + 'static,
     S: RemoteStorage + Send + Sync + 'static,
 {
-    match download_timeline_layers(storage, current_remote_timeline, sync_id, new_download_data)
-        .await
+    match download_timeline_layers(
+        conf,
+        storage,
+        current_remote_timeline,
+        sync_id,
+        new_download_data,
+    )
+    .await
     {
         DownloadedTimeline::Abort => {
             register_sync_status(sync_start, task_name, None);
@@ -852,18 +864,28 @@ async fn update_local_metadata(
     if local_lsn < Some(remote_lsn) {
         info!("Updating local timeline metadata from remote timeline: local disk_consistent_lsn={local_lsn:?}, remote disk_consistent_lsn={remote_lsn}");
-
-        let remote_metadata_bytes = remote_metadata
-            .to_bytes()
-            .context("Failed to serialize remote metadata to bytes")?;
-        fs::write(&local_metadata_path, &remote_metadata_bytes)
-            .await
-            .with_context(|| {
-                format!(
-                    "Failed to write remote metadata bytes locally to path '{}'",
-                    local_metadata_path.display()
-                )
-            })?;
+        // clone because spawn_blocking requires static lifetime
+        let cloned_metadata = remote_metadata.to_owned();
+        let ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        } = sync_id;
+        tokio::task::spawn_blocking(move || {
+            LayeredRepository::save_metadata(conf, timeline_id, tenant_id, &cloned_metadata, true)
+        })
+        .await
+        .with_context(|| {
+            format!(
+                "failed to join save_metadata task for {}",
+                local_metadata_path.display()
+            )
+        })?
+        .with_context(|| {
+            format!(
+                "Failed to write remote metadata bytes locally to path '{}'",
+                local_metadata_path.display()
+            )
+        })?;
     } else {
         info!("Local metadata at path '{}' has later disk consistent Lsn ({local_lsn:?}) than the remote one ({remote_lsn}), skipping the update", local_metadata_path.display());
     }
@@ -1062,7 +1084,7 @@ where
                 debug!("Successfully fetched index part for {id}");
                 index_parts.insert(id, index_part);
             }
-            Err(e) => warn!("Failed to fetch index part for {id}: {e:?}"),
+            Err(e) => warn!("Failed to fetch index part for {id}: {e}"),
         }
     }
 
@@ -1192,6 +1214,20 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option<bool>) {
         .observe(secs_elapsed)
 }
 
+pub fn path_with_suffix_extension(original_path: impl AsRef<Path>, suffix: &str) -> PathBuf {
+    let new_extension = match original_path
+        .as_ref()
+        .extension()
+        .map(OsStr::to_string_lossy)
+    {
+        Some(extension) => Cow::Owned(format!("{extension}.{suffix}")),
+        None => Cow::Borrowed(suffix),
+    };
+    original_path
+        .as_ref()
+        .with_extension(new_extension.as_ref())
+}
+
 #[cfg(test)]
 mod test_utils {
     use utils::lsn::Lsn;
@@ -1600,4 +1636,28 @@ mod tests {
             "Merged upload tasks should have a metadata with biggest disk_consistent_lsn"
         );
     }
+
+    #[test]
+    fn test_path_with_suffix_extension() {
+        let p = PathBuf::from("/foo/bar");
+        assert_eq!(
+            &path_with_suffix_extension(&p, "temp").to_string_lossy(),
+            "/foo/bar.temp"
+        );
+        let p = PathBuf::from("/foo/bar");
+        assert_eq!(
+            &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(),
+            "/foo/bar.temp.temp"
+        );
+        let p = PathBuf::from("/foo/bar.baz");
+        assert_eq!(
+            &path_with_suffix_extension(&p, "temp.temp").to_string_lossy(),
+            "/foo/bar.baz.temp.temp"
+        );
+        let p = PathBuf::from("/foo/bar.baz");
+        assert_eq!(
+            &path_with_suffix_extension(&p, ".temp").to_string_lossy(),
+            "/foo/bar.baz..temp"
+        );
+    }
 }

diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs
index c7a2b1fd22..7e2496b796 100644
--- a/pageserver/src/remote_storage/storage_sync/download.rs
+++ b/pageserver/src/remote_storage/storage_sync/download.rs
@@ -1,17 +1,20 @@
 //! Timeline synchrnonization logic to fetch the layer files from remote storage into pageserver's local directory.
 
-use std::fmt::Debug;
+use std::{collections::HashSet, fmt::Debug, path::Path};
 
 use anyhow::Context;
 use futures::stream::{FuturesUnordered, StreamExt};
-use tokio::fs;
+use tokio::{
+    fs,
+    io::{self, AsyncWriteExt},
+};
 use tracing::{debug, error, info, warn};
 
 use crate::{
     config::PageServerConf,
     layered_repository::metadata::metadata_path,
     remote_storage::{
-        storage_sync::{sync_queue, SyncTask},
+        storage_sync::{path_with_suffix_extension, sync_queue, SyncTask},
         RemoteStorage,
     },
 };
@@ -22,6 +25,8 @@ use super::{
     SyncData, TimelineDownload,
 };
 
+pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
+
 /// Retrieves index data from the remote storage for a given timeline.
 pub async fn download_index_part(
     conf: &'static PageServerConf,
@@ -46,7 +51,7 @@ where
         .download(&part_storage_path, &mut index_part_bytes)
         .await
         .with_context(|| {
-            format!("Failed to download an index part from storage path '{part_storage_path:?}'")
+            format!("Failed to download an index part from storage path {part_storage_path:?}")
         })?;
 
     let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
@@ -80,6 +85,7 @@ pub(super) enum DownloadedTimeline {
 ///
 /// On an error, bumps the retries count and updates the files to skip with successful downloads, rescheduling the task.
 pub(super) async fn download_timeline_layers<'a, P, S>(
+    conf: &'static PageServerConf,
     storage: &'a S,
     remote_timeline: Option<&'a RemoteTimeline>,
     sync_id: ZTenantTimelineId,
@@ -132,12 +138,24 @@ where
                 )
             })?;
 
-            let mut destination_file = fs::File::create(&layer_desination_path)
-                .await
-                .with_context(|| {
+            // Perform a rename inspired by durable_rename from file_utils.c.
+            // The sequence:
+            //     write(tmp)
+            //     fsync(tmp)
+            //     rename(tmp, new)
+            //     fsync(new)
+            //     fsync(parent)
+            // For more context about durable_rename check this email from the postgres mailing list:
+            // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
+            // If the pageserver crashes, the temp file will be deleted on startup and re-downloaded.
+            let temp_file_path =
+                path_with_suffix_extension(&layer_desination_path, TEMP_DOWNLOAD_EXTENSION);
+
+            let mut destination_file =
+                fs::File::create(&temp_file_path).await.with_context(|| {
                     format!(
                         "Failed to create a destination file for layer '{}'",
-                        layer_desination_path.display()
+                        temp_file_path.display()
                     )
                 })?;
 
@@ -149,15 +167,55 @@ where
                     "Failed to download a layer from storage path '{layer_storage_path:?}'"
                 )
             })?;
+
+            // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
+            // A file will not be closed immediately when it goes out of scope if there are any IO operations
+            // that have not yet completed. To ensure that a file is closed immediately when it is dropped,
+            // you should call flush before dropping it.
+            //
+            // From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
+            // we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
+            // But for additional safety let's check/wait for any pending operations.
+            destination_file.flush().await.with_context(|| {
+                format!(
+                    "failed to flush the temp file at {}",
+                    temp_file_path.display()
+                )
+            })?;
+
+            // not using sync_data because it can lose the file size update
+            destination_file.sync_all().await.with_context(|| {
+                format!(
+                    "failed to fsync the temp file at {}",
+                    temp_file_path.display()
+                )
+            })?;
+            drop(destination_file);
+
+            fail::fail_point!("remote-storage-download-pre-rename", |_| {
+                anyhow::bail!("remote-storage-download-pre-rename failpoint triggered")
+            });
+
+            fs::rename(&temp_file_path, &layer_desination_path).await?;
+
+            fsync_path(&layer_desination_path).await.with_context(|| {
+                format!(
+                    "Cannot fsync layer destination path {}",
+                    layer_desination_path.display(),
+                )
+            })?;
             }
 
             Ok::<_, anyhow::Error>(layer_desination_path)
         })
         .collect::<FuturesUnordered<_>>();
 
     let mut errors_happened = false;
+    // keep files we've downloaded to remove them from layers_to_skip if the directory fsync fails
+    let mut undo = HashSet::new();
     while let Some(download_result) = download_tasks.next().await {
         match download_result {
             Ok(downloaded_path) => {
+                undo.insert(downloaded_path.clone());
                 download.layers_to_skip.insert(downloaded_path);
             }
             Err(e) => {
@@ -167,6 +225,24 @@ where
         }
     }
 
+    // fsync the timeline directory, which is the parent directory of the downloaded files
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = &sync_id;
+    let timeline_dir = conf.timeline_path(timeline_id, tenant_id);
+    if let Err(e) = fsync_path(&timeline_dir).await {
+        error!(
+            "Cannot fsync parent directory {} error {}",
+            timeline_dir.display(),
+            e
+        );
+        for item in undo {
+            download.layers_to_skip.remove(&item);
+        }
+        errors_happened = true;
+    }
+
     if errors_happened {
         debug!("Reenqueuing failed download task for timeline {sync_id}");
         download_data.retries += 1;
@@ -178,6 +254,10 @@
     }
 }
 
+async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
+    fs::File::open(path).await?.sync_all().await
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::{BTreeSet, HashSet};
@@ -236,6 +316,7 @@ mod tests {
         );
 
         let download_data = match download_timeline_layers(
+            harness.conf,
            &storage,
            Some(&remote_timeline),
            sync_id,
@@ -297,6 +378,7 @@
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &harness.conf.workdir)?;
 
         let empty_remote_timeline_download = download_timeline_layers(
+            harness.conf,
            &storage,
            None,
            sync_id,
@@ -319,6 +401,7 @@
            "Should not expect download for the timeline"
         );
         let already_downloading_remote_timeline_download = download_timeline_layers(
+            harness.conf,
            &storage,
            Some(&not_expecting_download_remote_timeline),
            sync_id,

diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py
index f2d654423a..59a9cfa378 100644
--- a/test_runner/batch_others/test_remote_storage.py
+++ b/test_runner/batch_others/test_remote_storage.py
@@ -4,10 +4,11 @@
 import shutil, os
 from contextlib import closing
 from pathlib import Path
+import time
 from uuid import UUID
 from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload
 from fixtures.log_helper import log
-from fixtures.utils import lsn_from_hex
+from fixtures.utils import lsn_from_hex, lsn_to_hex
 
 import pytest
 
@@ -23,14 +24,14 @@ import pytest
 #
 # 2. Second pageserver
 # * starts another pageserver, connected to the same remote storage
-# * same timeline id is queried for status, triggering timeline's download
+# * timeline_attach is called for the same timeline id
 # * timeline status is polled until it's downloaded
 # * queries the specific data, ensuring that it matches the one stored before
 #
 # The tests are done for all types of remote storage pageserver supports.
 @pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
 def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
                                            storage_type: str):
-    zenith_env_builder.rust_log_override = 'debug'
+    # zenith_env_builder.rust_log_override = 'debug'
     zenith_env_builder.num_safekeepers = 1
     if storage_type == 'local_fs':
         zenith_env_builder.enable_local_fs_remote_storage()
@@ -67,9 +68,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
     wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
 
     # run checkpoint manually to be sure that data landed in remote storage
-    with closing(env.pageserver.connect()) as psconn:
-        with psconn.cursor() as pscur:
-            pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
+    env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
 
     log.info(f'waiting for checkpoint {checkpoint_number} upload')
     # wait until pageserver successfully uploaded a checkpoint to remote storage
@@ -87,6 +86,27 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
     ##### Second start, restore the data and ensure it's the same
     env.pageserver.start()
 
+    # Introduce a failpoint in download
+    env.pageserver.safe_psql("failpoints remote-storage-download-pre-rename=return")
+
+    client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
+
+    # is there a better way to assert that the failpoint triggered?
+    time.sleep(10)
+
+    # assert cannot attach timeline that is scheduled for download
+    with pytest.raises(Exception, match="Timeline download is already in progress"):
+        client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
+
+    detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
+    log.info("Timeline detail with active failpoint: %s", detail)
+    assert detail['local'] is None
+    assert detail['remote']['awaits_download']
+
+    # trigger temporary download files removal
+    env.pageserver.stop()
+    env.pageserver.start()
+
     client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
 
     log.info("waiting for timeline redownload")
@@ -94,6 +114,12 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
              interval=1,
              func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))
 
+    detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
+    assert detail['local'] is not None
+    log.info("Timeline detail after attach completed: %s", detail)
+    assert lsn_from_hex(detail['local']['last_record_lsn']) == current_lsn
+    assert not detail['remote']['awaits_download']
+
     pg = env.postgres.create_start('main')
     with closing(pg.connect()) as conn:
         with conn.cursor() as cur: