mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-03 16:30:38 +00:00
Compare commits
3 Commits
stepashka-
...
arthur/tmp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
99e3e5ad36 | ||
|
|
f2efe2496e | ||
|
|
e2813b4144 |
@@ -1,8 +1,3 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::zid::{NodeId, ZTimelineId};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TimelineCreateRequest {
|
||||
pub timeline_id: ZTimelineId,
|
||||
pub peer_ids: Vec<NodeId>,
|
||||
}
|
||||
|
||||
@@ -25,8 +25,6 @@ use utils::{
|
||||
zid::{NodeId, ZTenantId, ZTenantTimelineId, ZTimelineId},
|
||||
};
|
||||
|
||||
use super::models::TimelineCreateRequest;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct SafekeeperStatus {
|
||||
id: NodeId,
|
||||
@@ -122,20 +120,6 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
json_response(StatusCode::OK, status)
|
||||
}
|
||||
|
||||
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
|
||||
|
||||
let zttid = ZTenantTimelineId {
|
||||
tenant_id: parse_request_param(&request, "tenant_id")?,
|
||||
timeline_id: request_data.timeline_id,
|
||||
};
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
|
||||
.map_err(ApiError::from_err)?;
|
||||
|
||||
json_response(StatusCode::CREATED, ())
|
||||
}
|
||||
|
||||
/// Deactivates the timeline and removes its data directory.
|
||||
///
|
||||
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
|
||||
@@ -221,8 +205,7 @@ pub fn make_router(
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/v1/status", status_handler)
|
||||
// Will be used in the future instead of implicit timeline creation
|
||||
.post("/v1/tenant/:tenant_id/timeline", timeline_create_handler)
|
||||
// TODO: update OpenAPI spec
|
||||
.get(
|
||||
"/v1/tenant/:tenant_id/timeline/:timeline_id",
|
||||
timeline_status_handler,
|
||||
|
||||
@@ -484,8 +484,12 @@ impl AcceptorProposerMessage {
|
||||
}
|
||||
}
|
||||
|
||||
/// SafeKeeper which consumes events (messages from compute) and provides
|
||||
/// replies.
|
||||
/// Safekeeper implements consensus to reliably persist WAL across nodes.
|
||||
/// It controls all WAL disk writes and updates of control file.
|
||||
///
|
||||
/// Currently safekeeper processes:
|
||||
/// - messages from compute (proposers) and provides replies
|
||||
/// - messages from etcd peers
|
||||
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
||||
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
|
||||
/// Note: be careful to set only if we are sure our WAL (term history) matches
|
||||
@@ -508,19 +512,27 @@ where
|
||||
CTRL: control_file::Storage,
|
||||
WAL: wal_storage::Storage,
|
||||
{
|
||||
// constructor
|
||||
/// Accepts a control file storage containing the safekeeper state.
|
||||
/// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
|
||||
/// and `server` (`wal_seg_size` inside it) fields.
|
||||
pub fn new(
|
||||
ztli: ZTimelineId,
|
||||
state: CTRL,
|
||||
mut wal_store: WAL,
|
||||
node_id: NodeId,
|
||||
) -> Result<SafeKeeper<CTRL, WAL>> {
|
||||
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
|
||||
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
|
||||
if state.tenant_id == ZTenantId::from([0u8; 16])
|
||||
|| state.timeline_id == ZTimelineId::from([0u8; 16])
|
||||
{
|
||||
bail!(
|
||||
"Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
|
||||
state.tenant_id,
|
||||
state.timeline_id
|
||||
);
|
||||
}
|
||||
if state.server.wal_seg_size == 0 {
|
||||
bail!("Calling SafeKeeper::new with empty wal_seg_size");
|
||||
}
|
||||
|
||||
// initialize wal_store, if state is already initialized
|
||||
wal_store.init_storage(&state)?;
|
||||
|
||||
Ok(SafeKeeper {
|
||||
global_commit_lsn: state.commit_lsn,
|
||||
@@ -579,7 +591,7 @@ where
|
||||
&mut self,
|
||||
msg: &ProposerGreeting,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
/* Check protocol compatibility */
|
||||
// Check protocol compatibility
|
||||
if msg.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
@@ -587,11 +599,11 @@ where
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
/* Postgres upgrade is not treated as fatal error */
|
||||
// Postgres upgrade is not treated as fatal error
|
||||
if msg.pg_version != self.state.server.pg_version
|
||||
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
info!(
|
||||
warn!(
|
||||
"incompatible server version {}, expected {}",
|
||||
msg.pg_version, self.state.server.pg_version
|
||||
);
|
||||
@@ -610,17 +622,25 @@ where
|
||||
self.state.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
// set basic info about server, if not yet
|
||||
// TODO: verify that is doesn't change after
|
||||
{
|
||||
let mut state = self.state.clone();
|
||||
state.server.system_id = msg.system_id;
|
||||
state.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.state.persist(&state)?;
|
||||
if self.state.server.wal_seg_size != msg.wal_seg_size {
|
||||
bail!(
|
||||
"invalid wal_seg_size, got {}, expected {}",
|
||||
msg.wal_seg_size,
|
||||
self.state.server.wal_seg_size
|
||||
);
|
||||
}
|
||||
|
||||
self.wal_store.init_storage(&self.state)?;
|
||||
// system_id will be updated on mismatch
|
||||
if self.state.server.system_id != msg.system_id {
|
||||
warn!(
|
||||
"unexpected system ID arrived, got {}, expected {}",
|
||||
msg.system_id, self.state.server.system_id
|
||||
);
|
||||
|
||||
let mut state = self.state.clone();
|
||||
state.server.system_id = msg.system_id;
|
||||
self.state.persist(&state)?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
@@ -670,16 +690,6 @@ where
|
||||
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
|
||||
}
|
||||
|
||||
/// Bump our term if received a note from elected proposer with higher one
|
||||
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
|
||||
if self.state.acceptor_state.term < term {
|
||||
let mut state = self.state.clone();
|
||||
state.acceptor_state.term = term;
|
||||
self.state.persist(&state)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Form AppendResponse from current state.
|
||||
fn append_response(&self) -> AppendResponse {
|
||||
let ar = AppendResponse {
|
||||
@@ -696,7 +706,12 @@ where
|
||||
|
||||
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
|
||||
info!("received ProposerElected {:?}", msg);
|
||||
self.bump_if_higher(msg.term)?;
|
||||
if self.state.acceptor_state.term < msg.term {
|
||||
let mut state = self.state.clone();
|
||||
state.acceptor_state.term = msg.term;
|
||||
self.state.persist(&state)?;
|
||||
}
|
||||
|
||||
// If our term is higher, ignore the message (next feedback will inform the compute)
|
||||
if self.state.acceptor_state.term > msg.term {
|
||||
return Ok(None);
|
||||
@@ -749,7 +764,7 @@ where
|
||||
}
|
||||
|
||||
/// Advance commit_lsn taking into account what we have locally
|
||||
pub fn update_commit_lsn(&mut self) -> Result<()> {
|
||||
fn update_commit_lsn(&mut self) -> Result<()> {
|
||||
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
|
||||
assert!(commit_lsn >= self.inmem.commit_lsn);
|
||||
|
||||
@@ -952,10 +967,6 @@ mod tests {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
self.lsn = startpos + buf.len() as u64;
|
||||
Ok(())
|
||||
|
||||
@@ -557,6 +557,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
|
||||
struct GlobalTimelinesState {
|
||||
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
|
||||
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
|
||||
conf: Option<SafeKeeperConf>,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
@@ -576,15 +577,16 @@ pub struct TimelineDeleteForceResult {
|
||||
pub struct GlobalTimelines;
|
||||
|
||||
impl GlobalTimelines {
|
||||
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
|
||||
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>, conf: &SafeKeeperConf) {
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
assert!(state.wal_backup_launcher_tx.is_none());
|
||||
assert!(state.conf.is_none());
|
||||
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
|
||||
state.conf = Some(conf.clone());
|
||||
}
|
||||
|
||||
fn create_internal(
|
||||
mut state: MutexGuard<GlobalTimelinesState>,
|
||||
conf: &SafeKeeperConf,
|
||||
zttid: ZTenantTimelineId,
|
||||
peer_ids: Vec<NodeId>,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
@@ -609,15 +611,6 @@ impl GlobalTimelines {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create(
|
||||
conf: &SafeKeeperConf,
|
||||
zttid: ZTenantTimelineId,
|
||||
peer_ids: Vec<NodeId>,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let state = TIMELINES_STATE.lock().unwrap();
|
||||
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
|
||||
}
|
||||
|
||||
/// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map.
|
||||
/// If control file doesn't exist and create=false, bails out.
|
||||
pub fn get(
|
||||
@@ -625,7 +618,7 @@ impl GlobalTimelines {
|
||||
zttid: ZTenantTimelineId,
|
||||
create: bool,
|
||||
) -> Result<Arc<Timeline>> {
|
||||
let _enter = info_span!("", timeline = %zttid.tenant_id).entered();
|
||||
let _enter = info_span!("", tenant = %zttid.tenant_id).entered();
|
||||
|
||||
let mut state = TIMELINES_STATE.lock().unwrap();
|
||||
|
||||
|
||||
@@ -53,7 +53,7 @@ pub fn wal_backup_launcher_thread_main(
|
||||
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
|
||||
/// aware of current status and return the timeline.
|
||||
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
|
||||
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
|
||||
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup-_attend())
|
||||
}
|
||||
|
||||
struct WalBackupTaskHandle {
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
//!
|
||||
//! Note that last file has `.partial` suffix, that's different from postgres.
|
||||
|
||||
use anyhow::{anyhow, bail, Context, Result};
|
||||
use anyhow::{bail, Context, Result};
|
||||
use std::io::{self, Seek, SeekFrom};
|
||||
use std::pin::Pin;
|
||||
use tokio::io::AsyncRead;
|
||||
@@ -16,7 +16,7 @@ use lazy_static::lazy_static;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, XLogSegNo, PG_TLI,
|
||||
};
|
||||
use std::cmp::min;
|
||||
use std::cmp::{max, min};
|
||||
|
||||
use std::fs::{self, remove_file, File, OpenOptions};
|
||||
use std::io::Write;
|
||||
@@ -86,9 +86,6 @@ pub trait Storage {
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
|
||||
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
|
||||
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;
|
||||
|
||||
/// Write piece of WAL from buf to disk, but not necessarily sync it.
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
|
||||
|
||||
@@ -104,7 +101,7 @@ pub trait Storage {
|
||||
}
|
||||
|
||||
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
|
||||
/// for better performance. Storage must be initialized before use.
|
||||
/// for better performance. Storage is initialized in the constructor.
|
||||
///
|
||||
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
|
||||
/// its filename and may be not fully flushed.
|
||||
@@ -112,16 +109,15 @@ pub trait Storage {
|
||||
/// Relationship of LSNs:
|
||||
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
|
||||
///
|
||||
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
|
||||
/// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
|
||||
pub struct PhysicalStorage {
|
||||
metrics: WalStorageMetrics,
|
||||
zttid: ZTenantTimelineId,
|
||||
timeline_dir: PathBuf,
|
||||
conf: SafeKeeperConf,
|
||||
|
||||
// fields below are filled upon initialization
|
||||
/// None if uninitialized, Some(usize) if storage is initialized.
|
||||
wal_seg_size: Option<usize>,
|
||||
/// Size of WAL segment in bytes.
|
||||
wal_seg_size: usize,
|
||||
|
||||
/// Written to disk, but possibly still in the cache and not fully persisted.
|
||||
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
|
||||
@@ -146,25 +142,50 @@ pub struct PhysicalStorage {
|
||||
}
|
||||
|
||||
impl PhysicalStorage {
|
||||
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage {
|
||||
pub fn new(
|
||||
zttid: &ZTenantTimelineId,
|
||||
conf: &SafeKeeperConf,
|
||||
state: &SafeKeeperState,
|
||||
) -> Result<PhysicalStorage> {
|
||||
let timeline_dir = conf.timeline_dir(zttid);
|
||||
PhysicalStorage {
|
||||
|
||||
if state.server.wal_seg_size == 0 {
|
||||
bail!("wal_seg_size must be initialized before creating PhysicalStorage");
|
||||
}
|
||||
let wal_seg_size = state.server.wal_seg_size as usize;
|
||||
|
||||
// Find out where stored WAL ends, starting at commit_lsn which is a
|
||||
// known recent record boundary (unless we don't have WAL at all).
|
||||
let write_lsn = if state.commit_lsn == Lsn(0) {
|
||||
Lsn(0)
|
||||
} else {
|
||||
Lsn(find_end_of_wal(&timeline_dir, wal_seg_size, true, state.commit_lsn)?.0)
|
||||
};
|
||||
|
||||
// TODO: do we really know that write_lsn is fully flushed to disk?
|
||||
// If not, maybe it's better to call fsync() here to be sure?
|
||||
let flush_lsn = write_lsn;
|
||||
|
||||
info!(
|
||||
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
|
||||
zttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
|
||||
);
|
||||
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
|
||||
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", zttid.timeline_id);
|
||||
}
|
||||
|
||||
Ok(PhysicalStorage {
|
||||
metrics: WalStorageMetrics::new(zttid),
|
||||
zttid: *zttid,
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
wal_seg_size: None,
|
||||
write_lsn: Lsn(0),
|
||||
write_record_lsn: Lsn(0),
|
||||
flush_record_lsn: Lsn(0),
|
||||
decoder: WalStreamDecoder::new(Lsn(0)),
|
||||
wal_seg_size,
|
||||
write_lsn,
|
||||
write_record_lsn: write_lsn,
|
||||
flush_record_lsn: flush_lsn,
|
||||
decoder: WalStreamDecoder::new(write_lsn),
|
||||
file: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for flush_lsn updates that also updates metrics.
|
||||
fn update_flush_lsn(&mut self) {
|
||||
self.flush_record_lsn = self.write_record_lsn;
|
||||
})
|
||||
}
|
||||
|
||||
/// Call fdatasync if config requires so.
|
||||
@@ -189,9 +210,9 @@ impl PhysicalStorage {
|
||||
|
||||
/// Open or create WAL segment file. Caller must call seek to the wanted position.
|
||||
/// Returns `file` and `is_partial`.
|
||||
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
|
||||
fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
|
||||
// Try to open already completed segment
|
||||
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
|
||||
@@ -207,24 +228,18 @@ impl PhysicalStorage {
|
||||
.open(&wal_file_partial_path)
|
||||
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
|
||||
|
||||
write_zeroes(&mut file, wal_seg_size)?;
|
||||
write_zeroes(&mut file, self.wal_seg_size)?;
|
||||
self.fsync_file(&mut file)?;
|
||||
Ok((file, true))
|
||||
}
|
||||
}
|
||||
|
||||
/// Write WAL bytes, which are known to be located in a single WAL segment.
|
||||
fn write_in_segment(
|
||||
&mut self,
|
||||
segno: u64,
|
||||
xlogoff: usize,
|
||||
buf: &[u8],
|
||||
wal_seg_size: usize,
|
||||
) -> Result<()> {
|
||||
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
|
||||
let mut file = if let Some(file) = self.file.take() {
|
||||
file
|
||||
} else {
|
||||
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
|
||||
let (mut file, is_partial) = self.open_or_create(segno)?;
|
||||
assert!(is_partial, "unexpected write into non-partial segment file");
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
file
|
||||
@@ -232,13 +247,13 @@ impl PhysicalStorage {
|
||||
|
||||
file.write_all(buf)?;
|
||||
|
||||
if xlogoff + buf.len() == wal_seg_size {
|
||||
if xlogoff + buf.len() == self.wal_seg_size {
|
||||
// If we reached the end of a WAL segment, flush and close it.
|
||||
self.fdatasync_file(&mut file)?;
|
||||
|
||||
// Rename partial file to completed file
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(&wal_file_partial_path, &wal_file_path)?;
|
||||
} else {
|
||||
// otherwise, file can be reused later
|
||||
@@ -254,10 +269,6 @@ impl PhysicalStorage {
|
||||
///
|
||||
/// Updates `write_lsn`.
|
||||
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
|
||||
let wal_seg_size = self
|
||||
.wal_seg_size
|
||||
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
|
||||
|
||||
if self.write_lsn != pos {
|
||||
// need to flush the file before discarding it
|
||||
if let Some(mut file) = self.file.take() {
|
||||
@@ -269,17 +280,17 @@ impl PhysicalStorage {
|
||||
|
||||
while !buf.is_empty() {
|
||||
// Extract WAL location for this block
|
||||
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
|
||||
let segno = self.write_lsn.segment_number(wal_seg_size);
|
||||
let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize;
|
||||
let segno = self.write_lsn.segment_number(self.wal_seg_size);
|
||||
|
||||
// If crossing a WAL boundary, only write up until we reach wal segment size.
|
||||
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
|
||||
wal_seg_size - xlogoff
|
||||
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
|
||||
self.wal_seg_size - xlogoff
|
||||
} else {
|
||||
buf.len()
|
||||
};
|
||||
|
||||
self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
|
||||
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
|
||||
self.write_lsn += bytes_write as u64;
|
||||
buf = &buf[bytes_write..];
|
||||
}
|
||||
@@ -294,53 +305,6 @@ impl Storage for PhysicalStorage {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
/// Storage needs to know wal_seg_size to know which segment to read/write, but
|
||||
/// wal_seg_size is not always known at the moment of storage creation. This method
|
||||
/// allows to postpone its initialization.
|
||||
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
|
||||
if state.server.wal_seg_size == 0 {
|
||||
// wal_seg_size is still unknown. This is dead path normally, should
|
||||
// be used only in tests.
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if let Some(wal_seg_size) = self.wal_seg_size {
|
||||
// physical storage is already initialized
|
||||
assert_eq!(wal_seg_size, state.server.wal_seg_size as usize);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// initialize physical storage
|
||||
let wal_seg_size = state.server.wal_seg_size as usize;
|
||||
self.wal_seg_size = Some(wal_seg_size);
|
||||
|
||||
// Find out where stored WAL ends, starting at commit_lsn which is a
|
||||
// known recent record boundary (unless we don't have WAL at all).
|
||||
self.write_lsn = if state.commit_lsn == Lsn(0) {
|
||||
Lsn(0)
|
||||
} else {
|
||||
Lsn(find_end_of_wal(&self.timeline_dir, wal_seg_size, true, state.commit_lsn)?.0)
|
||||
};
|
||||
|
||||
self.write_record_lsn = self.write_lsn;
|
||||
|
||||
// TODO: do we really know that write_lsn is fully flushed to disk?
|
||||
// If not, maybe it's better to call fsync() here to be sure?
|
||||
self.update_flush_lsn();
|
||||
|
||||
info!(
|
||||
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
|
||||
self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn,
|
||||
);
|
||||
if self.flush_record_lsn < state.commit_lsn
|
||||
|| self.flush_record_lsn < state.peer_horizon_lsn
|
||||
{
|
||||
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write WAL to disk.
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
// Disallow any non-sequential writes, which can result in gaps or overwrites.
|
||||
@@ -404,80 +368,83 @@ impl Storage for PhysicalStorage {
|
||||
// We have unflushed data (write_lsn != flush_lsn), but no file.
|
||||
// This should only happen if last file was fully written and flushed,
|
||||
// but haven't updated flush_lsn yet.
|
||||
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
|
||||
if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
|
||||
bail!(
|
||||
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
|
||||
self.write_lsn,
|
||||
self.flush_record_lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// everything is flushed now, let's update flush_lsn
|
||||
self.update_flush_lsn();
|
||||
self.flush_record_lsn = self.write_record_lsn;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Truncate written WAL by removing all WAL segments after the given LSN.
|
||||
/// end_pos must point to the end of the WAL record.
|
||||
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
|
||||
let wal_seg_size = self
|
||||
.wal_seg_size
|
||||
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
|
||||
|
||||
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
|
||||
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
|
||||
if self.write_lsn != Lsn(0) && end_pos >= self.write_lsn {
|
||||
bail!(
|
||||
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
|
||||
self.write_lsn,
|
||||
end_pos
|
||||
);
|
||||
}
|
||||
|
||||
// Close previously opened file, if any
|
||||
if let Some(mut unflushed_file) = self.file.take() {
|
||||
self.fdatasync_file(&mut unflushed_file)?;
|
||||
}
|
||||
|
||||
let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
|
||||
let segno = end_pos.segment_number(wal_seg_size);
|
||||
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
|
||||
let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize;
|
||||
let segno = end_pos.segment_number(self.wal_seg_size);
|
||||
|
||||
// Remove all segments after the given LSN.
|
||||
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;
|
||||
|
||||
let (mut file, is_partial) = self.open_or_create(segno)?;
|
||||
|
||||
// Fill end with zeroes
|
||||
file.seek(SeekFrom::Start(xlogoff as u64))?;
|
||||
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
|
||||
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
|
||||
self.fdatasync_file(&mut file)?;
|
||||
|
||||
if !is_partial {
|
||||
// Make segment partial once again
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
|
||||
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
|
||||
fs::rename(&wal_file_path, &wal_file_partial_path)?;
|
||||
}
|
||||
|
||||
// Remove all subsequent segments
|
||||
let mut segno = segno;
|
||||
loop {
|
||||
segno += 1;
|
||||
let (wal_file_path, wal_file_partial_path) =
|
||||
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
|
||||
// TODO: better use fs::try_exists which is currently available only in nightly build
|
||||
if wal_file_path.exists() {
|
||||
fs::remove_file(&wal_file_path)?;
|
||||
} else if wal_file_partial_path.exists() {
|
||||
fs::remove_file(&wal_file_partial_path)?;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Update LSNs
|
||||
self.write_lsn = end_pos;
|
||||
self.write_record_lsn = end_pos;
|
||||
self.update_flush_lsn();
|
||||
self.flush_record_lsn = end_pos;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
|
||||
let timeline_dir = self.timeline_dir.clone();
|
||||
let wal_seg_size = self.wal_seg_size.unwrap();
|
||||
let wal_seg_size = self.wal_seg_size;
|
||||
Box::new(move |segno_up_to: XLogSegNo| {
|
||||
remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
|
||||
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove all WAL segments in timeline_dir <= given segno.
|
||||
fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
|
||||
/// Remove all WAL segments in timeline_dir that match the given predicate.
|
||||
fn remove_segments_from_disk(
|
||||
timeline_dir: &Path,
|
||||
wal_seg_size: usize,
|
||||
remove_predicate: impl Fn(XLogSegNo) -> bool,
|
||||
) -> Result<()> {
|
||||
let mut n_removed = 0;
|
||||
let mut min_removed = u64::MAX;
|
||||
let mut max_removed = u64::MIN;
|
||||
|
||||
for entry in fs::read_dir(&timeline_dir)? {
|
||||
let entry = entry?;
|
||||
let entry_path = entry.path();
|
||||
@@ -489,19 +456,21 @@ fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo
|
||||
continue;
|
||||
}
|
||||
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
|
||||
if segno <= segno_up_to {
|
||||
if remove_predicate(segno) {
|
||||
remove_file(entry_path)?;
|
||||
n_removed += 1;
|
||||
min_removed = min(min_removed, segno);
|
||||
max_removed = max(max_removed, segno);
|
||||
}
|
||||
}
|
||||
}
|
||||
let segno_from = segno_up_to - n_removed + 1;
|
||||
info!(
|
||||
"removed {} WAL segments [{}; {}]",
|
||||
n_removed,
|
||||
XLogFileName(PG_TLI, segno_from, wal_seg_size),
|
||||
XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
|
||||
);
|
||||
|
||||
if n_removed > 0 {
|
||||
info!(
|
||||
"removed {} WAL segments [{}; {}]",
|
||||
n_removed, min_removed, max_removed
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -511,8 +480,10 @@ pub struct WalReader {
|
||||
pos: Lsn,
|
||||
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,
|
||||
|
||||
enable_remote_read: bool,
|
||||
// S3 will be used to read WAL if LSN is not available locally
|
||||
enable_remote_read: bool,
|
||||
|
||||
// We don't have WAL locally if LSN is less than local_start_lsn
|
||||
local_start_lsn: Lsn,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user