Merge branch 'main' into erik/durable-rename-fsync-metrics

This commit is contained in:
Erik Grinaker
2024-11-07 16:20:38 +01:00
committed by GitHub
2 changed files with 20 additions and 27 deletions

View File

@@ -32,7 +32,6 @@ use crate::state::TimelinePersistentState;
use crate::wal_backup::{read_object, remote_timeline_path};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
use pq_proto::SystemId;
use utils::{id::TenantTimelineId, lsn::Lsn};
@@ -224,6 +223,15 @@ impl PhysicalStorage {
)
}
/// Call fsync if config requires so.
async fn fsync_file(&mut self, file: &File) -> Result<()> {
if !self.no_sync {
self.metrics
.observe_flush_seconds(time_io_closure(file.sync_all()).await?);
}
Ok(())
}
/// Call fdatasync if config requires so.
async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
if !self.no_sync {
@@ -257,11 +265,15 @@ impl PhysicalStorage {
// half initialized segment, first bake it under tmp filename and
// then rename.
let tmp_path = self.timeline_dir.join("waltmp");
let mut file = File::create(&tmp_path)
let file = File::create(&tmp_path)
.await
.with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
write_zeroes(&mut file, self.wal_seg_size).await?;
fail::fail_point!("sk-zero-segment", |_| {
info!("sk-zero-segment failpoint hit");
Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
});
file.set_len(self.wal_seg_size as u64).await?;
match durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
Ok(fsync_latencies) => {
@@ -494,12 +506,12 @@ impl Storage for PhysicalStorage {
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
let (mut file, is_partial) = self.open_or_create(segno).await?;
let (file, is_partial) = self.open_or_create(segno).await?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&file).await?;
file.set_len(xlogoff as u64).await?;
file.set_len(self.wal_seg_size as u64).await?;
self.fsync_file(&file).await?;
if !is_partial {
// Make segment partial once again
@@ -759,25 +771,6 @@ impl WalReader {
}
}
/// Zero block for filling created WAL segments.
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/// Helper for filling file with zeroes.
async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
fail::fail_point!("sk-write-zeroes", |_| {
info!("write_zeroes hit failpoint");
Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
});
while count >= XLOG_BLCKSZ {
file.write_all(ZERO_BLOCK).await?;
count -= XLOG_BLCKSZ;
}
file.write_all(&ZERO_BLOCK[0..count]).await?;
file.flush().await?;
Ok(())
}
/// Helper function for opening WAL segment `segno` in `dir`. Returns file and
/// whether it is .partial.
pub(crate) async fn open_wal_file(

View File

@@ -602,7 +602,7 @@ async def run_segment_init_failure(env: NeonEnv):
sk = env.safekeepers[0]
sk_http = sk.http_client()
sk_http.configure_failpoints([("sk-write-zeroes", "return")])
sk_http.configure_failpoints([("sk-zero-segment", "return")])
conn = await ep.connect_async()
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
# next insertion should hang until failpoint is disabled.