mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 18:40:38 +00:00
layer file download: final rename: fix durability (#6991)
Before this PR, the layer file download code would fsync the inode after rename instead of the timeline directory. That is not in line with what a comment further up says we're doing, and it's obviously not achieving the goal of making the rename durable. part of https://github.com/neondatabase/neon/issues/6663
This commit is contained in:
committed by
GitHub
parent
752bf5a22f
commit
f3e4f85e65
@@ -14,14 +14,14 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
use utils::{backoff, crashsafe};
|
||||
use utils::backoff;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::tenant::Generation;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
@@ -50,9 +50,8 @@ pub async fn download_layer_file<'a>(
|
||||
) -> Result<u64, DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let local_path = conf
|
||||
.timeline_path(&tenant_shard_id, &timeline_id)
|
||||
.join(layer_file_name.file_name());
|
||||
let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
let local_path = timeline_path.join(layer_file_name.file_name());
|
||||
|
||||
let remote_path = remote_layer_path(
|
||||
&tenant_shard_id.tenant_id,
|
||||
@@ -149,10 +148,21 @@ pub async fn download_layer_file<'a>(
|
||||
.with_context(|| format!("rename download layer file to {local_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
crashsafe::fsync_async(&local_path)
|
||||
.await
|
||||
.with_context(|| format!("fsync layer file {local_path}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
// We use fatal_err() below because the after the rename above,
|
||||
// the in-memory state of the filesystem already has the layer file in its final place,
|
||||
// and subsequent pageserver code could think it's durable while it really isn't.
|
||||
let work = async move {
|
||||
let timeline_dir = VirtualFile::open(&timeline_path)
|
||||
.await
|
||||
.fatal_err("VirtualFile::open for timeline dir fsync");
|
||||
timeline_dir
|
||||
.sync_all()
|
||||
.await
|
||||
.fatal_err("VirtualFile::sync_all timeline dir");
|
||||
};
|
||||
crate::virtual_file::io_engine::get()
|
||||
.spawn_blocking_and_block_on_if_std(work)
|
||||
.await;
|
||||
|
||||
tracing::debug!("download complete: {local_path}");
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
//! Then use [`get`] and [`super::OpenOptions`].
|
||||
|
||||
use tokio_epoll_uring::{IoBuf, Slice};
|
||||
use tracing::Instrument;
|
||||
|
||||
pub(crate) use super::api::IoEngineKind;
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -225,4 +226,29 @@ impl IoEngine {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
|
||||
/// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
|
||||
/// whereas before the switch to [`super::io_engine`], that wasn't the case.
|
||||
/// This method helps avoid such a regression.
|
||||
///
|
||||
/// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
|
||||
pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
|
||||
where
|
||||
Fut: 'static + Send + std::future::Future<Output = R>,
|
||||
R: 'static + Send,
|
||||
{
|
||||
match self {
|
||||
IoEngine::NotSet => panic!("not initialized"),
|
||||
IoEngine::StdFs => {
|
||||
let span = tracing::info_span!("spawn_blocking_block_on_if_std");
|
||||
tokio::task::spawn_blocking({
|
||||
move || tokio::runtime::Handle::current().block_on(work.instrument(span))
|
||||
})
|
||||
.await
|
||||
.expect("failed to join blocking code most likely it panicked, panicking as well")
|
||||
}
|
||||
IoEngine::TokioEpollUring => work.await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user