safekeeper: remove .partial suffix on the last WAL file.

Reasons:
- it makes pg_waldump usage slightly more cumbersome, forcing the file to be renamed first.
- it makes pull_timeline slightly more cumbersome because at any
  moment the source file can be renamed from partial to full.

Leave the ability to read .partial files for backward compatibility.
This commit is contained in:
Arseny Sher
2024-05-24 22:25:56 +03:00
parent b2d34a82b9
commit 0a43b05ea6
7 changed files with 55 additions and 52 deletions

View File

@@ -17,6 +17,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001

View File

@@ -16,6 +16,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001

View File

@@ -469,16 +469,33 @@ async fn backup_object(
.await
}
/// Source file should point to path with segment without .partial suffix; we'll
/// try to append .partial if file without it doesn't exist.
pub(crate) async fn backup_partial_segment(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
let storage = get_configured_remote_storage();
let storage: &GenericRemoteStorage = get_configured_remote_storage();
let mut partial_path = source_file.to_owned();
partial_path.set_extension("partial");
let file = File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
// First try opening without the .partial suffix; if that fails, open the legacy .partial one.
let file = match File::open(&source_file).await {
Ok(file) => file,
Err(full_e) => match File::open(&partial_path).await {
Ok(file) => file,
Err(partial_e) => {
anyhow::bail!(
"failed to open file for partial backup, {} error: '{}', {} error: '{}'",
source_file,
full_e,
partial_path,
partial_e
);
}
},
};
// limiting the file to read only the first `size` bytes
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);

View File

@@ -117,10 +117,6 @@ impl PartialBackup {
self.conf.my_id.0,
)
}
fn local_segment_name(&self, segno: u64) -> String {
format!("{}.partial", self.segment_name(segno))
}
}
impl PartialBackup {
@@ -152,7 +148,7 @@ impl PartialBackup {
// We're going to backup bytes from the start of the segment up to flush_lsn.
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
let local_path = self.local_prefix.join(self.local_segment_name(segno));
let local_path = self.local_prefix.join(self.segment_name(segno));
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
// Upload first `backup_bytes` bytes of the segment to the remote storage.

View File

@@ -3,9 +3,9 @@
//! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
//! PG timeline is always 1, so WAL segments usually have names like this:
//! - 000000010000000000000001
//! - 000000010000000000000002.partial
//! - 000000010000000000000002
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
//! In the past last file had `.partial` suffix, so code still can read it.
use anyhow::{bail, Context, Result};
use bytes::Bytes;
@@ -102,11 +102,13 @@ pub struct PhysicalStorage {
/// Cached open file for the last segment.
///
/// If Some(file) is open, then it always:
/// - has ".partial" suffix
/// If Some(file, is_partial) is open, then it always:
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
///
/// If the file name has the .partial suffix (i.e. it was created before the
/// suffix was removed), the bool is true.
file: Option<(File, bool)>,
/// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
@@ -243,29 +245,26 @@ impl PhysicalStorage {
// Note: this doesn't get into observe_flush_seconds metric. But
// segment init should be separate metric, if any.
if let Err(e) =
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
{
if let Err(e) = durable_rename(&tmp_path, &wal_file_path, !self.conf.no_sync).await {
// Probably rename succeeded, but fsync of it failed. Remove
// the file then to avoid using it.
remove_file(wal_file_partial_path)
remove_file(wal_file_path)
.await
.or_else(utils::fs_ext::ignore_not_found)?;
return Err(e.into());
}
Ok((file, true))
Ok((file, false))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
let (mut file, is_partial) = if let Some((file, is_partial)) = self.file.take() {
(file, is_partial)
} else {
let (mut file, is_partial) = self.open_or_create(segno).await?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
file
(file, is_partial)
};
file.write_all(buf).await?;
@@ -278,13 +277,15 @@ impl PhysicalStorage {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
// Rename partial file to completed file in case it was legacy .partial file.
if is_partial {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
}
} else {
// otherwise, file can be reused later
self.file = Some(file);
self.file = Some((file, is_partial));
}
Ok(())
@@ -298,7 +299,7 @@ impl PhysicalStorage {
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(file) = self.file.take() {
if let Some((file, _)) = self.file.take() {
self.fdatasync_file(&file).await?;
}
@@ -402,9 +403,9 @@ impl Storage for PhysicalStorage {
return Ok(());
}
if let Some(unflushed_file) = self.file.take() {
if let Some((unflushed_file, is_partial)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
self.file = Some(unflushed_file);
self.file = Some((unflushed_file, is_partial));
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
@@ -445,7 +446,7 @@ impl Storage for PhysicalStorage {
}
// Close previously opened file, if any
if let Some(unflushed_file) = self.file.take() {
if let Some((unflushed_file, _)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?;
}
@@ -455,20 +456,13 @@ impl Storage for PhysicalStorage {
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
let (mut file, is_partial) = self.open_or_create(segno).await?;
let (mut file, _) = self.open_or_create(segno).await?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&file).await?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path).await?;
}
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;

View File

@@ -1,5 +1,4 @@
import os
import shutil
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.utils import subprocess_capture
@@ -48,14 +47,13 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
endpoint.stop()
assert endpoint.pgdata_dir
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
seg_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
# check segment on compute
check_wal_segment(pg_waldump_path, wal_path, test_output_dir)
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix.
# Check file on safekeepers as well.
sk = env.safekeepers[0]
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001")
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path)
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)
seg_path = os.path.join(sk_tli_dir, "000000010000000000000001")
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)

View File

@@ -590,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
tli_dir = sk.data_dir / str(tenant_id) / str(timeline_id)
f_partial = sk.list_segments(tenant_id, timeline_id)[-1]
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir) / f_partial.name
f_partial_saved = sk.data_dir / f_partial
f_partial_path.rename(f_partial_saved)
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version