mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 13:32:57 +00:00
safekeeper: use set_len() to zero out segments (#9665)
## Problem When we create a new segment, we zero it out in order to avoid changing the length and fsyncing metadata on every write. However, we zeroed it out by writing 8 KB zero-pages, and Tokio file writes have non-trivial overhead. ## Summary of changes Zero out the segment using [`File::set_len()`](https://docs.rs/tokio/latest/i686-unknown-linux-gnu/tokio/fs/struct.File.html#method.set_len) instead. This will typically (depending on the filesystem) just write a sparse file and omit the 16 MB of data entirely. This improves WAL append throughput for large messages by over 400% with fsync disabled, and 100% with fsync enabled.
This commit is contained in:
@@ -31,7 +31,6 @@ use crate::state::TimelinePersistentState;
|
||||
use crate::wal_backup::{read_object, remote_timeline_path};
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::XLOG_BLCKSZ;
|
||||
use pq_proto::SystemId;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
@@ -223,6 +222,15 @@ impl PhysicalStorage {
|
||||
)
|
||||
}
|
||||
|
||||
/// Call fsync if config requires so.
|
||||
async fn fsync_file(&mut self, file: &File) -> Result<()> {
|
||||
if !self.no_sync {
|
||||
self.metrics
|
||||
.observe_flush_seconds(time_io_closure(file.sync_all()).await?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Call fdatasync if config requires so.
|
||||
async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
|
||||
if !self.no_sync {
|
||||
@@ -256,11 +264,15 @@ impl PhysicalStorage {
|
||||
// half initialized segment, first bake it under tmp filename and
|
||||
// then rename.
|
||||
let tmp_path = self.timeline_dir.join("waltmp");
|
||||
let mut file = File::create(&tmp_path)
|
||||
let file = File::create(&tmp_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
|
||||
|
||||
write_zeroes(&mut file, self.wal_seg_size).await?;
|
||||
fail::fail_point!("sk-zero-segment", |_| {
|
||||
info!("sk-zero-segment failpoint hit");
|
||||
Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
|
||||
});
|
||||
file.set_len(self.wal_seg_size as u64).await?;
|
||||
|
||||
// Note: this doesn't get into observe_flush_seconds metric. But
|
||||
// segment init should be separate metric, if any.
|
||||
@@ -486,12 +498,12 @@ impl Storage for PhysicalStorage {
|
||||
// Remove all segments after the given LSN.
|
||||
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
|
||||
|
||||
let (mut file, is_partial) = self.open_or_create(segno).await?;
|
||||
let (file, is_partial) = self.open_or_create(segno).await?;
|
||||
|
||||
// Fill end with zeroes
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
|
||||
self.fdatasync_file(&file).await?;
|
||||
file.set_len(xlogoff as u64).await?;
|
||||
file.set_len(self.wal_seg_size as u64).await?;
|
||||
self.fsync_file(&file).await?;
|
||||
|
||||
if !is_partial {
|
||||
// Make segment partial once again
|
||||
@@ -751,25 +763,6 @@ impl WalReader {
|
||||
}
|
||||
}
|
||||
|
||||
/// Zero block for filling created WAL segments.
|
||||
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
|
||||
|
||||
/// Helper for filling file with zeroes.
|
||||
async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
|
||||
fail::fail_point!("sk-write-zeroes", |_| {
|
||||
info!("write_zeroes hit failpoint");
|
||||
Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
|
||||
});
|
||||
|
||||
while count >= XLOG_BLCKSZ {
|
||||
file.write_all(ZERO_BLOCK).await?;
|
||||
count -= XLOG_BLCKSZ;
|
||||
}
|
||||
file.write_all(&ZERO_BLOCK[0..count]).await?;
|
||||
file.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Helper function for opening WAL segment `segno` in `dir`. Returns file and
|
||||
/// whether it is .partial.
|
||||
pub(crate) async fn open_wal_file(
|
||||
|
||||
@@ -602,7 +602,7 @@ async def run_segment_init_failure(env: NeonEnv):
|
||||
|
||||
sk = env.safekeepers[0]
|
||||
sk_http = sk.http_client()
|
||||
sk_http.configure_failpoints([("sk-write-zeroes", "return")])
|
||||
sk_http.configure_failpoints([("sk-zero-segment", "return")])
|
||||
conn = await ep.connect_async()
|
||||
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
|
||||
# next insertion should hang until failpoint is disabled.
|
||||
|
||||
Reference in New Issue
Block a user