Compare commits

...

1 Commits

Author SHA1 Message Date
Arseny Sher
0a43b05ea6 safekeeper: remove .partial suffix on the last WAL file.
Reasons:
- it makes pg_waldump usage slightly more cumbersome, forcing the user to rename the file.
- it makes pull_timeline slightly more cumbersome because at any
  moment the source file can be renamed from partial to full.

Keep the ability to read .partial files for backward compatibility.
2024-05-25 13:39:55 +03:00
7 changed files with 55 additions and 52 deletions

View File

@@ -17,6 +17,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate "$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 . cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/ cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001 rm -f 000000010000000000000001

View File

@@ -16,6 +16,5 @@ declare -i WAL_SIZE=$REDO_POS+114
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate "$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
cp "$DATA_DIR"/pg_wal/000000010000000000000001 . cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/ cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
rm -f 000000010000000000000001 rm -f 000000010000000000000001

View File

@@ -469,16 +469,33 @@ async fn backup_object(
.await .await
} }
/// Source file should point to path with segment without .partial suffix; we'll
/// try to append .partial if file without it doesn't exist.
pub(crate) async fn backup_partial_segment( pub(crate) async fn backup_partial_segment(
source_file: &Utf8Path, source_file: &Utf8Path,
target_file: &RemotePath, target_file: &RemotePath,
size: usize, size: usize,
) -> Result<()> { ) -> Result<()> {
let storage = get_configured_remote_storage(); let storage: &GenericRemoteStorage = get_configured_remote_storage();
let mut partial_path = source_file.to_owned();
partial_path.set_extension("partial");
let file = File::open(&source_file) // First try opening without .partial suffix, if that fails, open legacy .partial one.
.await let file = match File::open(&source_file).await {
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?; Ok(file) => file,
Err(full_e) => match File::open(&partial_path).await {
Ok(file) => file,
Err(partial_e) => {
anyhow::bail!(
"failed to open file for partial backup, {} error: '{}', {} error: '{}'",
source_file,
full_e,
partial_path,
partial_e
);
}
},
};
// limiting the file to read only the first `size` bytes // limiting the file to read only the first `size` bytes
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64); let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);

View File

@@ -117,10 +117,6 @@ impl PartialBackup {
self.conf.my_id.0, self.conf.my_id.0,
) )
} }
fn local_segment_name(&self, segno: u64) -> String {
format!("{}.partial", self.segment_name(segno))
}
} }
impl PartialBackup { impl PartialBackup {
@@ -152,7 +148,7 @@ impl PartialBackup {
// We're going to backup bytes from the start of the segment up to flush_lsn. // We're going to backup bytes from the start of the segment up to flush_lsn.
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size); let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
let local_path = self.local_prefix.join(self.local_segment_name(segno)); let local_path = self.local_prefix.join(self.segment_name(segno));
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?; let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
// Upload first `backup_bytes` bytes of the segment to the remote storage. // Upload first `backup_bytes` bytes of the segment to the remote storage.

View File

@@ -3,9 +3,9 @@
//! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal. //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
//! PG timeline is always 1, so WAL segments usually have names like this: //! PG timeline is always 1, so WAL segments usually have names like this:
//! - 000000010000000000000001 //! - 000000010000000000000001
//! - 000000010000000000000002.partial //! - 000000010000000000000002
//! //!
//! Note that last file has `.partial` suffix, that's different from postgres. //! In the past the last file had a `.partial` suffix, so the code can still read it.
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bytes::Bytes; use bytes::Bytes;
@@ -102,11 +102,13 @@ pub struct PhysicalStorage {
/// Cached open file for the last segment. /// Cached open file for the last segment.
/// ///
/// If Some(file) is open, then it always: /// If Some((file, is_partial)) is open, then it always:
/// - has ".partial" suffix
/// - points to write_lsn, so no seek is needed for writing /// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment /// - doesn't point to the end of the segment
file: Option<File>, ///
/// If the file name has a .partial suffix (created before the suffix was
/// removed), the bool is true.
file: Option<(File, bool)>,
/// When false, we have just initialized storage using the LSN from find_end_of_wal(). /// When false, we have just initialized storage using the LSN from find_end_of_wal().
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular, /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
@@ -243,29 +245,26 @@ impl PhysicalStorage {
// Note: this doesn't get into observe_flush_seconds metric. But // Note: this doesn't get into observe_flush_seconds metric. But
// segment init should be separate metric, if any. // segment init should be separate metric, if any.
if let Err(e) = if let Err(e) = durable_rename(&tmp_path, &wal_file_path, !self.conf.no_sync).await {
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
{
// Probably rename succeeded, but fsync of it failed. Remove // Probably rename succeeded, but fsync of it failed. Remove
// the file then to avoid using it. // the file then to avoid using it.
remove_file(wal_file_partial_path) remove_file(wal_file_path)
.await .await
.or_else(utils::fs_ext::ignore_not_found)?; .or_else(utils::fs_ext::ignore_not_found)?;
return Err(e.into()); return Err(e.into());
} }
Ok((file, true)) Ok((file, false))
} }
} }
/// Write WAL bytes, which are known to be located in a single WAL segment. /// Write WAL bytes, which are known to be located in a single WAL segment.
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() { let (mut file, is_partial) = if let Some((file, is_partial)) = self.file.take() {
file (file, is_partial)
} else { } else {
let (mut file, is_partial) = self.open_or_create(segno).await?; let (mut file, is_partial) = self.open_or_create(segno).await?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64)).await?; file.seek(SeekFrom::Start(xlogoff as u64)).await?;
file (file, is_partial)
}; };
file.write_all(buf).await?; file.write_all(buf).await?;
@@ -278,13 +277,15 @@ impl PhysicalStorage {
// If we reached the end of a WAL segment, flush and close it. // If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&file).await?; self.fdatasync_file(&file).await?;
// Rename partial file to completed file // Rename partial file to completed file in case it was legacy .partial file.
let (wal_file_path, wal_file_partial_path) = if is_partial {
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; let (wal_file_path, wal_file_partial_path) =
fs::rename(wal_file_partial_path, wal_file_path).await?; wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_partial_path, wal_file_path).await?;
}
} else { } else {
// otherwise, file can be reused later // otherwise, file can be reused later
self.file = Some(file); self.file = Some((file, is_partial));
} }
Ok(()) Ok(())
@@ -298,7 +299,7 @@ impl PhysicalStorage {
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
if self.write_lsn != pos { if self.write_lsn != pos {
// need to flush the file before discarding it // need to flush the file before discarding it
if let Some(file) = self.file.take() { if let Some((file, _)) = self.file.take() {
self.fdatasync_file(&file).await?; self.fdatasync_file(&file).await?;
} }
@@ -402,9 +403,9 @@ impl Storage for PhysicalStorage {
return Ok(()); return Ok(());
} }
if let Some(unflushed_file) = self.file.take() { if let Some((unflushed_file, is_partial)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?; self.fdatasync_file(&unflushed_file).await?;
self.file = Some(unflushed_file); self.file = Some((unflushed_file, is_partial));
} else { } else {
// We have unflushed data (write_lsn != flush_lsn), but no file. // We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed, // This should only happen if last file was fully written and flushed,
@@ -445,7 +446,7 @@ impl Storage for PhysicalStorage {
} }
// Close previously opened file, if any // Close previously opened file, if any
if let Some(unflushed_file) = self.file.take() { if let Some((unflushed_file, _)) = self.file.take() {
self.fdatasync_file(&unflushed_file).await?; self.fdatasync_file(&unflushed_file).await?;
} }
@@ -455,20 +456,13 @@ impl Storage for PhysicalStorage {
// Remove all segments after the given LSN. // Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?; remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
let (mut file, is_partial) = self.open_or_create(segno).await?; let (mut file, _) = self.open_or_create(segno).await?;
// Fill end with zeroes // Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64)).await?; file.seek(SeekFrom::Start(xlogoff as u64)).await?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?; write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
self.fdatasync_file(&file).await?; self.fdatasync_file(&file).await?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(wal_file_path, wal_file_partial_path).await?;
}
// Update LSNs // Update LSNs
self.write_lsn = end_pos; self.write_lsn = end_pos;
self.write_record_lsn = end_pos; self.write_record_lsn = end_pos;

View File

@@ -1,5 +1,4 @@
import os import os
import shutil
from fixtures.neon_fixtures import NeonEnv, PgBin from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.utils import subprocess_capture from fixtures.utils import subprocess_capture
@@ -48,14 +47,13 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
endpoint.stop() endpoint.stop()
assert endpoint.pgdata_dir assert endpoint.pgdata_dir
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001") seg_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump") pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
# check segment on compute # check segment on compute
check_wal_segment(pg_waldump_path, wal_path, test_output_dir) check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix. # Check file on safekeepers as well.
sk = env.safekeepers[0] sk = env.safekeepers[0]
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id) sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001") seg_path = os.path.join(sk_tli_dir, "000000010000000000000001")
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path) check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)

View File

@@ -590,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
# save the last (partial) file to put it back after recreation; others will be fetched from s3 # save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0] sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id) tli_dir = sk.data_dir / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0]) f_partial = sk.list_segments(tenant_id, timeline_id)[-1]
f_partial_path = tli_dir / f_partial f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir) / f_partial.name f_partial_saved = sk.data_dir / f_partial
f_partial_path.rename(f_partial_saved) f_partial_path.rename(f_partial_saved)
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version