mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
Add calculate_walrecord_end_lsn function
This commit is contained in:
@@ -247,6 +247,7 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
|
||||
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
|
||||
|
||||
// Export some version independent functions that are used outside of this mod
|
||||
pub use v14::xlog_utils::calculate_walrecord_end_lsn;
|
||||
pub use v14::xlog_utils::encode_logical_message;
|
||||
pub use v14::xlog_utils::get_current_timestamp;
|
||||
pub use v14::xlog_utils::to_pg_timestamp;
|
||||
|
||||
@@ -499,6 +499,27 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Bytes {
|
||||
.encode(Lsn(0))
|
||||
}
|
||||
|
||||
/// Given the start LSN of a WAL record and its length, calculate its end LSN.
|
||||
///
|
||||
/// In the simple cases, the end LSN is just `start_lsn + len`, but it's more complicated if
|
||||
/// the record crosses a page boundary.
|
||||
///
|
||||
/// `len` is the length not including any WAL page headers. It should match the `xl_tot_len`
|
||||
/// field on the WAL record.
|
||||
pub fn calculate_walrecord_end_lsn(start_lsn: Lsn, len: usize) -> Lsn {
|
||||
let aligned_len = ((len + 7) & !7) as u64;
|
||||
let page_header_size = if (XLOG_BLCKSZ as u64) - start_lsn.0 % (XLOG_BLCKSZ as u64) < aligned_len {
|
||||
if (start_lsn.0 & ((WAL_SEGMENT_SIZE - 1) as u64)) < (XLOG_BLCKSZ as u64) {
|
||||
XLOG_SIZE_OF_XLOG_LONG_PHD
|
||||
} else {
|
||||
XLOG_SIZE_OF_XLOG_SHORT_PHD
|
||||
}
|
||||
} else {
|
||||
0
|
||||
} as u64;
|
||||
start_lsn + aligned_len + page_header_size
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -33,10 +33,9 @@ use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PG_HBA};
|
||||
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::PG_TLI;
|
||||
use postgres_ffi::{dispatch_pgversion, CheckPoint};
|
||||
use postgres_ffi::{calculate_walrecord_end_lsn, dispatch_pgversion, CheckPoint};
|
||||
use postgres_ffi::{
|
||||
BLCKSZ, RELSEG_SIZE, SIZEOF_CHECKPOINT, WAL_SEGMENT_SIZE, XLOG_BLCKSZ,
|
||||
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
BLCKSZ, RELSEG_SIZE, SIZEOF_CHECKPOINT, WAL_SEGMENT_SIZE, XLOG_SIZE_OF_XLOG_RECORD,
|
||||
};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -259,29 +258,18 @@ where
|
||||
// TODO include checksum
|
||||
|
||||
// Detect if we are creating the basebackup exactly at a shutdown checkpoint.
|
||||
let normal_shutdown = if let Ok(checkpoint_bytes) =
|
||||
self.timeline.get_checkpoint(self.lsn, self.ctx).await
|
||||
{
|
||||
let checkpoint =
|
||||
CheckPoint::decode(&checkpoint_bytes).context("deserialize checkpoint")?;
|
||||
let checkpoint_end =
|
||||
checkpoint.redo + ((XLOG_SIZE_OF_XLOG_RECORD + 8 + SIZEOF_CHECKPOINT) as u64);
|
||||
let checkpoint_end_lsn = Lsn(checkpoint_end
|
||||
+ (if (XLOG_BLCKSZ as u64) - checkpoint.redo % (XLOG_BLCKSZ as u64)
|
||||
< (SIZEOF_CHECKPOINT as u64)
|
||||
{
|
||||
if (checkpoint_end & ((WAL_SEGMENT_SIZE - 1) as u64)) < (XLOG_BLCKSZ as u64) {
|
||||
XLOG_SIZE_OF_XLOG_LONG_PHD
|
||||
} else {
|
||||
XLOG_SIZE_OF_XLOG_SHORT_PHD
|
||||
}
|
||||
} else {
|
||||
0
|
||||
}) as u64);
|
||||
checkpoint_end_lsn == self.lsn
|
||||
} else {
|
||||
false
|
||||
};
|
||||
let normal_shutdown =
|
||||
if let Ok(checkpoint_bytes) = self.timeline.get_checkpoint(self.lsn, self.ctx).await {
|
||||
let checkpoint =
|
||||
CheckPoint::decode(&checkpoint_bytes).context("deserialize checkpoint")?;
|
||||
let checkpoint_end_lsn = calculate_walrecord_end_lsn(
|
||||
Lsn(checkpoint.redo),
|
||||
XLOG_SIZE_OF_XLOG_RECORD + 8 + SIZEOF_CHECKPOINT,
|
||||
);
|
||||
checkpoint_end_lsn == self.lsn
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
|
||||
|
||||
|
||||
@@ -5909,6 +5909,10 @@ impl<'a> TimelineWriter<'a> {
|
||||
let state = self.write_guard.as_mut().unwrap();
|
||||
|
||||
state.current_size += buf_size;
|
||||
info!(
|
||||
"Current layer size is {}, buf_size={}",
|
||||
state.current_size, buf_size
|
||||
);
|
||||
state.prev_lsn = Some(batch_max_lsn);
|
||||
state.max_lsn = std::cmp::max(state.max_lsn, Some(batch_max_lsn));
|
||||
}
|
||||
|
||||
@@ -79,7 +79,6 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
|
||||
branch_name = "import_from_vanilla"
|
||||
tenant = TenantId.generate()
|
||||
timeline = TimelineId.generate()
|
||||
|
||||
# Set up pageserver for import
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
Reference in New Issue
Block a user