Use fsync+rename for atomic downloads from remote storage

Use failpoint in test_remote_storage to check the behavior
This commit is contained in:
Dmitry Rodionov
2022-04-25 16:56:19 +03:00
committed by Dmitry Rodionov
parent 76388abeb6
commit 05f8e6a050
10 changed files with 274 additions and 76 deletions
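
Before the per-file diffs, a minimal sketch of the durable-download sequence the commit adopts: write to a temporary file, fsync it, rename it over the destination, then fsync the destination and its parent directory. The helper name (`atomic_download`) and error handling are illustrative only, not the actual pageserver code.

use std::path::{Path, PathBuf};
use tokio::{fs, io::AsyncWriteExt};

// Sketch only: persist already-fetched `bytes` into `target` without ever
// exposing a partially written file, mirroring durable_rename from file_utils.c.
async fn atomic_download(target: &Path, bytes: &[u8]) -> anyhow::Result<()> {
    // The commit appends ".temp_download" via path_with_suffix_extension so the
    // original extension is preserved; emulate that by pushing onto the OsString.
    let mut tmp = target.as_os_str().to_owned();
    tmp.push(".temp_download");
    let tmp = PathBuf::from(tmp);

    // write(tmp) + fsync(tmp); sync_all (not sync_data) also persists the file size
    let mut file = fs::File::create(&tmp).await?;
    file.write_all(bytes).await?;
    file.flush().await?;
    file.sync_all().await?;
    drop(file);

    // rename(tmp, new) + fsync(new) + fsync(parent)
    fs::rename(&tmp, target).await?;
    fs::File::open(target).await?.sync_all().await?;
    if let Some(parent) = target.parent() {
        fs::File::open(parent).await?.sync_all().await?;
    }
    Ok(())
}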

View File

@@ -4,8 +4,12 @@ version = "0.1.0"
edition = "2021"
[features]
default = []
# It is simpler infra-wise to have failpoints enabled by default
# It shouldn't affect perf in any way because failpoints
# are not placed in hot code paths
default = ["failpoints"]
profiling = ["pprof"]
failpoints = ["fail/failpoints"]
[dependencies]
chrono = "0.4.19"

View File

@@ -27,7 +27,12 @@ use utils::{
};
fn version() -> String {
format!("{} profiling:{}", GIT_VERSION, cfg!(feature = "profiling"))
format!(
"{} profiling:{} failpoints:{}",
GIT_VERSION,
cfg!(feature = "profiling"),
fail::has_failpoints()
)
}
fn main() -> anyhow::Result<()> {

View File

@@ -179,43 +179,47 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let include_non_incremental_logical_size = get_include_non_incremental_logical_size(&request);
let span = info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id);
let (local_timeline_info, remote_timeline_info) = async {
// any error here will render local timeline as None
// XXX .in_current_span does not attach messages in spawn_blocking future to current future's span
let local_timeline_info = tokio::task::spawn_blocking(move || {
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let local_timeline = {
repo.get_timeline(timeline_id)
.as_ref()
.map(|timeline| {
LocalTimelineInfo::from_repo_timeline(
tenant_id,
timeline_id,
timeline,
include_non_incremental_logical_size,
)
})
.transpose()?
};
Ok::<_, anyhow::Error>(local_timeline)
})
.await
.ok()
.and_then(|r| r.ok())
.flatten();
let (local_timeline_info, span) = tokio::task::spawn_blocking(move || {
let entered = span.entered();
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
let local_timeline = {
repo.get_timeline(timeline_id)
.as_ref()
.map(|timeline| {
LocalTimelineInfo::from_repo_timeline(
tenant_id,
timeline_id,
timeline,
include_non_incremental_logical_size,
)
let remote_timeline_info = {
let remote_index_read = get_state(&request).remote_index.read().await;
remote_index_read
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_entry| RemoteTimelineInfo {
remote_consistent_lsn: remote_entry.metadata.disk_consistent_lsn(),
awaits_download: remote_entry.awaits_download,
})
.transpose()?
};
Ok::<_, anyhow::Error>((local_timeline, entered.exit()))
})
.await
.map_err(ApiError::from_err)??;
let remote_timeline_info = {
let remote_index_read = get_state(&request).remote_index.read().await;
remote_index_read
.timeline_entry(&ZTenantTimelineId {
tenant_id,
timeline_id,
})
.map(|remote_entry| RemoteTimelineInfo {
remote_consistent_lsn: remote_entry.metadata.disk_consistent_lsn(),
awaits_download: remote_entry.awaits_download,
})
};
let _enter = span.entered();
(local_timeline_info, remote_timeline_info)
}
.instrument(info_span!("timeline_detail_handler", tenant = %tenant_id, timeline = %timeline_id))
.await;
if local_timeline_info.is_none() && remote_timeline_info.is_none() {
return Err(ApiError::NotFound(

View File

@@ -721,7 +721,7 @@ impl LayeredRepository {
}
/// Save timeline metadata to file
fn save_metadata(
pub fn save_metadata(
conf: &'static PageServerConf,
timelineid: ZTimelineId,
tenantid: ZTenantId,

View File

@@ -667,7 +667,10 @@ impl postgres_backend::Handler for PageServerHandler {
// on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("failpoints ") {
ensure!(fail::has_failpoints(), "Cannot manage failpoints because pageserver was compiled without failpoints support");
let (_, failpoints) = query_string.split_at("failpoints ".len());
for failpoint in failpoints.split(';') {
if let Some((name, actions)) = failpoint.split_once('=') {
info!("cfg failpoint: {} {}", name, actions);

View File

@@ -101,6 +101,7 @@ use anyhow::{bail, Context};
use tokio::io;
use tracing::{debug, error, info};
use self::storage_sync::TEMP_DOWNLOAD_EXTENSION;
pub use self::{
local_fs::LocalFs,
s3_bucket::S3Bucket,
@@ -304,12 +305,29 @@ fn collect_timeline_files(
} else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) {
debug!("skipping ephemeral file {}", entry_path.display());
continue;
} else if entry_path.extension().and_then(ffi::OsStr::to_str)
== Some(TEMP_DOWNLOAD_EXTENSION)
{
info!("removing temp download file at {}", entry_path.display());
fs::remove_file(&entry_path).with_context(|| {
format!(
"failed to remove temp download file at {}",
entry_path.display()
)
})?;
} else {
timeline_files.insert(entry_path);
}
}
}
// FIXME (rodionov) if the attach call succeeded, and then the pageserver is restarted before the download is completed,
// then the attach is lost. There would be no retries for that:
// the initial collect will fail because there is no metadata.
// We either need to start the download if we see an empty dir after restart, or the attach caller should
// be aware of that and retry attach if awaits_download for the timeline switched from true to false
// but the timeline didn't appear locally.
// Check what happens with remote index in that case.
let timeline_metadata_path = match timeline_metadata_path {
Some(path) => path,
None => bail!("No metadata file found in the timeline directory"),

View File

@@ -17,6 +17,8 @@ use tokio::{
};
use tracing::*;
use crate::remote_storage::storage_sync::path_with_suffix_extension;
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
pub struct LocalFs {
@@ -114,7 +116,7 @@ impl RemoteStorage for LocalFs {
// We need this dance with a sort-of durable rename (without fsyncs)
// to prevent partial uploads. This was actually hit when a pageserver shutdown
// cancelled the upload and a partial file was left on the fs
let temp_file_path = path_with_suffix_extension(&target_file_path, ".temp");
let temp_file_path = path_with_suffix_extension(&target_file_path, "temp");
let mut destination = io::BufWriter::new(
fs::OpenOptions::new()
.write(true)
@@ -299,15 +301,8 @@ impl RemoteStorage for LocalFs {
}
}
fn path_with_suffix_extension(original_path: &Path, suffix: &str) -> PathBuf {
let mut extension_with_suffix = original_path.extension().unwrap_or_default().to_os_string();
extension_with_suffix.push(suffix);
original_path.with_extension(extension_with_suffix)
}
fn storage_metadata_path(original_path: &Path) -> PathBuf {
path_with_suffix_extension(original_path, ".metadata")
path_with_suffix_extension(original_path, "metadata")
}
fn get_all_files<'a, P>(

View File

@@ -62,7 +62,9 @@ pub mod index;
mod upload;
use std::{
borrow::Cow,
collections::{HashMap, HashSet, VecDeque},
ffi::OsStr,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::ControlFlow,
@@ -89,7 +91,10 @@ use self::{
use super::{LocalTimelineInitStatus, LocalTimelineInitStatuses, RemoteStorage, SyncStartupData};
use crate::{
config::PageServerConf,
layered_repository::metadata::{metadata_path, TimelineMetadata},
layered_repository::{
metadata::{metadata_path, TimelineMetadata},
LayeredRepository,
},
repository::TimelineSyncStatusUpdate,
tenant_mgr::apply_timeline_sync_status_updates,
thread_mgr,
@@ -103,6 +108,7 @@ use metrics::{
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
pub use self::download::download_index_part;
pub use self::download::TEMP_DOWNLOAD_EXTENSION;
lazy_static! {
static ref REMAINING_SYNC_ITEMS: IntGauge = register_int_gauge!(
@@ -782,8 +788,14 @@ where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
{
match download_timeline_layers(storage, current_remote_timeline, sync_id, new_download_data)
.await
match download_timeline_layers(
conf,
storage,
current_remote_timeline,
sync_id,
new_download_data,
)
.await
{
DownloadedTimeline::Abort => {
register_sync_status(sync_start, task_name, None);
@@ -852,18 +864,28 @@ async fn update_local_metadata(
if local_lsn < Some(remote_lsn) {
info!("Updating local timeline metadata from remote timeline: local disk_consistent_lsn={local_lsn:?}, remote disk_consistent_lsn={remote_lsn}");
let remote_metadata_bytes = remote_metadata
.to_bytes()
.context("Failed to serialize remote metadata to bytes")?;
fs::write(&local_metadata_path, &remote_metadata_bytes)
.await
.with_context(|| {
format!(
"Failed to write remote metadata bytes locally to path '{}'",
local_metadata_path.display()
)
})?;
// clone because spawn_blocking requires static lifetime
let cloned_metadata = remote_metadata.to_owned();
let ZTenantTimelineId {
tenant_id,
timeline_id,
} = sync_id;
tokio::task::spawn_blocking(move || {
LayeredRepository::save_metadata(conf, timeline_id, tenant_id, &cloned_metadata, true)
})
.await
.with_context(|| {
format!(
"failed to join save_metadata task for {}",
local_metadata_path.display()
)
})?
.with_context(|| {
format!(
"Failed to write remote metadata bytes locally to path '{}'",
local_metadata_path.display()
)
})?;
} else {
info!("Local metadata at path '{}' has later disk consistent Lsn ({local_lsn:?}) than the remote one ({remote_lsn}), skipping the update", local_metadata_path.display());
}
@@ -1062,7 +1084,7 @@ where
debug!("Successfully fetched index part for {id}");
index_parts.insert(id, index_part);
}
Err(e) => warn!("Failed to fetch index part for {id}: {e:?}"),
Err(e) => warn!("Failed to fetch index part for {id}: {e}"),
}
}
@@ -1192,6 +1214,20 @@ fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Optio
.observe(secs_elapsed)
}
pub fn path_with_suffix_extension(original_path: impl AsRef<Path>, suffix: &str) -> PathBuf {
let new_extension = match original_path
.as_ref()
.extension()
.map(OsStr::to_string_lossy)
{
Some(extension) => Cow::Owned(format!("{extension}.{suffix}")),
None => Cow::Borrowed(suffix),
};
original_path
.as_ref()
.with_extension(new_extension.as_ref())
}
#[cfg(test)]
mod test_utils {
use utils::lsn::Lsn;
@@ -1600,4 +1636,28 @@ mod tests {
"Merged upload tasks should have a metadata with biggest disk_consistent_lsn"
);
}
#[test]
fn test_path_with_suffix_extension() {
let p = PathBuf::from("/foo/bar");
assert_eq!(
&path_with_suffix_extension(&p, "temp").to_string_lossy(),
"/foo/bar.temp"
);
let p = PathBuf::from("/foo/bar");
assert_eq!(
&path_with_suffix_extension(&p, "temp.temp").to_string_lossy(),
"/foo/bar.temp.temp"
);
let p = PathBuf::from("/foo/bar.baz");
assert_eq!(
&path_with_suffix_extension(&p, "temp.temp").to_string_lossy(),
"/foo/bar.baz.temp.temp"
);
let p = PathBuf::from("/foo/bar.baz");
assert_eq!(
&path_with_suffix_extension(&p, ".temp").to_string_lossy(),
"/foo/bar.baz..temp"
);
}
}

View File

@@ -1,17 +1,20 @@
//! Timeline synchronization logic to fetch the layer files from remote storage into pageserver's local directory.
use std::fmt::Debug;
use std::{collections::HashSet, fmt::Debug, path::Path};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::fs;
use tokio::{
fs,
io::{self, AsyncWriteExt},
};
use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf,
layered_repository::metadata::metadata_path,
remote_storage::{
storage_sync::{sync_queue, SyncTask},
storage_sync::{path_with_suffix_extension, sync_queue, SyncTask},
RemoteStorage,
},
};
@@ -22,6 +25,8 @@ use super::{
SyncData, TimelineDownload,
};
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
/// Retrieves index data from the remote storage for a given timeline.
pub async fn download_index_part<P, S>(
conf: &'static PageServerConf,
@@ -46,7 +51,7 @@ where
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!("Failed to download an index part from storage path '{part_storage_path:?}'")
format!("Failed to download an index part from storage path {part_storage_path:?}")
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
@@ -80,6 +85,7 @@ pub(super) enum DownloadedTimeline {
///
/// On an error, bumps the retries count and updates the files to skip with successful downloads, rescheduling the task.
pub(super) async fn download_timeline_layers<'a, P, S>(
conf: &'static PageServerConf,
storage: &'a S,
remote_timeline: Option<&'a RemoteTimeline>,
sync_id: ZTenantTimelineId,
@@ -132,12 +138,24 @@ where
)
})?;
let mut destination_file = fs::File::create(&layer_desination_path)
.await
.with_context(|| {
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
// write(tmp)
// fsync(tmp)
// rename(tmp, new)
// fsync(new)
// fsync(parent)
// For more context about durable_rename check this email from postgres mailing list:
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path =
path_with_suffix_extension(&layer_desination_path, TEMP_DOWNLOAD_EXTENSION);
let mut destination_file =
fs::File::create(&temp_file_path).await.with_context(|| {
format!(
"Failed to create a destination file for layer '{}'",
layer_desination_path.display()
temp_file_path.display()
)
})?;
@@ -149,15 +167,55 @@ where
"Failed to download a layer from storage path '{layer_storage_path:?}'"
)
})?;
// The tokio docs (https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html) state that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
// that have not yet completed. To ensure that a file is closed immediately when it is dropped,
// you should call flush before dropping it.
//
// From the tokio code I see that it waits for pending operations to complete. There shouldn't be any because
// we assume that the `destination_file` is fully written, i.e. there are no pending .write(...).await operations.
// But for additional safety let's check/wait for any pending operations.
destination_file.flush().await.with_context(|| {
format!(
"failed to flush source file at {}",
temp_file_path.display()
)
})?;
// not using sync_data because it can lose file size update
destination_file.sync_all().await.with_context(|| {
format!(
"failed to fsync source file at {}",
temp_file_path.display()
)
})?;
drop(destination_file);
fail::fail_point!("remote-storage-download-pre-rename", |_| {
anyhow::bail!("remote-storage-download-pre-rename failpoint triggered")
});
fs::rename(&temp_file_path, &layer_desination_path).await?;
fsync_path(&layer_desination_path).await.with_context(|| {
format!(
"Cannot fsync layer destination path {}",
layer_desination_path.display(),
)
})?;
}
Ok::<_, anyhow::Error>(layer_desination_path)
})
.collect::<FuturesUnordered<_>>();
let mut errors_happened = false;
// keep files we've downloaded to remove them from layers_to_skip if directory fsync fails
let mut undo = HashSet::new();
while let Some(download_result) = download_tasks.next().await {
match download_result {
Ok(downloaded_path) => {
undo.insert(downloaded_path.clone());
download.layers_to_skip.insert(downloaded_path);
}
Err(e) => {
@@ -167,6 +225,24 @@ where
}
}
// fsync timeline directory which is a parent directory for downloaded files
let ZTenantTimelineId {
tenant_id,
timeline_id,
} = &sync_id;
let timeline_dir = conf.timeline_path(timeline_id, tenant_id);
if let Err(e) = fsync_path(&timeline_dir).await {
error!(
"Cannot fsync parent directory {} error {}",
timeline_dir.display(),
e
);
for item in undo {
download.layers_to_skip.remove(&item);
}
errors_happened = true;
}
if errors_happened {
debug!("Reenqueuing failed download task for timeline {sync_id}");
download_data.retries += 1;
@@ -178,6 +254,10 @@ where
}
}
async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
fs::File::open(path).await?.sync_all().await
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashSet};
@@ -236,6 +316,7 @@ mod tests {
);
let download_data = match download_timeline_layers(
harness.conf,
&storage,
Some(&remote_timeline),
sync_id,
@@ -297,6 +378,7 @@ mod tests {
let storage = LocalFs::new(tempdir()?.path().to_owned(), &harness.conf.workdir)?;
let empty_remote_timeline_download = download_timeline_layers(
harness.conf,
&storage,
None,
sync_id,
@@ -319,6 +401,7 @@ mod tests {
"Should not expect download for the timeline"
);
let already_downloading_remote_timeline_download = download_timeline_layers(
harness.conf,
&storage,
Some(&not_expecting_download_remote_timeline),
sync_id,

View File

@@ -4,10 +4,11 @@
import shutil, os
from contextlib import closing
from pathlib import Path
import time
from uuid import UUID
from fixtures.zenith_fixtures import ZenithEnvBuilder, assert_local, wait_for, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex
from fixtures.utils import lsn_from_hex, lsn_to_hex
import pytest
@@ -23,14 +24,14 @@ import pytest
#
# 2. Second pageserver
# * starts another pageserver, connected to the same remote storage
# * same timeline id is queried for status, triggering timeline's download
# * timeline_attach is called for the same timeline id
# * timeline status is polled until it's downloaded
# * queries the specific data, ensuring that it matches the one stored before
#
# The tests are done for all types of remote storage pageserver supports.
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder, storage_type: str):
zenith_env_builder.rust_log_override = 'debug'
# zenith_env_builder.rust_log_override = 'debug'
zenith_env_builder.num_safekeepers = 1
if storage_type == 'local_fs':
zenith_env_builder.enable_local_fs_remote_storage()
@@ -67,9 +68,7 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
wait_for_last_record_lsn(client, UUID(tenant_id), UUID(timeline_id), current_lsn)
# run checkpoint manually to be sure that data landed in remote storage
with closing(env.pageserver.connect()) as psconn:
with psconn.cursor() as pscur:
pscur.execute(f"checkpoint {tenant_id} {timeline_id}")
env.pageserver.safe_psql(f"checkpoint {tenant_id} {timeline_id}")
log.info(f'waiting for checkpoint {checkpoint_number} upload')
# wait until pageserver successfully uploaded a checkpoint to remote storage
@@ -87,6 +86,27 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
##### Second start, restore the data and ensure it's the same
env.pageserver.start()
# Introduce failpoint in download
env.pageserver.safe_psql(f"failpoints remote-storage-download-pre-rename=return")
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
# is there a better way to assert that the failpoint triggered?
time.sleep(10)
# assert cannot attach timeline that is scheduled for download
with pytest.raises(Exception, match="Timeline download is already in progress"):
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
log.info("Timeline detail with active failpoint: %s", detail)
assert detail['local'] is None
assert detail['remote']['awaits_download']
# trigger temporary download files removal
env.pageserver.stop()
env.pageserver.start()
client.timeline_attach(UUID(tenant_id), UUID(timeline_id))
log.info("waiting for timeline redownload")
@@ -94,6 +114,12 @@ def test_remote_storage_backup_and_restore(zenith_env_builder: ZenithEnvBuilder,
interval=1,
func=lambda: assert_local(client, UUID(tenant_id), UUID(timeline_id)))
detail = client.timeline_detail(UUID(tenant_id), UUID(timeline_id))
assert detail['local'] is not None
log.info("Timeline detail after attach completed: %s", detail)
assert lsn_from_hex(detail['local']['last_record_lsn']) == current_lsn
assert not detail['remote']['awaits_download']
pg = env.postgres.create_start('main')
with closing(pg.connect()) as conn:
with conn.cursor() as cur: