mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
1 Commits
split-prox
...
sk-rm-part
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0a43b05ea6 |
@@ -17,6 +17,5 @@ declare -i WAL_SIZE=$REDO_POS+114
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
|
||||
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
|
||||
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
|
||||
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f 000000010000000000000001
|
||||
|
||||
@@ -16,6 +16,5 @@ declare -i WAL_SIZE=$REDO_POS+114
|
||||
"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
|
||||
cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
|
||||
cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
|
||||
for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
|
||||
dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
|
||||
rm -f 000000010000000000000001
|
||||
|
||||
@@ -469,16 +469,33 @@ async fn backup_object(
|
||||
.await
|
||||
}
|
||||
|
||||
/// Source file should point to path with segment without .partial suffix; we'll
|
||||
/// try to append .partial if file without it doesn't exist.
|
||||
pub(crate) async fn backup_partial_segment(
|
||||
source_file: &Utf8Path,
|
||||
target_file: &RemotePath,
|
||||
size: usize,
|
||||
) -> Result<()> {
|
||||
let storage = get_configured_remote_storage();
|
||||
let storage: &GenericRemoteStorage = get_configured_remote_storage();
|
||||
let mut partial_path = source_file.to_owned();
|
||||
partial_path.set_extension("partial");
|
||||
|
||||
let file = File::open(&source_file)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
|
||||
// First try opening the file without the .partial suffix; if that fails, open the legacy .partial one.
|
||||
let file = match File::open(&source_file).await {
|
||||
Ok(file) => file,
|
||||
Err(full_e) => match File::open(&partial_path).await {
|
||||
Ok(file) => file,
|
||||
Err(partial_e) => {
|
||||
anyhow::bail!(
|
||||
"failed to open file for partial backup, {} error: '{}', {} error: '{}'",
|
||||
source_file,
|
||||
full_e,
|
||||
partial_path,
|
||||
partial_e
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
// limiting the file to read only the first `size` bytes
|
||||
let limited_file = tokio::io::AsyncReadExt::take(file, size as u64);
|
||||
|
||||
@@ -117,10 +117,6 @@ impl PartialBackup {
|
||||
self.conf.my_id.0,
|
||||
)
|
||||
}
|
||||
|
||||
fn local_segment_name(&self, segno: u64) -> String {
|
||||
format!("{}.partial", self.segment_name(segno))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialBackup {
|
||||
@@ -152,7 +148,7 @@ impl PartialBackup {
|
||||
// We're going to backup bytes from the start of the segment up to flush_lsn.
|
||||
let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
|
||||
|
||||
let local_path = self.local_prefix.join(self.local_segment_name(segno));
|
||||
let local_path = self.local_prefix.join(self.segment_name(segno));
|
||||
let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
|
||||
|
||||
// Upload first `backup_bytes` bytes of the segment to the remote storage.
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
//! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
|
||||
//! PG timeline is always 1, so WAL segments usually have names like this:
|
||||
//! - 000000010000000000000001
|
||||
//! - 000000010000000000000002.partial
|
||||
//! - 000000010000000000000002
|
||||
//!
|
||||
//! Note that last file has `.partial` suffix, that's different from postgres.
|
||||
//! In the past the last file had a `.partial` suffix, so the code can still read it.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
@@ -102,11 +102,13 @@ pub struct PhysicalStorage {
|
||||
|
||||
/// Cached open file for the last segment.
|
||||
///
|
||||
/// If Some(file) is open, then it always:
|
||||
/// - has ".partial" suffix
|
||||
/// If Some((file, is_partial)) is open, then it always:
|
||||
/// - points to write_lsn, so no seek is needed for writing
|
||||
/// - doesn't point to the end of the segment
|
||||
file: Option<File>,
|
||||
///
|
||||
/// If the file name has .partial suffix (created before suffix was
|
||||
/// removed), the bool is true.
|
||||
file: Option<(File, bool)>,
|
||||
|
||||
/// When false, we have just initialized storage using the LSN from find_end_of_wal().
|
||||
/// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
|
||||
@@ -243,29 +245,26 @@ impl PhysicalStorage {
|
||||
|
||||
// Note: this doesn't get into observe_flush_seconds metric. But
|
||||
// segment init should be separate metric, if any.
|
||||
if let Err(e) =
|
||||
durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
|
||||
{
|
||||
if let Err(e) = durable_rename(&tmp_path, &wal_file_path, !self.conf.no_sync).await {
|
||||
// Probably rename succeeded, but fsync of it failed. Remove
|
||||
// the file then to avoid using it.
|
||||
remove_file(wal_file_partial_path)
|
||||
remove_file(wal_file_path)
|
||||
.await
|
||||
.or_else(utils::fs_ext::ignore_not_found)?;
|
||||
return Err(e.into());
|
||||
}
|
||||
Ok((file, true))
|
||||
Ok((file, false))
|
||||
}
|
||||
}
|
||||
|
||||
/// Write WAL bytes, which are known to be located in a single WAL segment.
|
||||
async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
|
||||
let mut file = if let Some(file) = self.file.take() {
|
||||
file
|
||||
let (mut file, is_partial) = if let Some((file, is_partial)) = self.file.take() {
|
||||
(file, is_partial)
|
||||
} else {
|
||||
let (mut file, is_partial) = self.open_or_create(segno).await?;
|
||||
assert!(is_partial, "unexpected write into non-partial segment file");
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
file
|
||||
(file, is_partial)
|
||||
};
|
||||
|
||||
file.write_all(buf).await?;
|
||||
@@ -278,13 +277,15 @@ impl PhysicalStorage {
|
||||
// If we reached the end of a WAL segment, flush and close it.
|
||||
self.fdatasync_file(&file).await?;
|
||||
|
||||
// Rename partial file to completed file
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(wal_file_partial_path, wal_file_path).await?;
|
||||
// Rename partial file to completed file in case it was legacy .partial file.
|
||||
if is_partial {
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(wal_file_partial_path, wal_file_path).await?;
|
||||
}
|
||||
} else {
|
||||
// otherwise, file can be reused later
|
||||
self.file = Some(file);
|
||||
self.file = Some((file, is_partial));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -298,7 +299,7 @@ impl PhysicalStorage {
|
||||
async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
|
||||
if self.write_lsn != pos {
|
||||
// need to flush the file before discarding it
|
||||
if let Some(file) = self.file.take() {
|
||||
if let Some((file, _)) = self.file.take() {
|
||||
self.fdatasync_file(&file).await?;
|
||||
}
|
||||
|
||||
@@ -402,9 +403,9 @@ impl Storage for PhysicalStorage {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(unflushed_file) = self.file.take() {
|
||||
if let Some((unflushed_file, is_partial)) = self.file.take() {
|
||||
self.fdatasync_file(&unflushed_file).await?;
|
||||
self.file = Some(unflushed_file);
|
||||
self.file = Some((unflushed_file, is_partial));
|
||||
} else {
|
||||
// We have unflushed data (write_lsn != flush_lsn), but no file.
|
||||
// This should only happen if last file was fully written and flushed,
|
||||
@@ -445,7 +446,7 @@ impl Storage for PhysicalStorage {
|
||||
}
|
||||
|
||||
// Close previously opened file, if any
|
||||
if let Some(unflushed_file) = self.file.take() {
|
||||
if let Some((unflushed_file, _)) = self.file.take() {
|
||||
self.fdatasync_file(&unflushed_file).await?;
|
||||
}
|
||||
|
||||
@@ -455,20 +456,13 @@ impl Storage for PhysicalStorage {
|
||||
// Remove all segments after the given LSN.
|
||||
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
|
||||
|
||||
let (mut file, is_partial) = self.open_or_create(segno).await?;
|
||||
let (mut file, _) = self.open_or_create(segno).await?;
|
||||
|
||||
// Fill end with zeroes
|
||||
file.seek(SeekFrom::Start(xlogoff as u64)).await?;
|
||||
write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
|
||||
self.fdatasync_file(&file).await?;
|
||||
|
||||
if !is_partial {
|
||||
// Make segment partial once again
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(wal_file_path, wal_file_partial_path).await?;
|
||||
}
|
||||
|
||||
// Update LSNs
|
||||
self.write_lsn = end_pos;
|
||||
self.write_record_lsn = end_pos;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
from fixtures.utils import subprocess_capture
|
||||
@@ -48,14 +47,13 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
|
||||
endpoint.stop()
|
||||
|
||||
assert endpoint.pgdata_dir
|
||||
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
|
||||
seg_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
|
||||
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
|
||||
# check segment on compute
|
||||
check_wal_segment(pg_waldump_path, wal_path, test_output_dir)
|
||||
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
|
||||
|
||||
# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix.
|
||||
# Check file on safekeepers as well.
|
||||
sk = env.safekeepers[0]
|
||||
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
|
||||
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001")
|
||||
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path)
|
||||
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)
|
||||
seg_path = os.path.join(sk_tli_dir, "000000010000000000000001")
|
||||
check_wal_segment(pg_waldump_path, seg_path, test_output_dir)
|
||||
|
||||
@@ -590,10 +590,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# save the last (partial) file to put it back after recreation; others will be fetched from s3
|
||||
sk = env.safekeepers[0]
|
||||
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
|
||||
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
|
||||
tli_dir = sk.data_dir / str(tenant_id) / str(timeline_id)
|
||||
f_partial = sk.list_segments(tenant_id, timeline_id)[-1]
|
||||
f_partial_path = tli_dir / f_partial
|
||||
f_partial_saved = Path(sk.data_dir) / f_partial.name
|
||||
f_partial_saved = sk.data_dir / f_partial
|
||||
f_partial_path.rename(f_partial_saved)
|
||||
|
||||
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
|
||||
|
||||
Reference in New Issue
Block a user