mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 05:22:56 +00:00
Use PG timelineid 1 everywhere.
As changing it doesn't have useful meaning in Zenith. ref #824
This commit is contained in:
@@ -285,11 +285,7 @@ impl<'a> Basebackup<'a> {
|
||||
|
||||
//send wal segment
|
||||
let segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE);
|
||||
let wal_file_name = XLogFileName(
|
||||
1, // FIXME: always use Postgres timeline 1
|
||||
segno,
|
||||
pg_constants::WAL_SEGMENT_SIZE,
|
||||
);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, pg_constants::WAL_SEGMENT_SIZE);
|
||||
let wal_file_path = format!("pg_wal/{}", wal_file_name);
|
||||
let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?;
|
||||
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier);
|
||||
|
||||
@@ -43,6 +43,9 @@ pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::<XLogRecord>();
|
||||
#[allow(clippy::identity_op)]
|
||||
pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2;
|
||||
|
||||
// PG timeline is always 1, changing it doesn't have useful meaning in Zenith.
|
||||
pub const PG_TLI: u32 = 1;
|
||||
|
||||
pub type XLogRecPtr = u64;
|
||||
pub type TimeLineID = u32;
|
||||
pub type TimestampTz = i64;
|
||||
@@ -421,7 +424,7 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Bytes {
|
||||
XLogPageHeaderData {
|
||||
xlp_magic: XLOG_PAGE_MAGIC as u16,
|
||||
xlp_info: pg_constants::XLP_LONG_HEADER,
|
||||
xlp_tli: 1, // FIXME: always use Postgres timeline 1
|
||||
xlp_tli: PG_TLI,
|
||||
xlp_pageaddr: pageaddr,
|
||||
xlp_rem_len: 0,
|
||||
..Default::default() // Put 0 in padding fields.
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
)
|
||||
.map_err(ApiError::from_err)?;
|
||||
let sk_state = tli.get_info();
|
||||
let (flush_lsn, _) = tli.get_end_of_wal();
|
||||
let flush_lsn = tli.get_end_of_wal();
|
||||
|
||||
let status = TimelineStatus {
|
||||
tenant_id,
|
||||
|
||||
@@ -6,7 +6,9 @@ use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::{get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE};
|
||||
use postgres_ffi::xlog_utils::{
|
||||
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
|
||||
};
|
||||
use regex::Regex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::cmp::min;
|
||||
@@ -194,7 +196,7 @@ impl ReplicationConn {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let (wal_end, timeline) = swh.timeline.get().get_end_of_wal();
|
||||
let wal_end = swh.timeline.get().get_end_of_wal();
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
// hang otherwise). That's because walproposer runs the consensus and
|
||||
@@ -248,7 +250,7 @@ impl ReplicationConn {
|
||||
None => {
|
||||
// Open a new file.
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
let wal_file_name = XLogFileName(timeline, segno, wal_seg_size);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
|
||||
let timeline_id = swh.timeline.get().timelineid;
|
||||
let wal_file_path = swh.conf.timeline_dir(&timeline_id).join(wal_file_name);
|
||||
Self::open_wal_file(&wal_file_path)?
|
||||
|
||||
@@ -59,7 +59,6 @@ pub struct ServerInfo {
|
||||
pub tenant_id: ZTenantId,
|
||||
/// Zenith timelineid
|
||||
pub ztli: ZTimelineId,
|
||||
pub tli: TimeLineID,
|
||||
pub wal_seg_size: u32,
|
||||
}
|
||||
|
||||
@@ -98,7 +97,6 @@ impl SafeKeeperState {
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
tenant_id: ZTenantId::from([0u8; 16]),
|
||||
ztli: ZTimelineId::from([0u8; 16]),
|
||||
tli: 0,
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
proposer_uuid: [0; 16],
|
||||
@@ -353,7 +351,6 @@ pub struct SafeKeeper<ST: Storage> {
|
||||
/// Locally flushed part of WAL with full records (end_lsn of last record).
|
||||
/// Established by reading wal.
|
||||
pub flush_lsn: Lsn,
|
||||
pub tli: u32,
|
||||
// Cached metrics so we don't have to recompute labels on each update.
|
||||
metrics: SafeKeeperMetrics,
|
||||
/// not-yet-flushed pairs of same named fields in s.*
|
||||
@@ -370,10 +367,9 @@ where
|
||||
ST: Storage,
|
||||
{
|
||||
// constructor
|
||||
pub fn new(flush_lsn: Lsn, tli: u32, storage: ST, state: SafeKeeperState) -> SafeKeeper<ST> {
|
||||
pub fn new(flush_lsn: Lsn, storage: ST, state: SafeKeeperState) -> SafeKeeper<ST> {
|
||||
SafeKeeper {
|
||||
flush_lsn,
|
||||
tli,
|
||||
metrics: SafeKeeperMetrics::new_noname(),
|
||||
commit_lsn: state.commit_lsn,
|
||||
truncate_lsn: state.truncate_lsn,
|
||||
@@ -422,7 +418,6 @@ where
|
||||
self.s.server.system_id = msg.system_id;
|
||||
self.s.server.tenant_id = msg.tenant_id;
|
||||
self.s.server.ztli = msg.ztli;
|
||||
self.s.server.tli = msg.tli;
|
||||
self.s.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.storage
|
||||
.persist(&self.s, true)
|
||||
@@ -643,7 +638,7 @@ mod tests {
|
||||
let storage = InMemoryStorage {
|
||||
persisted_state: SafeKeeperState::new(),
|
||||
};
|
||||
let mut sk = SafeKeeper::new(Lsn(0), 0, storage, SafeKeeperState::new());
|
||||
let mut sk = SafeKeeper::new(Lsn(0), storage, SafeKeeperState::new());
|
||||
|
||||
// check voting for 1 is ok
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
|
||||
@@ -658,7 +653,7 @@ mod tests {
|
||||
let storage = InMemoryStorage {
|
||||
persisted_state: state.clone(),
|
||||
};
|
||||
sk = SafeKeeper::new(Lsn(0), 0, storage, state);
|
||||
sk = SafeKeeper::new(Lsn(0), storage, state);
|
||||
|
||||
// and ensure voting second time for 1 is not ok
|
||||
vote_resp = sk.process_msg(&vote_request);
|
||||
@@ -673,7 +668,7 @@ mod tests {
|
||||
let storage = InMemoryStorage {
|
||||
persisted_state: SafeKeeperState::new(),
|
||||
};
|
||||
let mut sk = SafeKeeper::new(Lsn(0), 0, storage, SafeKeeperState::new());
|
||||
let mut sk = SafeKeeper::new(Lsn(0), storage, SafeKeeperState::new());
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::SafeKeeperConf;
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::xlog_utils::PG_TLI;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::postgres_backend;
|
||||
@@ -101,11 +102,11 @@ impl SendWalHandler {
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||
let (start_pos, timeline) = self.timeline.get().get_end_of_wal();
|
||||
let start_pos = self.timeline.get().get_end_of_wal();
|
||||
let lsn = start_pos.to_string();
|
||||
let tli = timeline.to_string();
|
||||
let sysid = self.timeline.get().get_info().server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli = PG_TLI.to_string();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
let sysid_bytes = sysid.as_bytes();
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use fs2::FileExt;
|
||||
use lazy_static::lazy_static;
|
||||
use log::*;
|
||||
use postgres_ffi::xlog_utils::find_end_of_wal;
|
||||
use postgres_ffi::xlog_utils::{find_end_of_wal, PG_TLI};
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
@@ -136,7 +136,7 @@ impl SharedState {
|
||||
) -> Result<Self> {
|
||||
let (file_storage, state) = SharedState::load_from_control_file(conf, timelineid, create)
|
||||
.with_context(|| "failed to load from control file")?;
|
||||
let (flush_lsn, tli) = if state.server.wal_seg_size != 0 {
|
||||
let flush_lsn = if state.server.wal_seg_size != 0 {
|
||||
let wal_dir = conf.timeline_dir(&timelineid);
|
||||
find_end_of_wal(
|
||||
&wal_dir,
|
||||
@@ -144,13 +144,14 @@ impl SharedState {
|
||||
true,
|
||||
state.wal_start_lsn,
|
||||
)?
|
||||
.0
|
||||
} else {
|
||||
(0, 0)
|
||||
0
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
notified_commit_lsn: Lsn(0),
|
||||
sk: SafeKeeper::new(Lsn(flush_lsn), tli, file_storage, state),
|
||||
sk: SafeKeeper::new(Lsn(flush_lsn), file_storage, state),
|
||||
replicas: Vec::new(),
|
||||
})
|
||||
}
|
||||
@@ -352,9 +353,9 @@ impl Timeline {
|
||||
shared_state.replicas[id] = state;
|
||||
}
|
||||
|
||||
pub fn get_end_of_wal(&self) -> (Lsn, u32) {
|
||||
pub fn get_end_of_wal(&self) -> Lsn {
|
||||
let shared_state = self.mutex.lock().unwrap();
|
||||
(shared_state.sk.flush_lsn, shared_state.sk.tli)
|
||||
shared_state.sk.flush_lsn
|
||||
}
|
||||
}
|
||||
|
||||
@@ -533,7 +534,7 @@ impl Storage for FileStorage {
|
||||
/* Open file */
|
||||
let segno = start_pos.segment_number(wal_seg_size);
|
||||
// note: we basically don't support changing pg timeline
|
||||
let wal_file_name = XLogFileName(server.tli, segno, wal_seg_size);
|
||||
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
|
||||
let wal_file_path = self.conf.timeline_dir(&ztli).join(wal_file_name.clone());
|
||||
let wal_file_partial_path = self
|
||||
.conf
|
||||
|
||||
Reference in New Issue
Block a user