Compare commits

...

5 Commits

Author              SHA1        Message                              Date
Arthur Petukhovsky  b735e92546  Clean up after self-review           2022-09-12 14:23:55 +00:00
Arthur Petukhovsky  c71f637d65  Fix timeline deletion                2022-09-12 14:23:55 +00:00
Arthur Petukhovsky  780e44c07b  Fix context for some errors          2022-09-12 14:23:55 +00:00
Arthur Petukhovsky  f8aecd53cd  Remove complicated state management  2022-09-12 14:23:55 +00:00
Arthur Petukhovsky  0faf0e92ec  Add TimelineState                    2022-09-12 14:23:55 +00:00
19 changed files with 1000 additions and 712 deletions

Cargo.lock (generated)

@@ -2721,6 +2721,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres_ffi",
@@ -2731,6 +2732,7 @@ dependencies = [
"serde_with",
"signal-hook",
"tempfile",
"thiserror",
"tokio",
"tokio-postgres",
"toml_edit",


@@ -429,8 +429,22 @@ impl PostgresBackend {
// full cause of the error, not just the top-level context + its trace.
// We don't want to send that in the ErrorResponse though,
// because it's not relevant to the compute node logs.
error!("query handler for '{}' failed: {:?}", query_string, e);
self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?;
//
// We also don't want to log the full stacktrace when the error is trivial,
// such as a routine connection close.
let short_error = format!("{:#}", e);
let root_cause = e.root_cause().to_string();
if root_cause.contains("connection closed unexpectedly")
|| root_cause.contains("Broken pipe (os error 32)")
{
error!(
"query handler for '{}' failed: {}",
query_string, short_error
);
} else {
error!("query handler for '{}' failed: {:?}", query_string, e);
}
self.write_message_noflush(&BeMessage::ErrorResponse(&short_error))?;
// TODO: untangle convoluted control flow
if e.to_string().contains("failed to run") {
return Ok(ProcessMsgResult::Break);
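
The new branch above logs the compact `{:#}` rendering of an `anyhow::Error` for expected failures (the peer simply went away) and keeps the verbose `{:?}` rendering for everything else. A self-contained sketch of the difference, for illustration only (the query string and error text here are made up):

```rust
use anyhow::anyhow;

fn main() {
    // An error with one layer of context, similar to what the query handler
    // sees when the compute node drops the connection.
    let e = anyhow!("connection closed unexpectedly")
        .context("query handler for 'START_WAL_PUSH' failed");

    // Alternate form: the whole context chain on one line, no backtrace.
    // This is what the diff logs for trivial errors and sends in ErrorResponse.
    println!("{:#}", e); // query handler for 'START_WAL_PUSH' failed: connection closed unexpectedly

    // Debug form: multi-line cause chain plus a backtrace if one was captured.
    // This is what the diff still logs for unexpected errors.
    println!("{:?}", e);

    // root_cause() returns the innermost error, which the new code matches on
    // to decide between the two log formats.
    assert!(e.root_cause().to_string().contains("connection closed unexpectedly"));
}
```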


@@ -30,6 +30,8 @@ git-version = "0.3.5"
async-trait = "0.1"
once_cell = "1.13.0"
toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }


@@ -24,9 +24,9 @@ use safekeeper::defaults::{
};
use safekeeper::http;
use safekeeper::remove_wal;
use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use utils::auth::JwtAuth;
use utils::{
@@ -298,7 +298,9 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
GlobalTimelines::init(wal_backup_launcher_tx);
// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;
let conf_ = conf.clone();
threads.push(


@@ -10,6 +10,7 @@ use etcd_broker::LeaseKeeper;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
@@ -17,7 +18,8 @@ use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;
use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use etcd_broker::{
subscription_key::{OperationKind, SkOperationKind, SubscriptionKey},
Client, PutOptions,
@@ -210,11 +212,15 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let active_tlis = GlobalTimelines::get_active_timelines();
let mut active_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active());
let active_tlis_set: HashSet<ZTenantTimelineId> =
active_tlis.iter().map(|tli| tli.zttid).collect();
// Get and maintain (if not yet) per timeline lease to automatically delete obsolete data.
for zttid in active_tlis.iter() {
if let Entry::Vacant(v) = leases.entry(*zttid) {
for tli in &active_tlis {
if let Entry::Vacant(v) = leases.entry(tli.zttid) {
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
v.insert(Lease {
@@ -224,12 +230,11 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
});
}
}
leases.retain(|zttid, _| active_tlis.contains(zttid));
leases.retain(|zttid, _| active_tlis_set.contains(zttid));
// Push data concurrently to avoid adding up latencies; with many timelines sequential pushes would be slow.
let handles = active_tlis
.iter()
.filter_map(|zttid| GlobalTimelines::get_loaded(*zttid))
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let key = timeline_safekeeper_path(
@@ -279,7 +284,7 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
match subscription.value_updates.recv().await {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}
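
The comments in `push_loop` above make two points: the timelines map sits behind a plain (non-async) mutex that is never held across an `.await`, and the per-timeline pushes are issued concurrently so that one slow push does not delay the rest. A generic sketch of that pattern; `TimelineInfo` and `push_to_broker` are made-up stand-ins, not the real etcd client API:

```rust
use std::sync::{Arc, Mutex};

use futures::future::join_all;

#[derive(Clone)]
struct TimelineInfo {
    id: u64,
    commit_lsn: u64,
}

// Stand-in for the etcd PUT with a per-timeline lease.
async fn push_to_broker(info: TimelineInfo) -> anyhow::Result<()> {
    println!("pushed timeline {} at commit_lsn {}", info.id, info.commit_lsn);
    Ok(())
}

async fn push_once(timelines: &Arc<Mutex<Vec<TimelineInfo>>>) -> anyhow::Result<()> {
    // Hold the plain mutex only long enough to snapshot the data,
    // and drop it before any await point.
    let snapshot: Vec<TimelineInfo> = timelines.lock().unwrap().clone();

    // Fan out one future per timeline and wait for all of them together.
    let results = join_all(snapshot.into_iter().map(push_to_broker)).await;
    for res in results {
        res?;
    }
    Ok(())
}
```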


@@ -9,8 +9,6 @@ use std::io::{Read, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use tracing::*;
use crate::control_file_upgrade::upgrade_control_file;
use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
@@ -55,6 +53,7 @@ pub struct FileStorage {
}
impl FileStorage {
/// Initialize storage by loading state from disk.
pub fn restore_new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
let timeline_dir = conf.timeline_dir(zttid);
let tenant_id = zttid.tenant_id.to_string();
@@ -71,6 +70,7 @@ impl FileStorage {
})
}
/// Create file storage for a new timeline, but don't persist it yet.
pub fn create_new(
zttid: &ZTenantTimelineId,
conf: &SafeKeeperConf,
@@ -80,19 +80,18 @@ impl FileStorage {
let tenant_id = zttid.tenant_id.to_string();
let timeline_id = zttid.timeline_id.to_string();
let mut store = FileStorage {
let store = FileStorage {
timeline_dir,
conf: conf.clone(),
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
.with_label_values(&[&tenant_id, &timeline_id]),
state: state.clone(),
state,
};
store.persist(&state)?;
Ok(store)
}
// Check the magic/version in the on-disk data and deserialize it, if possible.
/// Check the magic/version in the on-disk data and deserialize it, if possible.
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
// Read the version independent part
let magic = buf.read_u32::<LittleEndian>()?;
@@ -112,7 +111,7 @@ impl FileStorage {
upgrade_control_file(buf, version)
}
// Load control file for given zttid at path specified by conf.
/// Load control file for given zttid at path specified by conf.
pub fn load_control_file_conf(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
@@ -122,13 +121,7 @@ impl FileStorage {
}
/// Read in the control file.
/// If create=false and file doesn't exist, bails out.
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
info!(
"loading control file {}",
control_file_path.as_ref().display(),
);
let mut control_file = OpenOptions::new()
.read(true)
.write(true)
@@ -179,8 +172,8 @@ impl Deref for FileStorage {
}
impl Storage for FileStorage {
// persists state durably to underlying storage
// for description see https://lwn.net/Articles/457667/
/// persists state durably to underlying storage
/// for description see https://lwn.net/Articles/457667/
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
let _timer = &self.persist_control_file_seconds.start_timer();
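
The `persist` doc comment above points at the LWN article on getting data safely to disk. A minimal sketch of that recipe (write a temp file, fsync it, rename over the target, fsync the directory), independent of `FileStorage`; the `.partial` suffix is just an illustrative name:

```rust
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::Path;

// Crash-safe replacement of `dir/target_name` with `bytes`, following the
// write-fsync-rename-fsync pattern from the linked article. Sketch only; the
// real control file code additionally writes a magic/version header and
// records timings in the PERSIST_CONTROL_FILE_SECONDS histogram.
fn persist_atomically(dir: &Path, target_name: &str, bytes: &[u8]) -> std::io::Result<()> {
    let tmp_path = dir.join(format!("{target_name}.partial"));
    let target_path = dir.join(target_name);

    // 1. Write the new contents to a temporary file and flush it to disk.
    let mut tmp = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&tmp_path)?;
    tmp.write_all(bytes)?;
    tmp.sync_all()?;

    // 2. Atomically replace the old file with the new one.
    std::fs::rename(&tmp_path, &target_path)?;

    // 3. Fsync the directory so the rename itself survives a crash (Unix).
    File::open(dir)?.sync_all()?;
    Ok(())
}
```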


@@ -3,15 +3,15 @@
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::receive_wal::ReceiveWalConn;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
use crate::send_wal::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::SafeKeeperConf;
use crate::{GlobalTimelines, SafeKeeperConf};
use anyhow::{bail, Context, Result};
use postgres_ffi::PG_TLI;
use regex::Regex;
use std::sync::Arc;
use tracing::info;
use utils::{
lsn::Lsn,
@@ -27,7 +27,7 @@ pub struct SafekeeperPostgresHandler {
pub appname: Option<String>,
pub ztenantid: Option<ZTenantId>,
pub ztimelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
pub zttid: ZTenantTimelineId,
}
/// Parsed Postgres command.
@@ -98,30 +98,19 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
query_string, self.ztimelineid
);
let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. })
|| matches!(cmd, SafekeeperPostgresCommand::IdentifySystem));
let tenantid = self.ztenantid.context("tenantid is required")?;
let timelineid = self.ztimelineid.context("timelineid is required")?;
if self.timeline.is_none() {
self.timeline.set(
&self.conf,
ZTenantTimelineId::new(tenantid, timelineid),
create,
)?;
}
self.zttid = ZTenantTimelineId::new(tenantid, timelineid);
match cmd {
SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb)
.run(self)
.context("failed to run ReceiveWalConn"),
SafekeeperPostgresCommand::StartReplication { start_lsn } => ReplicationConn::new(pgb)
.run(self, pgb, start_lsn)
.context("failed to run ReplicationConn"),
SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb).run(self),
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
ReplicationConn::new(pgb).run(self, pgb, start_lsn)
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb),
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => handle_json_ctrl(self, pgb, cmd),
}
.context(format!("timeline {timelineid}"))?;
.context(format!("Failed to process query for timeline {timelineid}"))?;
Ok(())
}
@@ -134,42 +123,26 @@ impl SafekeeperPostgresHandler {
appname: None,
ztenantid: None,
ztimelineid: None,
timeline: None,
zttid: ZTenantTimelineId::empty(),
}
}
/// Shortcut for calling `process_msg` in the timeline.
pub fn process_safekeeper_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
self.timeline
.get()
.process_msg(msg)
.context("failed to process ProposerAcceptorMessage")
}
///
/// Handle IDENTIFY_SYSTEM replication command
///
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
let tli = GlobalTimelines::get(self.zttid)?;
let lsn = if self.is_walproposer_recovery() {
// walproposer should get all local WAL until flush_lsn
self.timeline.get().get_end_of_wal()
tli.get_flush_lsn()
} else {
// other clients shouldn't get any uncommitted WAL
self.timeline.get().get_state().0.commit_lsn
tli.get_state().0.commit_lsn
}
.to_string();
let sysid = self
.timeline
.get()
.get_state()
.1
.server
.system_id
.to_string();
let sysid = tli.get_state().1.server.system_id.to_string();
let lsn_bytes = lsn.as_bytes();
let tli = PG_TLI.to_string();
let tli_bytes = tli.as_bytes();


@@ -1,3 +1,4 @@
use anyhow::anyhow;
use hyper::{Body, Request, Response, StatusCode, Uri};
use once_cell::sync::Lazy;
@@ -9,7 +10,9 @@ use std::sync::Arc;
use crate::safekeeper::Term;
use crate::safekeeper::TermHistory;
use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use etcd_broker::subscription_value::SkTimelineInfo;
use utils::{
@@ -96,9 +99,9 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
);
check_permission(&request, Some(zttid.tenant_id))?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
let tli = GlobalTimelines::get(zttid)?;
let (inmem, state) = tli.get_state();
let flush_lsn = tli.get_end_of_wal();
let flush_lsn = tli.get_flush_lsn();
let acc_state = AcceptorStateStatus {
term: state.acceptor_state.term,
@@ -130,19 +133,11 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
timeline_id: request_data.timeline_id,
};
check_permission(&request, Some(zttid.tenant_id))?;
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
.map_err(ApiError::from_err)?;
json_response(StatusCode::CREATED, ())
Err(ApiError::from_err(anyhow!("not implemented")))
}
/// Deactivates the timeline and removes its data directory.
///
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
/// However, it tries to check whether the timeline was active and report that to the caller just in case.
/// Note that this information is inaccurate:
/// 1. There is a race condition between checking the timeline for activity and actual directory deletion.
/// 2. At the time of writing Safekeeper rarely marks a timeline inactive. E.g. disconnecting the compute node does nothing.
async fn timeline_delete_force_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
@@ -152,12 +147,10 @@ async fn timeline_delete_force_handler(
);
check_permission(&request, Some(zttid.tenant_id))?;
ensure_no_body(&mut request).await?;
json_response(
StatusCode::OK,
GlobalTimelines::delete_force(get_conf(&request), &zttid)
.await
.map_err(ApiError::from_err)?,
)
let resp = tokio::task::spawn_blocking(move || GlobalTimelines::delete_force(&zttid))
.await
.map_err(ApiError::from_err)??;
json_response(StatusCode::OK, resp)
}
/// Deactivates all timelines for the tenant and removes its data directory.
@@ -168,11 +161,14 @@ async fn tenant_delete_force_handler(
let tenant_id = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
ensure_no_body(&mut request).await?;
let delete_info = tokio::task::spawn_blocking(move || {
GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
})
.await
.map_err(ApiError::from_err)??;
json_response(
StatusCode::OK,
GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
.await
.map_err(ApiError::from_err)?
delete_info
.iter()
.map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
@@ -188,7 +184,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
check_permission(&request, Some(zttid.tenant_id))?;
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
let tli = GlobalTimelines::get(zttid)?;
tli.record_safekeeper_info(&safekeeper_info, NodeId(1))
.await?;
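
Both delete handlers above move the `GlobalTimelines` call onto tokio's blocking thread pool and then unwrap two layers of `Result` with `??`. A condensed sketch of that pattern using plain `anyhow` errors (the real handlers convert into `ApiError` via `ApiError::from_err` instead):

```rust
use anyhow::Context;

// Run a blocking, fallible closure from async code without stalling the executor.
async fn run_blocking<T, F>(f: F) -> anyhow::Result<T>
where
    T: Send + 'static,
    F: FnOnce() -> anyhow::Result<T> + Send + 'static,
{
    // The first `?` surfaces the JoinError (the blocking task panicked or was
    // cancelled); the second `?` propagates the error returned by `f` itself.
    let value = tokio::task::spawn_blocking(f)
        .await
        .context("blocking task failed")??;
    Ok(value)
}
```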


@@ -6,18 +6,22 @@
//! modifications in tests.
//!
use std::sync::Arc;
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::zid::ZTenantTimelineId;
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
use crate::safekeeper::{
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
};
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::timeline::TimelineTools;
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_ffi::v14::xlog_utils;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{
@@ -57,23 +61,23 @@ struct AppendResult {
/// content, and then append it with specified term and lsn. This
/// function is used to test safekeepers in different scenarios.
pub fn handle_json_ctrl(
spg: &mut SafekeeperPostgresHandler,
spg: &SafekeeperPostgresHandler,
pgb: &mut PostgresBackend,
append_request: &AppendLogicalMessage,
) -> Result<()> {
info!("JSON_CTRL request: {:?}", append_request);
// need to init safekeeper state before AppendRequest
prepare_safekeeper(spg)?;
let tli = prepare_safekeeper(spg.zttid)?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
send_proposer_elected(spg, append_request.term, append_request.epoch_start_lsn)?;
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?;
}
let inserted_wal = append_logical_message(spg, append_request)?;
let inserted_wal = append_logical_message(&tli, append_request)?;
let response = AppendResult {
state: spg.timeline.get().get_state().1,
state: tli.get_state().1,
inserted_wal,
};
let response_data = serde_json::to_vec(&response)?;
@@ -91,28 +95,20 @@ pub fn handle_json_ctrl(
/// Prepare safekeeper to process append requests without crashes,
/// by sending ProposerGreeting with default server.wal_seg_size.
fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
protocol_version: 2, // current protocol
pg_version: 0, // unknown
proposer_id: [0u8; 16],
system_id: 0,
ztli: spg.ztimelineid.unwrap(),
tenant_id: spg.ztenantid.unwrap(),
tli: 0,
wal_seg_size: WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
});
let response = spg.timeline.get().process_msg(&greeting_request)?;
match response {
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
_ => anyhow::bail!("not GreetingResponse"),
}
fn prepare_safekeeper(zttid: ZTenantTimelineId) -> Result<Arc<Timeline>> {
GlobalTimelines::create(
zttid,
ServerInfo {
pg_version: 0, // unknown
wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0,
},
)
}
fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> Result<()> {
// add new term to existing history
let history = spg.timeline.get().get_state().1.acceptor_state.term_history;
let history = tli.get_state().1.acceptor_state.term_history;
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
let mut history_entries = history.0;
history_entries.push(TermSwitchEntry { term, lsn });
@@ -125,7 +121,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L
timeline_start_lsn: lsn,
});
spg.timeline.get().process_msg(&proposer_elected_request)?;
tli.process_msg(&proposer_elected_request)?;
Ok(())
}
@@ -138,12 +134,9 @@ struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message(
spg: &mut SafekeeperPostgresHandler,
msg: &AppendLogicalMessage,
) -> Result<InsertedWAL> {
fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> {
let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = spg.timeline.get().get_state().1;
let sk_state = tli.get_state().1;
let begin_lsn = msg.begin_lsn;
let end_lsn = begin_lsn + wal_data.len() as u64;
@@ -167,7 +160,7 @@ fn append_logical_message(
wal_data: Bytes::from(wal_data),
});
let response = spg.timeline.get().process_msg(&append_request)?;
let response = tli.process_msg(&append_request)?;
let append_response = match response {
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,


@@ -23,6 +23,9 @@ pub mod wal_backup;
pub mod wal_service;
pub mod wal_storage;
mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;
pub mod defaults {
use const_format::formatcp;
use std::time::Duration;


@@ -12,7 +12,8 @@ use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::{
safekeeper::{SafeKeeperState, SafekeeperMemState},
timeline::{GlobalTimelines, ReplicaState},
timeline::ReplicaState,
GlobalTimelines,
};
pub struct FullTimelineInfo {
@@ -235,9 +236,15 @@ impl Collector for TimelineCollector {
self.disk_usage.reset();
self.acceptor_term.reset();
let timelines = GlobalTimelines::active_timelines_metrics();
let timelines = GlobalTimelines::get_all();
for arc_tli in timelines {
let tli = arc_tli.info_for_metrics();
if tli.is_none() {
continue;
}
let tli = tli.unwrap();
for tli in timelines {
let tenant_id = tli.zttid.tenant_id.to_string();
let timeline_id = tli.zttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];


@@ -7,7 +7,9 @@ use anyhow::{anyhow, bail, Result};
use bytes::BytesMut;
use tracing::*;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use std::net::SocketAddr;
use std::sync::mpsc::channel;
@@ -20,7 +22,6 @@ use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::handler::SafekeeperPostgresHandler;
use crate::timeline::TimelineTools;
use utils::{
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage},
@@ -67,15 +68,21 @@ impl<'pg> ReceiveWalConn<'pg> {
// Receive information about server
let next_msg = poll_reader.recv_msg()?;
match next_msg {
let tli = match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with wal proposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
);
let server_info = ServerInfo {
pg_version: greeting.pg_version,
system_id: greeting.system_id,
wal_seg_size: greeting.wal_seg_size,
};
GlobalTimelines::create(spg.zttid, server_info)?
}
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
}
};
let mut next_msg = Some(next_msg);
@@ -88,7 +95,7 @@ impl<'pg> ReceiveWalConn<'pg> {
while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg {
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
let reply = spg.process_safekeeper_msg(&msg)?;
let reply = tli.process_msg(&msg)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
@@ -97,13 +104,13 @@ impl<'pg> ReceiveWalConn<'pg> {
}
// flush all written WAL to the disk
let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?;
let reply = tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
} else if let Some(msg) = next_msg.take() {
// process other message
let reply = spg.process_safekeeper_msg(&msg)?;
let reply = tli.process_msg(&msg)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
@@ -112,9 +119,9 @@ impl<'pg> ReceiveWalConn<'pg> {
// Register the connection and defer unregister. Do that only
// after processing first message, as it sets wal_seg_size,
// wanted by many.
spg.timeline.get().on_compute_connect()?;
tli.on_compute_connect()?;
_guard = Some(ComputeConnectionGuard {
timeline: Arc::clone(spg.timeline.get()),
timeline: Arc::clone(&tli),
});
first_time_through = false;
}
@@ -190,6 +197,8 @@ struct ComputeConnectionGuard {
impl Drop for ComputeConnectionGuard {
fn drop(&mut self) {
self.timeline.on_compute_disconnect().unwrap();
if let Err(e) = self.timeline.on_compute_disconnect() {
error!("failed to unregister compute connection: {}", e);
}
}
}
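
The last hunk above replaces an `unwrap()` inside `Drop` with logging. The reason is worth spelling out: if `drop` panics while the thread is already unwinding from another panic, the process aborts, so guards that do fallible cleanup should log and continue. A small sketch of the pattern, assuming the `tracing` crate already used in the diff:

```rust
use tracing::error;

struct ConnectionGuard {
    name: String,
}

impl ConnectionGuard {
    // Fallible cleanup, standing in for `on_compute_disconnect`.
    fn unregister(&self) -> anyhow::Result<()> {
        Ok(())
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        // Never unwrap() here: a second panic during unwinding aborts the process.
        if let Err(e) = self.unregister() {
            error!("failed to unregister connection {}: {}", self.name, e);
        }
    }
}
```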


@@ -4,20 +4,21 @@ use std::{thread, time::Duration};
use tracing::*;
use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use crate::{GlobalTimelines, SafeKeeperConf};
pub fn thread_main(conf: SafeKeeperConf) {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!(
"failed to remove WAL for tenant {} timeline {}: {}",
tli.zttid.tenant_id, tli.zttid.timeline_id, e
);
}
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
if !tli.is_active() {
continue;
}
let zttid = tli.zttid;
let _enter =
info_span!("", tenant = %zttid.tenant_id, timeline = %zttid.timeline_id).entered();
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!("failed to remove WAL: {}", e);
}
}
thread::sleep(wal_removal_interval)


@@ -219,7 +219,11 @@ pub struct SafekeeperMemState {
}
impl SafeKeeperState {
pub fn new(zttid: &ZTenantTimelineId, peers: Vec<NodeId>) -> SafeKeeperState {
pub fn new(
zttid: &ZTenantTimelineId,
server_info: ServerInfo,
peers: Vec<NodeId>,
) -> SafeKeeperState {
SafeKeeperState {
tenant_id: zttid.tenant_id,
timeline_id: zttid.timeline_id,
@@ -227,11 +231,7 @@ impl SafeKeeperState {
term: 0,
term_history: TermHistory::empty(),
},
server: ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 0,
},
server: server_info,
proposer_uuid: [0; 16],
timeline_start_lsn: Lsn(0),
local_start_lsn: Lsn(0),
@@ -245,7 +245,15 @@ impl SafeKeeperState {
#[cfg(test)]
pub fn empty() -> Self {
SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![])
SafeKeeperState::new(
&ZTenantTimelineId::empty(),
ServerInfo {
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
system_id: 0, /* Postgres system identifier */
wal_seg_size: 0,
},
vec![],
)
}
}
@@ -481,8 +489,12 @@ impl AcceptorProposerMessage {
}
}
/// SafeKeeper which consumes events (messages from compute) and provides
/// replies.
/// Safekeeper implements consensus to reliably persist WAL across nodes.
/// It controls all WAL disk writes and updates of control file.
///
/// Currently safekeeper processes:
/// - messages from compute (proposers) and provides replies
/// - messages from etcd peers
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
/// Note: be careful to set only if we are sure our WAL (term history) matches
@@ -505,20 +517,20 @@ where
CTRL: control_file::Storage,
WAL: wal_storage::Storage,
{
// constructor
pub fn new(
ztli: ZTimelineId,
state: CTRL,
mut wal_store: WAL,
node_id: NodeId,
) -> Result<SafeKeeper<CTRL, WAL>> {
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
/// Accepts a control file storage containing the safekeeper state.
/// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
/// and `server` (`wal_seg_size` inside it) fields.
pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
if state.tenant_id == ZTenantId::from([0u8; 16])
|| state.timeline_id == ZTimelineId::from([0u8; 16])
{
bail!(
"Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
state.tenant_id,
state.timeline_id
);
}
// initialize wal_store, if state is already initialized
wal_store.init_storage(&state)?;
Ok(SafeKeeper {
global_commit_lsn: state.commit_lsn,
epoch_start_lsn: Lsn(0),
@@ -576,7 +588,7 @@ where
&mut self,
msg: &ProposerGreeting,
) -> Result<Option<AcceptorProposerMessage>> {
/* Check protocol compatibility */
// Check protocol compatibility
if msg.protocol_version != SK_PROTOCOL_VERSION {
bail!(
"incompatible protocol version {}, expected {}",
@@ -584,11 +596,11 @@ where
SK_PROTOCOL_VERSION
);
}
/* Postgres upgrade is not treated as fatal error */
// Postgres upgrade is not treated as fatal error
if msg.pg_version != self.state.server.pg_version
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
info!(
warn!(
"incompatible server version {}, expected {}",
msg.pg_version, self.state.server.pg_version
);
@@ -607,17 +619,25 @@ where
self.state.timeline_id
);
}
// set basic info about server, if not yet
// TODO: verify that is doesn't change after
{
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
self.state.persist(&state)?;
if self.state.server.wal_seg_size != msg.wal_seg_size {
bail!(
"invalid wal_seg_size, got {}, expected {}",
msg.wal_seg_size,
self.state.server.wal_seg_size
);
}
self.wal_store.init_storage(&self.state)?;
// system_id will be updated on mismatch
if self.state.server.system_id != msg.system_id {
warn!(
"unexpected system ID arrived, got {}, expected {}",
msg.system_id, self.state.server.system_id
);
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
self.state.persist(&state)?;
}
info!(
"processed greeting from proposer {:?}, sending term {:?}",
@@ -667,16 +687,6 @@ where
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
}
/// Bump our term if received a note from elected proposer with higher one
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
if self.state.acceptor_state.term < term {
let mut state = self.state.clone();
state.acceptor_state.term = term;
self.state.persist(&state)?;
}
Ok(())
}
/// Form AppendResponse from current state.
fn append_response(&self) -> AppendResponse {
let ar = AppendResponse {
@@ -693,7 +703,12 @@ where
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
info!("received ProposerElected {:?}", msg);
self.bump_if_higher(msg.term)?;
if self.state.acceptor_state.term < msg.term {
let mut state = self.state.clone();
state.acceptor_state.term = msg.term;
self.state.persist(&state)?;
}
// If our term is higher, ignore the message (next feedback will inform the compute)
if self.state.acceptor_state.term > msg.term {
return Ok(None);
@@ -750,7 +765,7 @@ where
}
/// Advance commit_lsn taking into account what we have locally
pub fn update_commit_lsn(&mut self) -> Result<()> {
fn update_commit_lsn(&mut self) -> Result<()> {
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
assert!(commit_lsn >= self.inmem.commit_lsn);
@@ -770,6 +785,11 @@ where
Ok(())
}
/// Persist control file to disk, called only after timeline creation (bootstrap).
pub fn persist(&mut self) -> Result<()> {
self.persist_control_file(self.state.clone())
}
/// Persist in-memory state to the disk, taking other data from state.
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
state.commit_lsn = self.inmem.commit_lsn;
@@ -920,6 +940,8 @@ where
#[cfg(test)]
mod tests {
use postgres_ffi::WAL_SEGMENT_SIZE;
use super::*;
use crate::wal_storage::Storage;
use std::ops::Deref;
@@ -944,6 +966,14 @@ mod tests {
}
}
fn test_sk_state() -> SafeKeeperState {
let mut state = SafeKeeperState::empty();
state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
state.tenant_id = ZTenantId::from([1u8; 16]);
state.timeline_id = ZTimelineId::from([1u8; 16]);
state
}
struct DummyWalStore {
lsn: Lsn,
}
@@ -953,10 +983,6 @@ mod tests {
self.lsn
}
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
Ok(())
}
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())
@@ -979,12 +1005,10 @@ mod tests {
#[test]
fn test_voting() {
let storage = InMemoryState {
persisted_state: SafeKeeperState::empty(),
persisted_state: test_sk_state(),
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
// check voting for 1 is ok
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
@@ -1000,7 +1024,7 @@ mod tests {
persisted_state: state,
};
sk = SafeKeeper::new(ztli, storage, sk.wal_store, NodeId(0)).unwrap();
sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
// and ensure voting second time for 1 is not ok
vote_resp = sk.process_msg(&vote_request);
@@ -1013,12 +1037,11 @@ mod tests {
#[test]
fn test_epoch_switch() {
let storage = InMemoryState {
persisted_state: SafeKeeperState::empty(),
persisted_state: test_sk_state(),
};
let wal_store = DummyWalStore { lsn: Lsn(0) };
let ztli = ZTimelineId::from([0u8; 16]);
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
let mut ar_hdr = AppendRequestHeader {
term: 1,


@@ -2,8 +2,9 @@
//! with the "START_REPLICATION" message.
use crate::handler::SafekeeperPostgresHandler;
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use crate::timeline::{ReplicaState, Timeline};
use crate::wal_storage::WalReader;
use crate::GlobalTimelines;
use anyhow::{bail, Context, Result};
use bytes::Bytes;
@@ -167,8 +168,10 @@ impl ReplicationConn {
) -> Result<()> {
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
let tli = GlobalTimelines::get(spg.zttid)?;
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(spg.timeline.get());
let bg_timeline = Arc::clone(&tli);
let bg_stream_in = self.stream_in.take().unwrap();
let bg_timeline_id = spg.ztimelineid.unwrap();
@@ -201,11 +204,8 @@ impl ReplicationConn {
.build()?;
runtime.block_on(async move {
let (inmem_state, persisted_state) = spg.timeline.get().get_state();
let (inmem_state, persisted_state) = tli.get_state();
// add persisted_state.timeline_start_lsn == Lsn(0) check
if persisted_state.server.wal_seg_size == 0 {
bail!("Cannot start replication before connecting to walproposer");
}
// Walproposer gets special handling: safekeeper must give proposer all
// local WAL till the end, whether committed or not (walproposer will
@@ -217,7 +217,7 @@ impl ReplicationConn {
// on this safekeeper itself. That's ok as (old) proposer will never be
// able to commit such WAL.
let stop_pos: Option<Lsn> = if spg.is_walproposer_recovery() {
let wal_end = spg.timeline.get().get_end_of_wal();
let wal_end = tli.get_flush_lsn();
Some(wal_end)
} else {
None
@@ -231,7 +231,7 @@ impl ReplicationConn {
let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn);
let mut wal_reader = WalReader::new(
spg.conf.timeline_dir(&spg.timeline.get().zttid),
spg.conf.timeline_dir(&tli.zttid),
&persisted_state,
start_pos,
spg.conf.wal_backup_enabled,
@@ -241,7 +241,7 @@ impl ReplicationConn {
let mut send_buf = vec![0u8; MAX_SEND_SIZE];
// watcher for commit_lsn updates
let mut commit_lsn_watch_rx = spg.timeline.get().get_commit_lsn_watch_rx();
let mut commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
loop {
if let Some(stop_pos) = stop_pos {
@@ -258,7 +258,7 @@ impl ReplicationConn {
} else {
// TODO: also check once in a while whether we are walsender
// to right pageserver.
if spg.timeline.get().stop_walsender(replica_id)? {
if tli.should_walsender_stop(replica_id) {
// Shut down, timeline is suspended.
// TODO create proper error type for this
bail!("end streaming to {:?}", spg.appname);


@@ -1,21 +1,19 @@
//! This module contains timeline id -> safekeeper state map with file-backed
//! persistence and support for interaction between sending and receiving wal.
//! This module implements Timeline lifecycle management and has all necessary code
//! to glue together SafeKeeper and all other background services.
use anyhow::{bail, Context, Result};
use anyhow::{bail, Result};
use etcd_broker::subscription_value::SkTimelineInfo;
use once_cell::sync::Lazy;
use postgres_ffi::XLogSegNo;
use serde::Serialize;
use tokio::sync::watch;
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet};
use std::fs::{self};
use std::sync::{Arc, Mutex, MutexGuard};
use parking_lot::{Mutex, MutexGuard};
use std::path::PathBuf;
use tokio::sync::mpsc::Sender;
use tracing::*;
@@ -23,13 +21,13 @@ use tracing::*;
use utils::{
lsn::Lsn,
pq_proto::ReplicationFeedback,
zid::{NodeId, ZTenantId, ZTenantTimelineId},
zid::{NodeId, ZTenantTimelineId},
};
use crate::control_file;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState,
SafekeeperMemState, ServerInfo,
};
use crate::send_wal::HotStandbyFeedback;
@@ -73,7 +71,7 @@ impl ReplicaState {
}
/// Shared state associated with database instance
struct SharedState {
pub struct SharedState {
/// Safekeeper object
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
/// State of replicas
@@ -95,17 +93,21 @@ struct SharedState {
}
impl SharedState {
/// Initialize timeline state, creating control file
fn create(
/// Initialize fresh timeline state without persisting anything to disk.
fn create_new(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
peer_ids: Vec<NodeId>,
state: SafeKeeperState,
) -> Result<Self> {
let state = SafeKeeperState::new(zttid, peer_ids);
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
if state.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*zttid));
}
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf, &control_store)?;
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
Ok(Self {
sk,
@@ -117,16 +119,17 @@ impl SharedState {
})
}
/// Restore SharedState from control file.
/// If file doesn't exist, bails out.
/// Restore SharedState from control file. If file doesn't exist, bails out.
fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result<Self> {
let control_store = control_file::FileStorage::restore_new(zttid, conf)?;
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
if control_store.server.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*zttid));
}
info!("timeline {} restored", zttid.timeline_id);
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf, &control_store)?;
Ok(Self {
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
replicas: Vec::new(),
wal_backup_active: false,
active: false,
@@ -134,6 +137,7 @@ impl SharedState {
last_removed_segno: 0,
})
}
fn is_active(&self) -> bool {
self.is_wal_backup_required()
// FIXME: add tracking of relevant pageservers and check them here individually,
@@ -254,48 +258,189 @@ impl SharedState {
}
}
/// Database instance (tenant)
#[derive(Debug, thiserror::Error)]
pub enum TimelineError {
#[error("Timeline {0} was cancelled and cannot be used anymore")]
Cancelled(ZTenantTimelineId),
#[error("Timeline {0} was not found in global map")]
NotFound(ZTenantTimelineId),
#[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
Invalid(ZTenantTimelineId),
#[error("Timeline {0} is already exists")]
AlreadyExists(ZTenantTimelineId),
#[error("Timeline {0} is not initialized, wal_seg_size is zero")]
UninitializedWalSegSize(ZTenantTimelineId),
}
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
/// It also holds SharedState and provides mutually exclusive access to it.
pub struct Timeline {
pub zttid: ZTenantTimelineId,
/// Sending here asks for wal backup launcher attention (start/stop
/// offloading). Sending the zttid instead of a concrete command allows
/// sending without holding the timeline lock.
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
/// Used to broadcast commit_lsn updates to all background jobs.
commit_lsn_watch_tx: watch::Sender<Lsn>,
/// For breeding receivers.
commit_lsn_watch_rx: watch::Receiver<Lsn>,
/// Safekeeper and other state, that should remain consistent and synchronized
/// with the disk.
mutex: Mutex<SharedState>,
/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
cancellation_tx: watch::Sender<bool>,
/// Timeline should not be used after cancellation. Background tasks should
/// monitor this channel and stop eventually after receiving `true` from this channel.
cancellation_rx: watch::Receiver<bool>,
/// Directory where timeline state is stored.
timeline_dir: PathBuf,
}
impl Timeline {
fn new(
/// Load existing timeline from disk.
pub fn load_timeline(
conf: SafeKeeperConf,
zttid: ZTenantTimelineId,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
shared_state: SharedState,
) -> Timeline {
) -> Result<Timeline> {
let shared_state = SharedState::restore(&conf, &zttid)?;
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.inmem.commit_lsn);
Timeline {
watch::channel(shared_state.sk.state.commit_lsn);
let (cancellation_tx, cancellation_rx) = watch::channel(false);
Ok(Timeline {
zttid,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(shared_state),
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&zttid),
})
}
/// Create a new timeline, which is not yet persisted to disk.
pub fn create_empty(
conf: SafeKeeperConf,
zttid: ZTenantTimelineId,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
server_info: ServerInfo,
) -> Result<Timeline> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
let (cancellation_tx, cancellation_rx) = watch::channel(false);
let state = SafeKeeperState::new(&zttid, server_info, vec![]);
Ok(Timeline {
zttid,
wal_backup_launcher_tx,
commit_lsn_watch_tx,
commit_lsn_watch_rx,
mutex: Mutex::new(SharedState::create_new(&conf, &zttid, state)?),
cancellation_rx,
cancellation_tx,
timeline_dir: conf.timeline_dir(&zttid),
})
}
/// Initialize fresh timeline on disk and start background tasks. If bootstrap
/// fails, timeline is cancelled and cannot be used anymore.
///
/// Bootstrap is transactional, so if it fails, created files will be deleted,
/// and state on disk should remain unchanged.
pub fn bootstrap(&self, shared_state: &mut MutexGuard<SharedState>) -> Result<()> {
match std::fs::metadata(&self.timeline_dir) {
Ok(_) => {
// Timeline directory exists on disk, we should leave state unchanged
// and return error.
bail!(TimelineError::Invalid(self.zttid));
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
return Err(e.into());
}
}
// Create timeline directory.
std::fs::create_dir_all(&self.timeline_dir)?;
// Write timeline to disk and TODO: start background tasks.
match || -> Result<()> {
shared_state.sk.persist()?;
// TODO: add more initialization steps here
Ok(())
}() {
Ok(_) => Ok(()),
Err(e) => {
// Bootstrap failed, cancel timeline and remove timeline directory.
self.cancel();
if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) {
warn!(
"failed to remove timeline {} directory after bootstrap failure: {}",
self.zttid, fs_err
);
}
Err(e)
}
}
}
/// Delete timeline from disk completely, by removing timeline directory. Background
/// timeline activities will stop eventually.
pub fn delete_from_disk(
&self,
shared_state: &mut MutexGuard<SharedState>,
) -> Result<(bool, bool)> {
let was_active = shared_state.active;
self.cancel();
let dir_existed = delete_dir(&self.timeline_dir)?;
Ok((dir_existed, was_active))
}
/// Cancel timeline to prevent further usage. Background tasks will stop
/// eventually after receiving cancellation signal.
fn cancel(&self) {
info!("Timeline {} is cancelled", self.zttid);
let _ = self.cancellation_tx.send(true);
let res = self.wal_backup_launcher_tx.blocking_send(self.zttid);
if let Err(e) = res {
error!("Failed to send stop signal to wal_backup_launcher: {}", e);
}
}
/// Returns if timeline is cancelled.
pub fn is_cancelled(&self) -> bool {
*self.cancellation_rx.borrow()
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock()
}
/// Register compute connection, starting timeline-related activity if it is
/// not running yet.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_connect(&self) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
shared_state.num_computes += 1;
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
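
The new `Timeline` above keeps a `watch::Sender<bool>` / `watch::Receiver<bool>` pair for cancellation (and a similar pair for commit_lsn updates): `cancel()` sends `true` once, and every background task holds a cloned receiver and stops when it observes the new value. A self-contained sketch of that tokio watch-channel pattern, not the actual Timeline code:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // One sender, any number of receivers; receivers always see the latest value.
    let (cancellation_tx, cancellation_rx) = watch::channel(false);

    // A background task clones the receiver and exits once cancellation is signalled.
    let mut rx = cancellation_rx.clone();
    let task = tokio::spawn(async move {
        while !*rx.borrow() {
            if rx.changed().await.is_err() {
                break; // sender dropped, treat as cancellation
            }
        }
        println!("background task stopped");
    });

    // Equivalent of Timeline::cancel(): ignore the error if nobody is listening.
    let _ = cancellation_tx.send(true);
    task.await.unwrap();
}
```
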
@@ -303,25 +448,33 @@ impl Timeline {
/// De-register compute connection, shutting down timeline activity if
/// pageserver doesn't need catchup.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_disconnect(&self) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}
let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
}
/// Whether we still need this walsender running?
/// Returns true if walsender should stop sending WAL to pageserver.
/// TODO: check this pageserver is actually interested in this timeline.
pub fn stop_walsender(&self, replica_id: usize) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
pub fn should_walsender_stop(&self, replica_id: usize) -> bool {
if self.is_cancelled() {
return true;
}
let mut shared_state = self.write_shared_state();
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
@@ -329,73 +482,65 @@ impl Timeline {
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.update_status(self.zttid);
return Ok(true);
return true;
}
}
Ok(false)
false
}
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub fn wal_backup_attend(&self) -> bool {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.wal_backup_attend()
}
// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
self.mutex.lock().unwrap().can_wal_backup()
}
/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
/// We assume all threads will stop by themselves eventually (possibly with errors, but no panics).
/// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway.
pub async fn deactivate_for_delete(&self) -> Result<bool> {
let was_active: bool;
{
let shared_state = self.mutex.lock().unwrap();
was_active = shared_state.active;
if self.is_cancelled() {
return false;
}
self.wal_backup_launcher_tx.send(self.zttid).await?;
Ok(was_active)
self.write_shared_state().wal_backup_attend()
}
fn is_active(&self) -> bool {
let shared_state = self.mutex.lock().unwrap();
shared_state.active
/// Can this safekeeper offload to s3? Recently joined safekeepers might not
/// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
if self.is_cancelled() {
return false;
}
let shared_state = self.write_shared_state();
shared_state.can_wal_backup()
}
/// Returns full timeline info, required for the metrics.
/// If the timeline is not active, returns None instead.
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
if self.is_cancelled() {
return None;
}
Some(FullTimelineInfo {
zttid: self.zttid,
replicas: shared_state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: shared_state.wal_backup_active,
timeline_is_active: shared_state.active,
num_computes: shared_state.num_computes,
last_removed_segno: shared_state.last_removed_segno,
epoch_start_lsn: shared_state.sk.epoch_start_lsn,
mem_state: shared_state.sk.inmem.clone(),
persisted_state: shared_state.sk.state.clone(),
flush_lsn: shared_state.sk.wal_store.flush_lsn(),
})
let state = self.write_shared_state();
if state.active {
Some(FullTimelineInfo {
zttid: self.zttid,
replicas: state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: state.num_computes,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
})
} else {
None
}
}
/// Returns commit_lsn watch channel.
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
}
@@ -405,10 +550,14 @@ impl Timeline {
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
rmsg = shared_state.sk.process_msg(msg)?;
// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
@@ -426,28 +575,46 @@ impl Timeline {
Ok(rmsg)
}
/// Returns wal_seg_size.
pub fn get_wal_seg_size(&self) -> usize {
self.mutex.lock().unwrap().get_wal_seg_size()
self.write_shared_state().get_wal_seg_size()
}
/// Returns true only if the timeline is loaded and active.
pub fn is_active(&self) -> bool {
if self.is_cancelled() {
return false;
}
self.write_shared_state().active
}
/// Returns state of the timeline.
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let shared_state = self.mutex.lock().unwrap();
(shared_state.sk.inmem.clone(), shared_state.sk.state.clone())
let state = self.write_shared_state();
(state.sk.inmem.clone(), state.sk.state.clone())
}
/// Returns latest backup_lsn.
pub fn get_wal_backup_lsn(&self) -> Lsn {
self.mutex.lock().unwrap().sk.inmem.backup_lsn
self.write_shared_state().sk.inmem.backup_lsn
}
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) {
self.mutex.lock().unwrap().sk.inmem.backup_lsn = backup_lsn;
/// Sets backup_lsn to the given value.
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}
self.write_shared_state().sk.inmem.backup_lsn = backup_lsn;
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
Ok(())
}
/// Prepare public safekeeper info for reporting.
/// Return public safekeeper info for broadcasting to broker and other peers.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.mutex.lock().unwrap();
let shared_state = self.write_shared_state();
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
@@ -473,12 +640,7 @@ impl Timeline {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet (no message from compute ever
// received), can't do much without it.
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
@@ -491,36 +653,40 @@ impl Timeline {
Ok(())
}
/// Add send_wal replica to the in-memory vector of replicas.
pub fn add_replica(&self, state: ReplicaState) -> usize {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.add_replica(state)
self.write_shared_state().add_replica(state)
}
/// Update replication replica state.
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
shared_state.replicas[id] = Some(state);
}
/// Remove send_wal replica from the in-memory vector of replicas.
pub fn remove_replica(&self, id: usize) {
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
assert!(shared_state.replicas[id].is_some());
shared_state.replicas[id] = None;
}
pub fn get_end_of_wal(&self) -> Lsn {
let shared_state = self.mutex.lock().unwrap();
shared_state.sk.wal_store.flush_lsn()
/// Returns flush_lsn.
pub fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().sk.wal_store.flush_lsn()
}
/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}
let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
{
let shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet, no WAL exists.
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
let shared_state = self.write_shared_state();
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
@@ -528,244 +694,22 @@ impl Timeline {
}
// release the lock before removing
}
let _enter =
info_span!("", tenant = %self.zttid.tenant_id, timeline = %self.zttid.timeline_id)
.entered();
// delete old WAL files
remover(horizon_segno - 1)?;
self.mutex.lock().unwrap().last_removed_segno = horizon_segno;
// update last_removed_segno
let mut shared_state = self.write_shared_state();
shared_state.last_removed_segno = horizon_segno;
Ok(())
}
}
// Utilities needed by various Connection-like objects
pub trait TimelineTools {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>;
fn get(&self) -> &Arc<Timeline>;
}
impl TimelineTools for Option<Arc<Timeline>> {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()> {
*self = Some(GlobalTimelines::get(conf, zttid, create)?);
Ok(())
}
fn get(&self) -> &Arc<Timeline> {
self.as_ref().unwrap()
}
}
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
}
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
})
});
#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
pub dir_existed: bool,
pub was_active: bool,
}
/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;
impl GlobalTimelines {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
}
fn create_internal(
mut state: MutexGuard<GlobalTimelinesState>,
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
match state.timelines.get(&zttid) {
Some(_) => bail!("timeline {} already exists", zttid),
None => {
// TODO: check directory existence
let dir = conf.timeline_dir(&zttid);
fs::create_dir_all(dir)?;
let shared_state = SharedState::create(conf, &zttid, peer_ids)
.context("failed to create shared state")?;
let new_tli = Arc::new(Timeline::new(
zttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
}
pub fn create(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
}
/// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("", timeline = %zttid.timeline_id).entered();
let mut state = TIMELINES_STATE.lock().unwrap();
match state.timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
let shared_state = SharedState::restore(conf, &zttid);
let shared_state = match shared_state {
Ok(shared_state) => shared_state,
Err(error) => {
// TODO: always create timeline explicitly
if error
.root_cause()
.to_string()
.contains("No such file or directory")
&& create
{
return GlobalTimelines::create_internal(state, conf, zttid, vec![]);
} else {
return Err(error);
}
}
};
let new_tli = Arc::new(Timeline::new(
zttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
}
/// Get loaded timeline, if it exists.
pub fn get_loaded(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
state.timelines.get(&zttid).map(Arc::clone)
}
/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> HashSet<ZTenantTimelineId> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines
.iter()
.filter(|&(_, tli)| tli.is_active())
.map(|(zttid, _)| *zttid)
.collect()
}
/// Return FullTimelineInfo for all active timelines.
pub fn active_timelines_metrics() -> Vec<FullTimelineInfo> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines
.iter()
.filter_map(|(_, tli)| tli.info_for_metrics())
.collect()
}
fn delete_force_internal(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
was_active: bool,
) -> Result<TimelineDeleteForceResult> {
match std::fs::remove_dir_all(conf.timeline_dir(zttid)) {
Ok(_) => Ok(TimelineDeleteForceResult {
dir_existed: true,
was_active,
}),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active,
}),
Err(e) => Err(e.into()),
}
}
/// Deactivates and deletes the timeline, see `Timeline::deactivate_for_delete()`, then deletes
/// the corresponding data directory.
/// We assume all timeline threads do not care about `GlobalTimelines` not containing the timeline
/// anymore, and they will eventually terminate without panics.
///
/// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two
/// `Timeline` objects in memory):
/// a) a compute node connects after this method is called, or
/// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
/// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
/// TODO: ensure all of the above never happens.
pub async fn delete_force(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
) -> Result<TimelineDeleteForceResult> {
info!("deleting timeline {}", zttid);
let timeline = TIMELINES_STATE.lock().unwrap().timelines.remove(zttid);
let mut was_active = false;
if let Some(tli) = timeline {
was_active = tli.deactivate_for_delete().await?;
}
GlobalTimelines::delete_force_internal(conf, zttid, was_active)
}
/// Deactivates and deletes all timelines for the tenant, see `delete()`.
/// Returns map of all timelines which the tenant had, `true` if a timeline was active.
/// There may be a race if new timelines are created simultaneously.
pub async fn delete_force_all_for_tenant(
conf: &SafeKeeperConf,
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let mut to_delete = HashMap::new();
{
// Keep mutex in this scope.
let timelines = &mut TIMELINES_STATE.lock().unwrap().timelines;
for (&zttid, tli) in timelines.iter() {
if zttid.tenant_id == *tenant_id {
to_delete.insert(zttid, tli.clone());
}
}
// TODO: test that the correct subset of timelines is removed. It's complicated because they are implicitly created currently.
timelines.retain(|zttid, _| !to_delete.contains_key(zttid));
}
let mut deleted = HashMap::new();
for (zttid, timeline) in to_delete {
let was_active = timeline.deactivate_for_delete().await?;
deleted.insert(
zttid,
GlobalTimelines::delete_force_internal(conf, &zttid, was_active)?,
);
}
// There may be inactive timelines, so delete the whole tenant dir as well.
match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) {
Ok(_) => (),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
e => e?,
};
Ok(deleted)
/// Deletes directory and its contents. Returns false if directory does not exist.
fn delete_dir(path: &PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()),
}
}

View File

@@ -0,0 +1,348 @@
//! This module contains global (tenant_id, timeline_id) -> Arc<Timeline> mapping.
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.
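//!
//! Illustrative usage sketch (not part of this patch; `conf`, `wal_backup_launcher_tx`
//! and `zttid` are assumed to come from the caller):
//!
//! ```ignore
//! // On startup, load every timeline found on disk into the global map.
//! GlobalTimelines::init(conf, wal_backup_launcher_tx)?;
//! // Later, look a timeline up by its (tenant_id, timeline_id) pair.
//! let tli = GlobalTimelines::get(zttid)?;
//! ```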
use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::SafeKeeperConf;
use anyhow::{anyhow, bail, Context, Result};
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
conf: SafeKeeperConf,
}
impl GlobalTimelinesState {
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (SafeKeeperConf, Sender<ZTenantTimelineId>) {
(
self.conf.clone(),
self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
}
/// Insert timeline into the map. Returns an error if a timeline with the same id already exists.
fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
let zttid = timeline.zttid;
if self.timelines.contains_key(&zttid) {
bail!(TimelineError::AlreadyExists(zttid));
}
self.timelines.insert(zttid, timeline);
Ok(())
}
/// Get timeline from the map. Returns an error if the timeline doesn't exist.
fn get(&self, zttid: &ZTenantTimelineId) -> Result<Arc<Timeline>> {
self.timelines
.get(zttid)
.cloned()
.ok_or_else(|| anyhow!(TimelineError::NotFound(*zttid)))
}
}
static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
conf: SafeKeeperConf::default(),
})
});
/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;
impl GlobalTimelines {
/// Inject dependencies needed for the timeline constructors and load all timelines into memory.
pub fn init(
conf: SafeKeeperConf,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
) -> Result<()> {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = conf;
// Iterate through all directories and load timelines for every directory
// whose name is a valid tenant_id.
let mut tenant_count = 0;
let tenants_dir = state.conf.workdir.clone();
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
if let Ok(tenant_id) =
ZTenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
{
tenant_count += 1;
GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
}
}
Err(e) => error!(
"failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
tenants_dir_entry,
tenants_dir.display(),
e
),
}
}
info!(
"found {} tenants directories, successfully loaded {} timelines",
tenant_count,
state.timelines.len()
);
Ok(())
}
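// The scan above assumes the following on-disk layout (illustrative sketch):
//
//   <workdir>/
//     <tenant_id>/       -- load_tenant_timelines() is called once per valid tenant dir
//       <timeline_id>/   -- Timeline::load_timeline() is called once per valid timeline dir
//
// Directory entries whose names do not parse as ids are skipped.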
/// Loads all timelines for the given tenant into memory. Returns fs::read_dir errors if any.
fn load_tenant_timelines(
state: &mut MutexGuard<GlobalTimelinesState>,
tenant_id: ZTenantId,
) -> Result<()> {
let timelines_dir = state.conf.tenant_dir(&tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
{
match &timelines_dir_entry {
Ok(timeline_dir_entry) => {
if let Ok(timeline_id) =
ZTimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let zttid = ZTenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(
state.conf.clone(),
zttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
) {
Ok(timeline) => {
state.timelines.insert(zttid, Arc::new(timeline));
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow deleting or recreating
// this timeline. The only way to fix such a timeline is to repair it manually
// and restart the safekeeper.
Err(e) => error!(
"failed to load timeline {} for tenant {}, reason: {:?}",
timeline_id, tenant_id, e
),
}
}
}
Err(e) => error!(
"failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
timelines_dir_entry,
timelines_dir.display(),
e
),
}
}
Ok(())
}
/// Create a new timeline with the given id. If the timeline already exists, returns
/// the existing timeline.
pub fn create(zttid: ZTenantTimelineId, server_info: ServerInfo) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&zttid) {
// Timeline already exists, return it.
return Ok(timeline);
}
state.get_dependencies()
};
info!("creating new timeline {}", zttid);
let timeline = Arc::new(Timeline::create_empty(
conf,
zttid,
wal_backup_launcher_tx,
server_info,
)?);
// Take a lock and finish the initialization while holding this mutex. No other threads
// can interfere with creation after we insert the timeline into the map.
let mut shared_state = timeline.write_shared_state();
// We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return a valid timeline on the next try.
TIMELINES_STATE
.lock()
.unwrap()
.try_insert(timeline.clone())?;
// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
match timeline.bootstrap(&mut shared_state) {
Ok(_) => {
// We are done with bootstrap, release the lock, return the timeline.
drop(shared_state);
Ok(timeline)
}
Err(e) => {
// Note: the most likely reason for bootstrap failure is that the timeline
// directory already exists on disk. This happens when the timeline is corrupted
// and therefore wasn't loaded from disk on startup. We want to preserve
// the timeline directory in this case, for further inspection.
// TODO: this is an unusual error, perhaps we should send it to sentry
// TODO: compute will try to create timeline every second, we should add backoff
error!("failed to bootstrap timeline {}: {}", zttid, e);
// Timeline failed to bootstrap, it cannot be used. Remove it from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(&zttid);
Err(e)
}
}
}
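// Illustrative call-site sketch (not part of this patch): a handler that needs the
// timeline to exist can simply call
//
//     let tli = GlobalTimelines::create(zttid, server_info)?;
//
// and gets back either the freshly bootstrapped timeline or the one already in the
// map; `zttid` and `server_info` come from the incoming request.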
/// Get a timeline from the global map. If it's not present in the map, it either doesn't
/// exist on disk or was corrupted and couldn't be loaded on startup. The returned timeline
/// is always valid, i.e. loaded in memory and not cancelled.
pub fn get(zttid: ZTenantTimelineId) -> Result<Arc<Timeline>> {
let res = TIMELINES_STATE.lock().unwrap().get(&zttid);
match res {
Ok(tli) => {
if tli.is_cancelled() {
anyhow::bail!(TimelineError::Cancelled(zttid));
}
Ok(tli)
}
Err(e) => Err(e),
}
}
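// Illustrative error-handling sketch (not part of this patch), assuming TimelineError
// implements std::error::Error (e.g. via thiserror) so it can be downcast from anyhow:
//
//     match GlobalTimelines::get(zttid) {
//         Ok(tli) => { /* use the timeline */ }
//         Err(e) => match e.downcast_ref::<TimelineError>() {
//             Some(TimelineError::NotFound(_)) => { /* e.g. reply with 404 */ }
//             _ => { /* internal error */ }
//         },
//     }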
/// Returns all timelines. This is used for background timeline processes.
pub fn get_all() -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap();
global_lock
.timelines
.values()
.cloned()
.filter(|t| !t.is_cancelled())
.collect()
}
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant;
/// it may also return cancelled timelines so that their deletion can be retried.
fn get_all_for_tenant(tenant_id: ZTenantId) -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap();
global_lock
.timelines
.values()
.filter(|t| t.zttid.tenant_id == tenant_id)
.cloned()
.collect()
}
/// Cancels timeline, then deletes the corresponding data directory.
pub fn delete_force(zttid: &ZTenantTimelineId) -> Result<TimelineDeleteForceResult> {
let tli_res = TIMELINES_STATE.lock().unwrap().get(zttid);
match tli_res {
Ok(timeline) => {
// Take a lock and finish the deletion while holding this mutex.
let mut shared_state = timeline.write_shared_state();
info!("deleting timeline {}", zttid);
let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?;
// Remove timeline from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(zttid);
Ok(TimelineDeleteForceResult {
dir_existed,
was_active,
})
}
Err(_) => {
// Timeline is not in memory, but it may still exist on disk in a broken state.
let dir_path = TIMELINES_STATE.lock().unwrap().conf.timeline_dir(zttid);
let dir_existed = delete_dir(dir_path)?;
Ok(TimelineDeleteForceResult {
dir_existed,
was_active: false,
})
}
}
}
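// Illustrative call-site sketch (not part of this patch): since the result derives
// Serialize, an HTTP handler can return it as JSON, assuming serde_json is available:
//
//     let result = GlobalTimelines::delete_force(&zttid)?;
//     let body = serde_json::to_string(&result)?; // {"dir_existed":true,"was_active":false}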
/// Deactivates and deletes all timelines for the tenant. Returns a map of all timelines which
/// the tenant had, with `true` if a timeline was active. There may be a race if new timelines are
/// created simultaneously. In that case the function will return an error and the caller should
/// retry tenant deletion later.
pub fn delete_force_all_for_tenant(
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let to_delete = Self::get_all_for_tenant(*tenant_id);
let mut err = None;
let mut deleted = HashMap::new();
for tli in &to_delete {
match Self::delete_force(&tli.zttid) {
Ok(result) => {
deleted.insert(tli.zttid, result);
}
Err(e) => {
error!("failed to delete timeline {}: {}", tli.zttid, e);
// Save error to return later.
err = Some(e);
}
}
}
// If there was an error, return it.
if let Some(e) = err {
return Err(e);
}
// There may be broken timelines on disk, so delete the whole tenant dir as well.
// Note that we could concurrently create new timelines while we were deleting them,
// so the directory may not be empty. In that case the timelines will be in a bad state
// and timeline background jobs can panic.
delete_dir(TIMELINES_STATE.lock().unwrap().conf.tenant_dir(tenant_id))?;
let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
if !tlis_after_delete.is_empty() {
// Some timelines were created while we were deleting them; return an error
// to the caller so it can retry later.
bail!(
"failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
tenant_id
);
}
Ok(deleted)
}
}
#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
pub dir_existed: bool,
pub was_active: bool,
}
/// Deletes directory and its contents. Returns false if directory does not exist.
fn delete_dir(path: PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()),
}
}

View File

@@ -26,8 +26,8 @@ use tracing::*;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};
use crate::broker::{Election, ElectionLeader};
use crate::timeline::{GlobalTimelines, Timeline};
use crate::{broker, SafeKeeperConf};
use crate::timeline::Timeline;
use crate::{broker, GlobalTimelines, SafeKeeperConf};
use once_cell::sync::OnceCell;
@@ -54,7 +54,9 @@ pub fn wal_backup_launcher_thread_main(
/// Check whether wal backup is required for the timeline. If yes, mark that the launcher is
/// aware of the current status and return the timeline.
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
GlobalTimelines::get(zttid)
.ok()
.filter(|tli| tli.wal_backup_attend())
}
struct WalBackupTaskHandle {
@@ -191,7 +193,8 @@ struct WalBackupTask {
election: Election,
}
/// Offload single timeline.
/// Offload single timeline. Called only after we checked that backup
/// is required (wal_backup_attend) and possible (can_wal_backup).
async fn backup_task_main(
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
@@ -199,18 +202,17 @@ async fn backup_task_main(
election: Election,
) {
info!("started");
let timeline: Arc<Timeline> = if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli
} else {
/* Timeline could get deleted while task was starting, just exit then. */
info!("no timeline, exiting");
let res = GlobalTimelines::get(zttid);
if let Err(e) = res {
error!("backup error for timeline {}: {}", zttid, e);
return;
};
}
let tli = res.unwrap();
let mut wb = WalBackupTask {
wal_seg_size: timeline.get_wal_seg_size(),
commit_lsn_watch_rx: timeline.get_commit_lsn_watch_rx(),
timeline,
wal_seg_size: tli.get_wal_seg_size(),
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
leader: None,
election,
@@ -322,7 +324,11 @@ impl WalBackupTask {
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
self.timeline.set_wal_backup_lsn(backup_lsn_result);
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {

View File

@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{bail, Context, Result};
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
@@ -17,7 +17,7 @@ use postgres_ffi::v14::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
};
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::min;
use std::cmp::{max, min};
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
@@ -101,9 +101,6 @@ pub trait Storage {
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;
/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
@@ -119,7 +116,7 @@ pub trait Storage {
}
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage must be initialized before use.
/// for better performance. Storage is initialized in the constructor.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
@@ -127,16 +124,14 @@ pub trait Storage {
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
/// When storage is created for the first time, all LSNs are zeroes and there are no segments on disk.
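///
/// Illustrative example (not from this patch): while the middle of a WAL record is being
/// written, `write_lsn` runs ahead of `write_record_lsn` (end of the last complete record),
/// and `flush_record_lsn` lags behind both until the next fdatasync, e.g.
/// `write_lsn` = 0/16D0200, `write_record_lsn` = 0/16D0100, `flush_record_lsn` = 0/16D0000.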
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
conf: SafeKeeperConf,
// fields below are filled upon initialization
/// None if uninitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,
/// Size of WAL segment in bytes.
wal_seg_size: usize,
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
@@ -161,25 +156,47 @@ pub struct PhysicalStorage {
}
impl PhysicalStorage {
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage {
/// Create new storage. If commit_lsn is not zero, we try to restore flush_lsn from
/// the disk. Otherwise, all LSNs are set to zero.
pub fn new(
zttid: &ZTenantTimelineId,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
) -> Result<PhysicalStorage> {
let timeline_dir = conf.timeline_dir(zttid);
PhysicalStorage {
let wal_seg_size = state.server.wal_seg_size as usize;
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)?
};
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
zttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", zttid.timeline_id);
}
Ok(PhysicalStorage {
metrics: WalStorageMetrics::new(zttid),
zttid: *zttid,
timeline_dir,
conf: conf.clone(),
wal_seg_size: None,
write_lsn: Lsn(0),
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
wal_seg_size,
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn),
file: None,
}
}
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
})
}
/// Call fdatasync if config requires so.
@@ -204,9 +221,9 @@ impl PhysicalStorage {
/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
@@ -222,24 +239,18 @@ impl PhysicalStorage {
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, wal_seg_size)?;
write_zeroes(&mut file, self.wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let (mut file, is_partial) = self.open_or_create(segno)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
@@ -247,13 +258,13 @@ impl PhysicalStorage {
file.write_all(buf)?;
if xlogoff + buf.len() == wal_seg_size {
if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
@@ -269,10 +280,6 @@ impl PhysicalStorage {
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
@@ -284,17 +291,17 @@ impl PhysicalStorage {
while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);
let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(self.wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
self.wal_seg_size - xlogoff
} else {
buf.len()
};
self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
@@ -309,53 +316,6 @@ impl Storage for PhysicalStorage {
self.flush_record_lsn
}
/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows to postpone its initialization.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown. This is dead path normally, should
// be used only in tests.
return Ok(());
}
if let Some(wal_seg_size) = self.wal_seg_size {
// physical storage is already initialized
assert_eq!(wal_seg_size, state.server.wal_seg_size as usize);
return Ok(());
}
// initialize physical storage
let wal_seg_size = state.server.wal_seg_size as usize;
self.wal_seg_size = Some(wal_seg_size);
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
self.write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
find_end_of_wal(&self.timeline_dir, wal_seg_size, state.commit_lsn)?
};
self.write_record_lsn = self.write_lsn;
// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
self.update_flush_lsn();
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if self.flush_record_lsn < state.commit_lsn
|| self.flush_record_lsn < state.peer_horizon_lsn
{
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id);
}
Ok(())
}
/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
@@ -419,80 +379,83 @@ impl Storage for PhysicalStorage {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
// but haven't updated flush_lsn yet.
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
bail!(
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
self.write_lsn,
self.flush_record_lsn
);
}
}
// everything is flushed now, let's update flush_lsn
self.update_flush_lsn();
self.flush_record_lsn = self.write_record_lsn;
Ok(())
}
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
end_pos
);
}
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}
let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize;
let segno = end_pos.segment_number(self.wal_seg_size);
// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;
let (mut file, is_partial) = self.open_or_create(segno)?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better use fs::try_exists which is currently available only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.update_flush_lsn();
self.flush_record_lsn = end_pos;
Ok(())
}
fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size.unwrap();
let wal_seg_size = self.wal_seg_size;
Box::new(move |segno_up_to: XLogSegNo| {
remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
})
}
}
/// Remove all WAL segments in timeline_dir <= given segno.
fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
/// Remove all WAL segments in timeline_dir that match the given predicate.
fn remove_segments_from_disk(
timeline_dir: &Path,
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
let mut n_removed = 0;
let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN;
for entry in fs::read_dir(&timeline_dir)? {
let entry = entry?;
let entry_path = entry.path();
@@ -504,19 +467,21 @@ fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if segno <= segno_up_to {
if remove_predicate(segno) {
remove_file(entry_path)?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
}
}
}
let segno_from = segno_up_to - n_removed + 1;
info!(
"removed {} WAL segments [{}; {}]",
n_removed,
XLogFileName(PG_TLI, segno_from, wal_seg_size),
XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
);
if n_removed > 0 {
info!(
"removed {} WAL segments [{}; {}]",
n_removed, min_removed, max_removed
);
}
Ok(())
}
@@ -526,8 +491,10 @@ pub struct WalReader {
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,
enable_remote_read: bool,
// S3 will be used to read WAL if LSN is not available locally
enable_remote_read: bool,
// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
}