Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-03 10:40:37 +00:00

Compare commits: erik/commu...arthur/tli (5 commits)

| Author | SHA1 | Date |
|---|---|---|
| | b735e92546 | |
| | c71f637d65 | |
| | 780e44c07b | |
| | f8aecd53cd | |
| | 0faf0e92ec | |
Cargo.lock (generated): 2 changes
@@ -2721,6 +2721,7 @@ dependencies = [
"hyper",
"metrics",
"once_cell",
"parking_lot 0.12.1",
"postgres",
"postgres-protocol",
"postgres_ffi",
@@ -2731,6 +2732,7 @@ dependencies = [
"serde_with",
"signal-hook",
"tempfile",
"thiserror",
"tokio",
"tokio-postgres",
"toml_edit",

@@ -429,8 +429,22 @@ impl PostgresBackend {
// full cause of the error, not just the top-level context + its trace.
// We don't want to send that in the ErrorResponse though,
// because it's not relevant to the compute node logs.
error!("query handler for '{}' failed: {:?}", query_string, e);
self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?;
//
// We also don't want to log full stacktrace when the error is primitive,
// such as usual connection closed.
let short_error = format!("{:#}", e);
let root_cause = e.root_cause().to_string();
if root_cause.contains("connection closed unexpectedly")
|| root_cause.contains("Broken pipe (os error 32)")
{
error!(
"query handler for '{}' failed: {}",
query_string, short_error
);
} else {
error!("query handler for '{}' failed: {:?}", query_string, e);
}
self.write_message_noflush(&BeMessage::ErrorResponse(&short_error))?;
// TODO: untangle convoluted control flow
if e.to_string().contains("failed to run") {
return Ok(ProcessMsgResult::Break);

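Note on the formatting choices above: for an `anyhow::Error`, `{:#}` prints the whole context chain on one line while `{:?}` also includes the backtrace, which is why the hunk logs the short form only for expected disconnects. A minimal, self-contained sketch of the same pattern (the `report` helper and the sample error are illustrative, not part of this change):

```rust
use anyhow::anyhow;

// Sketch: log noisy errors in full, but keep expected disconnects to one line.
fn report(e: &anyhow::Error) {
    // "{:#}" prints "outer context: inner cause" on a single line.
    let short_error = format!("{:#}", e);
    let root_cause = e.root_cause().to_string();
    if root_cause.contains("connection closed unexpectedly") {
        // Expected churn: short message is enough.
        eprintln!("query handler failed: {}", short_error);
    } else {
        // Unexpected failure: keep the full debug representation.
        eprintln!("query handler failed: {:?}", e);
    }
}

fn main() {
    let e = anyhow!("connection closed unexpectedly").context("failed to run query");
    report(&e);
}
```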
@@ -30,6 +30,8 @@ git-version = "0.3.5"
async-trait = "0.1"
once_cell = "1.13.0"
toml_edit = { version = "0.13", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"

postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }

@@ -24,9 +24,9 @@ use safekeeper::defaults::{
};
use safekeeper::http;
use safekeeper::remove_wal;
use safekeeper::timeline::GlobalTimelines;
use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use utils::auth::JwtAuth;
use utils::{
@@ -298,7 +298,9 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
let signals = signals::install_shutdown_handlers()?;
let mut threads = vec![];
let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100);
GlobalTimelines::init(wal_backup_launcher_tx);

// Load all timelines from disk to memory.
GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;

let conf_ = conf.clone();
threads.push(

@@ -10,6 +10,7 @@ use etcd_broker::LeaseKeeper;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
@@ -17,7 +18,8 @@ use tokio::{runtime, time::sleep};
use tracing::*;
use url::Url;

use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use etcd_broker::{
subscription_key::{OperationKind, SkOperationKind, SubscriptionKey},
Client, PutOptions,
@@ -210,11 +212,15 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let active_tlis = GlobalTimelines::get_active_timelines();
let mut active_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active());

let active_tlis_set: HashSet<ZTenantTimelineId> =
active_tlis.iter().map(|tli| tli.zttid).collect();

// Get and maintain (if not yet) per timeline lease to automatically delete obsolete data.
for zttid in active_tlis.iter() {
if let Entry::Vacant(v) = leases.entry(*zttid) {
for tli in &active_tlis {
if let Entry::Vacant(v) = leases.entry(tli.zttid) {
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
v.insert(Lease {
@@ -224,12 +230,11 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
});
}
}
leases.retain(|zttid, _| active_tlis.contains(zttid));
leases.retain(|zttid, _| active_tlis_set.contains(zttid));

// Push data concurrently to not suffer from latency, with many timelines it can be slow.
let handles = active_tlis
.iter()
.filter_map(|zttid| GlobalTimelines::get_loaded(*zttid))
.map(|tli| {
let sk_info = tli.get_public_info(&conf);
let key = timeline_safekeeper_path(
@@ -279,7 +284,7 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
match subscription.value_updates.recv().await {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) {
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}

@@ -9,8 +9,6 @@ use std::io::{Read, Write};
|
||||
use std::ops::Deref;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use tracing::*;
|
||||
|
||||
use crate::control_file_upgrade::upgrade_control_file;
|
||||
use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC};
|
||||
use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS};
|
||||
@@ -55,6 +53,7 @@ pub struct FileStorage {
|
||||
}
|
||||
|
||||
impl FileStorage {
|
||||
/// Initialize storage by loading state from disk.
|
||||
pub fn restore_new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> Result<FileStorage> {
|
||||
let timeline_dir = conf.timeline_dir(zttid);
|
||||
let tenant_id = zttid.tenant_id.to_string();
|
||||
@@ -71,6 +70,7 @@ impl FileStorage {
|
||||
})
|
||||
}
|
||||
|
||||
/// Create file storage for a new timeline, but don't persist it yet.
|
||||
pub fn create_new(
|
||||
zttid: &ZTenantTimelineId,
|
||||
conf: &SafeKeeperConf,
|
||||
@@ -80,19 +80,18 @@ impl FileStorage {
|
||||
let tenant_id = zttid.tenant_id.to_string();
|
||||
let timeline_id = zttid.timeline_id.to_string();
|
||||
|
||||
let mut store = FileStorage {
|
||||
let store = FileStorage {
|
||||
timeline_dir,
|
||||
conf: conf.clone(),
|
||||
persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS
|
||||
.with_label_values(&[&tenant_id, &timeline_id]),
|
||||
state: state.clone(),
|
||||
state,
|
||||
};
|
||||
|
||||
store.persist(&state)?;
|
||||
Ok(store)
|
||||
}
|
||||
|
||||
// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
/// Check the magic/version in the on-disk data and deserialize it, if possible.
|
||||
fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> {
|
||||
// Read the version independent part
|
||||
let magic = buf.read_u32::<LittleEndian>()?;
|
||||
@@ -112,7 +111,7 @@ impl FileStorage {
|
||||
upgrade_control_file(buf, version)
|
||||
}
|
||||
|
||||
// Load control file for given zttid at path specified by conf.
|
||||
/// Load control file for given zttid at path specified by conf.
|
||||
pub fn load_control_file_conf(
|
||||
conf: &SafeKeeperConf,
|
||||
zttid: &ZTenantTimelineId,
|
||||
@@ -122,13 +121,7 @@ impl FileStorage {
|
||||
}
|
||||
|
||||
/// Read in the control file.
|
||||
/// If create=false and file doesn't exist, bails out.
|
||||
pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> {
|
||||
info!(
|
||||
"loading control file {}",
|
||||
control_file_path.as_ref().display(),
|
||||
);
|
||||
|
||||
let mut control_file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
@@ -179,8 +172,8 @@ impl Deref for FileStorage {
|
||||
}
|
||||
|
||||
impl Storage for FileStorage {
|
||||
// persists state durably to underlying storage
|
||||
// for description see https://lwn.net/Articles/457667/
|
||||
/// persists state durably to underlying storage
|
||||
/// for description see https://lwn.net/Articles/457667/
|
||||
fn persist(&mut self, s: &SafeKeeperState) -> Result<()> {
|
||||
let _timer = &self.persist_control_file_seconds.start_timer();
|
||||
|
||||
|
||||
@@ -3,15 +3,15 @@
|
||||
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
use crate::receive_wal::ReceiveWalConn;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
|
||||
|
||||
use crate::send_wal::ReplicationConn;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use anyhow::{bail, Context, Result};
|
||||
|
||||
use postgres_ffi::PG_TLI;
|
||||
use regex::Regex;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tracing::info;
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
@@ -27,7 +27,7 @@ pub struct SafekeeperPostgresHandler {
|
||||
pub appname: Option<String>,
|
||||
pub ztenantid: Option<ZTenantId>,
|
||||
pub ztimelineid: Option<ZTimelineId>,
|
||||
pub timeline: Option<Arc<Timeline>>,
|
||||
pub zttid: ZTenantTimelineId,
|
||||
}
|
||||
|
||||
/// Parsed Postgres command.
|
||||
@@ -98,30 +98,19 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
|
||||
query_string, self.ztimelineid
|
||||
);
|
||||
|
||||
let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. })
|
||||
|| matches!(cmd, SafekeeperPostgresCommand::IdentifySystem));
|
||||
|
||||
let tenantid = self.ztenantid.context("tenantid is required")?;
|
||||
let timelineid = self.ztimelineid.context("timelineid is required")?;
|
||||
if self.timeline.is_none() {
|
||||
self.timeline.set(
|
||||
&self.conf,
|
||||
ZTenantTimelineId::new(tenantid, timelineid),
|
||||
create,
|
||||
)?;
|
||||
}
|
||||
self.zttid = ZTenantTimelineId::new(tenantid, timelineid);
|
||||
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb)
|
||||
.run(self)
|
||||
.context("failed to run ReceiveWalConn"),
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn } => ReplicationConn::new(pgb)
|
||||
.run(self, pgb, start_lsn)
|
||||
.context("failed to run ReplicationConn"),
|
||||
SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb).run(self),
|
||||
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
|
||||
ReplicationConn::new(pgb).run(self, pgb, start_lsn)
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb),
|
||||
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => handle_json_ctrl(self, pgb, cmd),
|
||||
}
|
||||
.context(format!("timeline {timelineid}"))?;
|
||||
.context(format!("Failed to process query for timeline {timelineid}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -134,42 +123,26 @@ impl SafekeeperPostgresHandler {
|
||||
appname: None,
|
||||
ztenantid: None,
|
||||
ztimelineid: None,
|
||||
timeline: None,
|
||||
zttid: ZTenantTimelineId::empty(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Shortcut for calling `process_msg` in the timeline.
|
||||
pub fn process_safekeeper_msg(
|
||||
&self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
self.timeline
|
||||
.get()
|
||||
.process_msg(msg)
|
||||
.context("failed to process ProposerAcceptorMessage")
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> {
|
||||
let tli = GlobalTimelines::get(self.zttid)?;
|
||||
|
||||
let lsn = if self.is_walproposer_recovery() {
|
||||
// walproposer should get all local WAL until flush_lsn
|
||||
self.timeline.get().get_end_of_wal()
|
||||
tli.get_flush_lsn()
|
||||
} else {
|
||||
// other clients shouldn't get any uncommitted WAL
|
||||
self.timeline.get().get_state().0.commit_lsn
|
||||
tli.get_state().0.commit_lsn
|
||||
}
|
||||
.to_string();
|
||||
|
||||
let sysid = self
|
||||
.timeline
|
||||
.get()
|
||||
.get_state()
|
||||
.1
|
||||
.server
|
||||
.system_id
|
||||
.to_string();
|
||||
let sysid = tli.get_state().1.server.system_id.to_string();
|
||||
let lsn_bytes = lsn.as_bytes();
|
||||
let tli = PG_TLI.to_string();
|
||||
let tli_bytes = tli.as_bytes();
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use anyhow::anyhow;
|
||||
use hyper::{Body, Request, Response, StatusCode, Uri};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
@@ -9,7 +10,9 @@ use std::sync::Arc;
|
||||
|
||||
use crate::safekeeper::Term;
|
||||
use crate::safekeeper::TermHistory;
|
||||
use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult};
|
||||
|
||||
use crate::timelines_global_map::TimelineDeleteForceResult;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
use etcd_broker::subscription_value::SkTimelineInfo;
|
||||
use utils::{
|
||||
@@ -96,9 +99,9 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
|
||||
);
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
|
||||
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
|
||||
let tli = GlobalTimelines::get(zttid)?;
|
||||
let (inmem, state) = tli.get_state();
|
||||
let flush_lsn = tli.get_end_of_wal();
|
||||
let flush_lsn = tli.get_flush_lsn();
|
||||
|
||||
let acc_state = AcceptorStateStatus {
|
||||
term: state.acceptor_state.term,
|
||||
@@ -130,19 +133,11 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
timeline_id: request_data.timeline_id,
|
||||
};
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids)
|
||||
.map_err(ApiError::from_err)?;
|
||||
|
||||
json_response(StatusCode::CREATED, ())
|
||||
Err(ApiError::from_err(anyhow!("not implemented")))
|
||||
}
|
||||
|
||||
/// Deactivates the timeline and removes its data directory.
|
||||
///
|
||||
/// It does not try to stop any processing of the timeline; there is no such code at the time of writing.
|
||||
/// However, it tries to check whether the timeline was active and report it to caller just in case.
|
||||
/// Note that this information is inaccurate:
|
||||
/// 1. There is a race condition between checking the timeline for activity and actual directory deletion.
|
||||
/// 2. At the time of writing Safekeeper rarely marks a timeline inactive. E.g. disconnecting the compute node does nothing.
|
||||
async fn timeline_delete_force_handler(
|
||||
mut request: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
@@ -152,12 +147,10 @@ async fn timeline_delete_force_handler(
|
||||
);
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
GlobalTimelines::delete_force(get_conf(&request), &zttid)
|
||||
.await
|
||||
.map_err(ApiError::from_err)?,
|
||||
)
|
||||
let resp = tokio::task::spawn_blocking(move || GlobalTimelines::delete_force(&zttid))
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
/// Deactivates all timelines for the tenant and removes its data directory.
|
||||
@@ -168,11 +161,14 @@ async fn tenant_delete_force_handler(
|
||||
let tenant_id = parse_request_param(&request, "tenant_id")?;
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
ensure_no_body(&mut request).await?;
|
||||
let delete_info = tokio::task::spawn_blocking(move || {
|
||||
GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id)
|
||||
.await
|
||||
.map_err(ApiError::from_err)?
|
||||
delete_info
|
||||
.iter()
|
||||
.map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp))
|
||||
.collect::<HashMap<String, TimelineDeleteForceResult>>(),
|
||||
@@ -188,7 +184,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
check_permission(&request, Some(zttid.tenant_id))?;
|
||||
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
|
||||
|
||||
let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?;
|
||||
let tli = GlobalTimelines::get(zttid)?;
|
||||
tli.record_safekeeper_info(&safekeeper_info, NodeId(1))
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -6,18 +6,22 @@
|
||||
//! modifications in tests.
|
||||
//!
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tracing::*;
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse};
|
||||
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};
|
||||
use crate::safekeeper::{
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting,
|
||||
AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected,
|
||||
};
|
||||
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
|
||||
use crate::timeline::TimelineTools;
|
||||
use crate::timeline::Timeline;
|
||||
use crate::GlobalTimelines;
|
||||
use postgres_ffi::v14::xlog_utils;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use utils::{
|
||||
@@ -57,23 +61,23 @@ struct AppendResult {
|
||||
/// content, and then append it with specified term and lsn. This
|
||||
/// function is used to test safekeepers in different scenarios.
|
||||
pub fn handle_json_ctrl(
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
spg: &SafekeeperPostgresHandler,
|
||||
pgb: &mut PostgresBackend,
|
||||
append_request: &AppendLogicalMessage,
|
||||
) -> Result<()> {
|
||||
info!("JSON_CTRL request: {:?}", append_request);
|
||||
|
||||
// need to init safekeeper state before AppendRequest
|
||||
prepare_safekeeper(spg)?;
|
||||
let tli = prepare_safekeeper(spg.zttid)?;
|
||||
|
||||
// if send_proposer_elected is true, we need to update local history
|
||||
if append_request.send_proposer_elected {
|
||||
send_proposer_elected(spg, append_request.term, append_request.epoch_start_lsn)?;
|
||||
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?;
|
||||
}
|
||||
|
||||
let inserted_wal = append_logical_message(spg, append_request)?;
|
||||
let inserted_wal = append_logical_message(&tli, append_request)?;
|
||||
let response = AppendResult {
|
||||
state: spg.timeline.get().get_state().1,
|
||||
state: tli.get_state().1,
|
||||
inserted_wal,
|
||||
};
|
||||
let response_data = serde_json::to_vec(&response)?;
|
||||
@@ -91,28 +95,20 @@ pub fn handle_json_ctrl(
|
||||
|
||||
/// Prepare safekeeper to process append requests without crashes,
|
||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||
fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
|
||||
let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting {
|
||||
protocol_version: 2, // current protocol
|
||||
pg_version: 0, // unknown
|
||||
proposer_id: [0u8; 16],
|
||||
system_id: 0,
|
||||
ztli: spg.ztimelineid.unwrap(),
|
||||
tenant_id: spg.ztenantid.unwrap(),
|
||||
tli: 0,
|
||||
wal_seg_size: WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
|
||||
});
|
||||
|
||||
let response = spg.timeline.get().process_msg(&greeting_request)?;
|
||||
match response {
|
||||
Some(AcceptorProposerMessage::Greeting(_)) => Ok(()),
|
||||
_ => anyhow::bail!("not GreetingResponse"),
|
||||
}
|
||||
fn prepare_safekeeper(zttid: ZTenantTimelineId) -> Result<Arc<Timeline>> {
|
||||
GlobalTimelines::create(
|
||||
zttid,
|
||||
ServerInfo {
|
||||
pg_version: 0, // unknown
|
||||
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
||||
system_id: 0,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> {
|
||||
fn send_proposer_elected(tli: &Arc<Timeline>, term: Term, lsn: Lsn) -> Result<()> {
|
||||
// add new term to existing history
|
||||
let history = spg.timeline.get().get_state().1.acceptor_state.term_history;
|
||||
let history = tli.get_state().1.acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermSwitchEntry { term, lsn });
|
||||
@@ -125,7 +121,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L
|
||||
timeline_start_lsn: lsn,
|
||||
});
|
||||
|
||||
spg.timeline.get().process_msg(&proposer_elected_request)?;
|
||||
tli.process_msg(&proposer_elected_request)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -138,12 +134,9 @@ struct InsertedWAL {
|
||||
|
||||
/// Extend local WAL with new LogicalMessage record. To do that,
|
||||
/// create AppendRequest with new WAL and pass it to safekeeper.
|
||||
fn append_logical_message(
|
||||
spg: &mut SafekeeperPostgresHandler,
|
||||
msg: &AppendLogicalMessage,
|
||||
) -> Result<InsertedWAL> {
|
||||
fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> {
|
||||
let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message);
|
||||
let sk_state = spg.timeline.get().get_state().1;
|
||||
let sk_state = tli.get_state().1;
|
||||
|
||||
let begin_lsn = msg.begin_lsn;
|
||||
let end_lsn = begin_lsn + wal_data.len() as u64;
|
||||
@@ -167,7 +160,7 @@ fn append_logical_message(
|
||||
wal_data: Bytes::from(wal_data),
|
||||
});
|
||||
|
||||
let response = spg.timeline.get().process_msg(&append_request)?;
|
||||
let response = tli.process_msg(&append_request)?;
|
||||
|
||||
let append_response = match response {
|
||||
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
||||
|
||||
@@ -23,6 +23,9 @@ pub mod wal_backup;
pub mod wal_service;
pub mod wal_storage;

mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;

pub mod defaults {
use const_format::formatcp;
use std::time::Duration;

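The new `timelines_global_map` module itself is not included in this compare view. As a rough sketch of the shape such a process-wide registry usually takes, built on `once_cell` and `parking_lot` (which this change adds as dependencies); all type names and methods below are assumptions, not the actual module:

```rust
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the real safekeeper types.
type TimelineId = u64;
struct Timeline {
    id: TimelineId,
}

// Process-wide map behind a mutex; `Lazy` defers construction to first use.
static TIMELINES: Lazy<Mutex<HashMap<TimelineId, Arc<Timeline>>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

struct GlobalTimelines;

impl GlobalTimelines {
    // Look up a timeline; callers clone the Arc and drop the lock immediately.
    fn get(id: TimelineId) -> Option<Arc<Timeline>> {
        TIMELINES.lock().get(&id).cloned()
    }

    // Snapshot of all timelines, so callers iterate without holding the lock.
    fn get_all() -> Vec<Arc<Timeline>> {
        TIMELINES.lock().values().cloned().collect()
    }

    fn insert(tli: Arc<Timeline>) {
        TIMELINES.lock().insert(tli.id, tli);
    }
}

fn main() {
    GlobalTimelines::insert(Arc::new(Timeline { id: 1 }));
    assert!(GlobalTimelines::get(1).is_some());
    assert_eq!(GlobalTimelines::get_all().len(), 1);
}
```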
@@ -12,7 +12,8 @@ use utils::{lsn::Lsn, zid::ZTenantTimelineId};
|
||||
|
||||
use crate::{
|
||||
safekeeper::{SafeKeeperState, SafekeeperMemState},
|
||||
timeline::{GlobalTimelines, ReplicaState},
|
||||
timeline::ReplicaState,
|
||||
GlobalTimelines,
|
||||
};
|
||||
|
||||
pub struct FullTimelineInfo {
|
||||
@@ -235,9 +236,15 @@ impl Collector for TimelineCollector {
|
||||
self.disk_usage.reset();
|
||||
self.acceptor_term.reset();
|
||||
|
||||
let timelines = GlobalTimelines::active_timelines_metrics();
|
||||
let timelines = GlobalTimelines::get_all();
|
||||
|
||||
for arc_tli in timelines {
|
||||
let tli = arc_tli.info_for_metrics();
|
||||
if tli.is_none() {
|
||||
continue;
|
||||
}
|
||||
let tli = tli.unwrap();
|
||||
|
||||
for tli in timelines {
|
||||
let tenant_id = tli.zttid.tenant_id.to_string();
|
||||
let timeline_id = tli.zttid.timeline_id.to_string();
|
||||
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
|
||||
|
||||
@@ -7,7 +7,9 @@ use anyhow::{anyhow, bail, Result};
|
||||
use bytes::BytesMut;
|
||||
use tracing::*;
|
||||
|
||||
use crate::safekeeper::ServerInfo;
|
||||
use crate::timeline::Timeline;
|
||||
use crate::GlobalTimelines;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::mpsc::channel;
|
||||
@@ -20,7 +22,6 @@ use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::timeline::TimelineTools;
|
||||
use utils::{
|
||||
postgres_backend::PostgresBackend,
|
||||
pq_proto::{BeMessage, FeMessage},
|
||||
@@ -67,15 +68,21 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
|
||||
// Receive information about server
|
||||
let next_msg = poll_reader.recv_msg()?;
|
||||
match next_msg {
|
||||
let tli = match next_msg {
|
||||
ProposerAcceptorMessage::Greeting(ref greeting) => {
|
||||
info!(
|
||||
"start handshake with wal proposer {} sysid {} timeline {}",
|
||||
self.peer_addr, greeting.system_id, greeting.tli,
|
||||
);
|
||||
let server_info = ServerInfo {
|
||||
pg_version: greeting.pg_version,
|
||||
system_id: greeting.system_id,
|
||||
wal_seg_size: greeting.wal_seg_size,
|
||||
};
|
||||
GlobalTimelines::create(spg.zttid, server_info)?
|
||||
}
|
||||
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
|
||||
}
|
||||
};
|
||||
|
||||
let mut next_msg = Some(next_msg);
|
||||
|
||||
@@ -88,7 +95,7 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg {
|
||||
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
|
||||
let reply = spg.process_safekeeper_msg(&msg)?;
|
||||
let reply = tli.process_msg(&msg)?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
}
|
||||
@@ -97,13 +104,13 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?;
|
||||
let reply = tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
}
|
||||
} else if let Some(msg) = next_msg.take() {
|
||||
// process other message
|
||||
let reply = spg.process_safekeeper_msg(&msg)?;
|
||||
let reply = tli.process_msg(&msg)?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
}
|
||||
@@ -112,9 +119,9 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
// Register the connection and defer unregister. Do that only
|
||||
// after processing first message, as it sets wal_seg_size,
|
||||
// wanted by many.
|
||||
spg.timeline.get().on_compute_connect()?;
|
||||
tli.on_compute_connect()?;
|
||||
_guard = Some(ComputeConnectionGuard {
|
||||
timeline: Arc::clone(spg.timeline.get()),
|
||||
timeline: Arc::clone(&tli),
|
||||
});
|
||||
first_time_through = false;
|
||||
}
|
||||
@@ -190,6 +197,8 @@ struct ComputeConnectionGuard {

impl Drop for ComputeConnectionGuard {
fn drop(&mut self) {
self.timeline.on_compute_disconnect().unwrap();
if let Err(e) = self.timeline.on_compute_disconnect() {
error!("failed to unregister compute connection: {}", e);
}
}
}

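A note on the `Drop` change above: calling `.unwrap()` inside `drop` turns an error into a panic, and panicking while another panic is already unwinding aborts the process, so logging is the safer choice. A small self-contained illustration (the guard type is a simplified stand-in):

```rust
struct ConnectionGuard;

impl ConnectionGuard {
    // Pretend unregistration can fail, e.g. because a channel is closed.
    fn unregister(&self) -> Result<(), String> {
        Err("channel closed".to_string())
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        // Avoid `.unwrap()` here: a panic during an unwind aborts the process.
        // Log the failure and continue instead.
        if let Err(e) = self.unregister() {
            eprintln!("failed to unregister compute connection: {}", e);
        }
    }
}

fn main() {
    let _guard = ConnectionGuard;
} // drop runs here and only logs the failure
```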
@@ -4,20 +4,21 @@ use std::{thread, time::Duration};

use tracing::*;

use crate::{timeline::GlobalTimelines, SafeKeeperConf};
use crate::{GlobalTimelines, SafeKeeperConf};

pub fn thread_main(conf: SafeKeeperConf) {
let wal_removal_interval = Duration::from_millis(5000);
loop {
let active_tlis = GlobalTimelines::get_active_timelines();
for zttid in &active_tlis {
if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) {
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!(
"failed to remove WAL for tenant {} timeline {}: {}",
tli.zttid.tenant_id, tli.zttid.timeline_id, e
);
}
let tlis = GlobalTimelines::get_all();
for tli in &tlis {
if !tli.is_active() {
continue;
}
let zttid = tli.zttid;
let _enter =
info_span!("", tenant = %zttid.tenant_id, timeline = %zttid.timeline_id).entered();
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!("failed to remove WAL: {}", e);
}
}
thread::sleep(wal_removal_interval)

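The rewritten loop attaches tenant/timeline fields via an entered `tracing` span instead of formatting them into every message. A minimal sketch of that idiom, assuming `tracing` and `tracing_subscriber` as dependencies (the span name and values are placeholders):

```rust
use tracing::{info_span, warn};

fn remove_old_wal(tenant_id: &str, timeline_id: &str) {
    // While `_enter` is alive, these fields are attached to every event below.
    let _enter =
        info_span!("wal_removal", tenant = %tenant_id, timeline = %timeline_id).entered();

    // The message itself stays short; the identifying context comes from the span.
    warn!("failed to remove WAL: {}", "disk error (example)");
}

fn main() {
    // A subscriber must be installed to see output; the fmt subscriber is one option.
    tracing_subscriber::fmt().init();
    remove_old_wal("tenant-1", "timeline-1");
}
```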
@@ -219,7 +219,11 @@ pub struct SafekeeperMemState {
|
||||
}
|
||||
|
||||
impl SafeKeeperState {
|
||||
pub fn new(zttid: &ZTenantTimelineId, peers: Vec<NodeId>) -> SafeKeeperState {
|
||||
pub fn new(
|
||||
zttid: &ZTenantTimelineId,
|
||||
server_info: ServerInfo,
|
||||
peers: Vec<NodeId>,
|
||||
) -> SafeKeeperState {
|
||||
SafeKeeperState {
|
||||
tenant_id: zttid.tenant_id,
|
||||
timeline_id: zttid.timeline_id,
|
||||
@@ -227,11 +231,7 @@ impl SafeKeeperState {
|
||||
term: 0,
|
||||
term_history: TermHistory::empty(),
|
||||
},
|
||||
server: ServerInfo {
|
||||
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
server: server_info,
|
||||
proposer_uuid: [0; 16],
|
||||
timeline_start_lsn: Lsn(0),
|
||||
local_start_lsn: Lsn(0),
|
||||
@@ -245,7 +245,15 @@ impl SafeKeeperState {
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn empty() -> Self {
|
||||
SafeKeeperState::new(&ZTenantTimelineId::empty(), vec![])
|
||||
SafeKeeperState::new(
|
||||
&ZTenantTimelineId::empty(),
|
||||
ServerInfo {
|
||||
pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */
|
||||
system_id: 0, /* Postgres system identifier */
|
||||
wal_seg_size: 0,
|
||||
},
|
||||
vec![],
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -481,8 +489,12 @@ impl AcceptorProposerMessage {
|
||||
}
|
||||
}
|
||||
|
||||
/// SafeKeeper which consumes events (messages from compute) and provides
|
||||
/// replies.
|
||||
/// Safekeeper implements consensus to reliably persist WAL across nodes.
|
||||
/// It controls all WAL disk writes and updates of control file.
|
||||
///
|
||||
/// Currently safekeeper processes:
|
||||
/// - messages from compute (proposers) and provides replies
|
||||
/// - messages from etcd peers
|
||||
pub struct SafeKeeper<CTRL: control_file::Storage, WAL: wal_storage::Storage> {
|
||||
/// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn.
|
||||
/// Note: be careful to set only if we are sure our WAL (term history) matches
|
||||
@@ -505,20 +517,20 @@ where
|
||||
CTRL: control_file::Storage,
|
||||
WAL: wal_storage::Storage,
|
||||
{
|
||||
// constructor
|
||||
pub fn new(
|
||||
ztli: ZTimelineId,
|
||||
state: CTRL,
|
||||
mut wal_store: WAL,
|
||||
node_id: NodeId,
|
||||
) -> Result<SafeKeeper<CTRL, WAL>> {
|
||||
if state.timeline_id != ZTimelineId::from([0u8; 16]) && ztli != state.timeline_id {
|
||||
bail!("Calling SafeKeeper::new with inconsistent ztli ({}) and SafeKeeperState.server.timeline_id ({})", ztli, state.timeline_id);
|
||||
/// Accepts a control file storage containing the safekeeper state.
|
||||
/// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id`
|
||||
/// and `server` (`wal_seg_size` inside it) fields.
|
||||
pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result<SafeKeeper<CTRL, WAL>> {
|
||||
if state.tenant_id == ZTenantId::from([0u8; 16])
|
||||
|| state.timeline_id == ZTimelineId::from([0u8; 16])
|
||||
{
|
||||
bail!(
|
||||
"Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})",
|
||||
state.tenant_id,
|
||||
state.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
// initialize wal_store, if state is already initialized
|
||||
wal_store.init_storage(&state)?;
|
||||
|
||||
Ok(SafeKeeper {
|
||||
global_commit_lsn: state.commit_lsn,
|
||||
epoch_start_lsn: Lsn(0),
|
||||
@@ -576,7 +588,7 @@ where
|
||||
&mut self,
|
||||
msg: &ProposerGreeting,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
/* Check protocol compatibility */
|
||||
// Check protocol compatibility
|
||||
if msg.protocol_version != SK_PROTOCOL_VERSION {
|
||||
bail!(
|
||||
"incompatible protocol version {}, expected {}",
|
||||
@@ -584,11 +596,11 @@ where
|
||||
SK_PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
/* Postgres upgrade is not treated as fatal error */
|
||||
// Postgres upgrade is not treated as fatal error
|
||||
if msg.pg_version != self.state.server.pg_version
|
||||
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
|
||||
{
|
||||
info!(
|
||||
warn!(
|
||||
"incompatible server version {}, expected {}",
|
||||
msg.pg_version, self.state.server.pg_version
|
||||
);
|
||||
@@ -607,17 +619,25 @@ where
|
||||
self.state.timeline_id
|
||||
);
|
||||
}
|
||||
|
||||
// set basic info about server, if not yet
|
||||
// TODO: verify that is doesn't change after
|
||||
{
|
||||
let mut state = self.state.clone();
|
||||
state.server.system_id = msg.system_id;
|
||||
state.server.wal_seg_size = msg.wal_seg_size;
|
||||
self.state.persist(&state)?;
|
||||
if self.state.server.wal_seg_size != msg.wal_seg_size {
|
||||
bail!(
|
||||
"invalid wal_seg_size, got {}, expected {}",
|
||||
msg.wal_seg_size,
|
||||
self.state.server.wal_seg_size
|
||||
);
|
||||
}
|
||||
|
||||
self.wal_store.init_storage(&self.state)?;
|
||||
// system_id will be updated on mismatch
|
||||
if self.state.server.system_id != msg.system_id {
|
||||
warn!(
|
||||
"unexpected system ID arrived, got {}, expected {}",
|
||||
msg.system_id, self.state.server.system_id
|
||||
);
|
||||
|
||||
let mut state = self.state.clone();
|
||||
state.server.system_id = msg.system_id;
|
||||
self.state.persist(&state)?;
|
||||
}
|
||||
|
||||
info!(
|
||||
"processed greeting from proposer {:?}, sending term {:?}",
|
||||
@@ -667,16 +687,6 @@ where
|
||||
Ok(Some(AcceptorProposerMessage::VoteResponse(resp)))
|
||||
}
|
||||
|
||||
/// Bump our term if received a note from elected proposer with higher one
|
||||
fn bump_if_higher(&mut self, term: Term) -> Result<()> {
|
||||
if self.state.acceptor_state.term < term {
|
||||
let mut state = self.state.clone();
|
||||
state.acceptor_state.term = term;
|
||||
self.state.persist(&state)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Form AppendResponse from current state.
|
||||
fn append_response(&self) -> AppendResponse {
|
||||
let ar = AppendResponse {
|
||||
@@ -693,7 +703,12 @@ where
|
||||
|
||||
fn handle_elected(&mut self, msg: &ProposerElected) -> Result<Option<AcceptorProposerMessage>> {
|
||||
info!("received ProposerElected {:?}", msg);
|
||||
self.bump_if_higher(msg.term)?;
|
||||
if self.state.acceptor_state.term < msg.term {
|
||||
let mut state = self.state.clone();
|
||||
state.acceptor_state.term = msg.term;
|
||||
self.state.persist(&state)?;
|
||||
}
|
||||
|
||||
// If our term is higher, ignore the message (next feedback will inform the compute)
|
||||
if self.state.acceptor_state.term > msg.term {
|
||||
return Ok(None);
|
||||
@@ -750,7 +765,7 @@ where
|
||||
}
|
||||
|
||||
/// Advance commit_lsn taking into account what we have locally
|
||||
pub fn update_commit_lsn(&mut self) -> Result<()> {
|
||||
fn update_commit_lsn(&mut self) -> Result<()> {
|
||||
let commit_lsn = min(self.global_commit_lsn, self.flush_lsn());
|
||||
assert!(commit_lsn >= self.inmem.commit_lsn);
|
||||
|
||||
@@ -770,6 +785,11 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persist control file to disk, called only after timeline creation (bootstrap).
|
||||
pub fn persist(&mut self) -> Result<()> {
|
||||
self.persist_control_file(self.state.clone())
|
||||
}
|
||||
|
||||
/// Persist in-memory state to the disk, taking other data from state.
|
||||
fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> {
|
||||
state.commit_lsn = self.inmem.commit_lsn;
|
||||
@@ -920,6 +940,8 @@ where
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
|
||||
use super::*;
|
||||
use crate::wal_storage::Storage;
|
||||
use std::ops::Deref;
|
||||
@@ -944,6 +966,14 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn test_sk_state() -> SafeKeeperState {
|
||||
let mut state = SafeKeeperState::empty();
|
||||
state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32;
|
||||
state.tenant_id = ZTenantId::from([1u8; 16]);
|
||||
state.timeline_id = ZTimelineId::from([1u8; 16]);
|
||||
state
|
||||
}
|
||||
|
||||
struct DummyWalStore {
|
||||
lsn: Lsn,
|
||||
}
|
||||
@@ -953,10 +983,6 @@ mod tests {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
|
||||
self.lsn = startpos + buf.len() as u64;
|
||||
Ok(())
|
||||
@@ -979,12 +1005,10 @@ mod tests {
|
||||
#[test]
|
||||
fn test_voting() {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: SafeKeeperState::empty(),
|
||||
persisted_state: test_sk_state(),
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
let ztli = ZTimelineId::from([0u8; 16]);
|
||||
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
|
||||
let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
|
||||
|
||||
// check voting for 1 is ok
|
||||
let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 });
|
||||
@@ -1000,7 +1024,7 @@ mod tests {
|
||||
persisted_state: state,
|
||||
};
|
||||
|
||||
sk = SafeKeeper::new(ztli, storage, sk.wal_store, NodeId(0)).unwrap();
|
||||
sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap();
|
||||
|
||||
// and ensure voting second time for 1 is not ok
|
||||
vote_resp = sk.process_msg(&vote_request);
|
||||
@@ -1013,12 +1037,11 @@ mod tests {
|
||||
#[test]
|
||||
fn test_epoch_switch() {
|
||||
let storage = InMemoryState {
|
||||
persisted_state: SafeKeeperState::empty(),
|
||||
persisted_state: test_sk_state(),
|
||||
};
|
||||
let wal_store = DummyWalStore { lsn: Lsn(0) };
|
||||
let ztli = ZTimelineId::from([0u8; 16]);
|
||||
|
||||
let mut sk = SafeKeeper::new(ztli, storage, wal_store, NodeId(0)).unwrap();
|
||||
let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap();
|
||||
|
||||
let mut ar_hdr = AppendRequestHeader {
|
||||
term: 1,
|
||||
|
||||
@@ -2,8 +2,9 @@
|
||||
//! with the "START_REPLICATION" message.
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
|
||||
use crate::timeline::{ReplicaState, Timeline};
|
||||
use crate::wal_storage::WalReader;
|
||||
use crate::GlobalTimelines;
|
||||
use anyhow::{bail, Context, Result};
|
||||
|
||||
use bytes::Bytes;
|
||||
@@ -167,8 +168,10 @@ impl ReplicationConn {
|
||||
) -> Result<()> {
|
||||
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
|
||||
|
||||
let tli = GlobalTimelines::get(spg.zttid)?;
|
||||
|
||||
// spawn the background thread which receives HotStandbyFeedback messages.
|
||||
let bg_timeline = Arc::clone(spg.timeline.get());
|
||||
let bg_timeline = Arc::clone(&tli);
|
||||
let bg_stream_in = self.stream_in.take().unwrap();
|
||||
let bg_timeline_id = spg.ztimelineid.unwrap();
|
||||
|
||||
@@ -201,11 +204,8 @@ impl ReplicationConn {
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async move {
|
||||
let (inmem_state, persisted_state) = spg.timeline.get().get_state();
|
||||
let (inmem_state, persisted_state) = tli.get_state();
|
||||
// add persisted_state.timeline_start_lsn == Lsn(0) check
|
||||
if persisted_state.server.wal_seg_size == 0 {
|
||||
bail!("Cannot start replication before connecting to walproposer");
|
||||
}
|
||||
|
||||
// Walproposer gets special handling: safekeeper must give proposer all
|
||||
// local WAL till the end, whether committed or not (walproposer will
|
||||
@@ -217,7 +217,7 @@ impl ReplicationConn {
|
||||
// on this safekeeper itself. That's ok as (old) proposer will never be
|
||||
// able to commit such WAL.
|
||||
let stop_pos: Option<Lsn> = if spg.is_walproposer_recovery() {
|
||||
let wal_end = spg.timeline.get().get_end_of_wal();
|
||||
let wal_end = tli.get_flush_lsn();
|
||||
Some(wal_end)
|
||||
} else {
|
||||
None
|
||||
@@ -231,7 +231,7 @@ impl ReplicationConn {
|
||||
let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn);
|
||||
|
||||
let mut wal_reader = WalReader::new(
|
||||
spg.conf.timeline_dir(&spg.timeline.get().zttid),
|
||||
spg.conf.timeline_dir(&tli.zttid),
|
||||
&persisted_state,
|
||||
start_pos,
|
||||
spg.conf.wal_backup_enabled,
|
||||
@@ -241,7 +241,7 @@ impl ReplicationConn {
|
||||
let mut send_buf = vec![0u8; MAX_SEND_SIZE];
|
||||
|
||||
// watcher for commit_lsn updates
|
||||
let mut commit_lsn_watch_rx = spg.timeline.get().get_commit_lsn_watch_rx();
|
||||
let mut commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx();
|
||||
|
||||
loop {
|
||||
if let Some(stop_pos) = stop_pos {
|
||||
@@ -258,7 +258,7 @@ impl ReplicationConn {
|
||||
} else {
|
||||
// TODO: also check once in a while whether we are walsender
|
||||
// to right pageserver.
|
||||
if spg.timeline.get().stop_walsender(replica_id)? {
|
||||
if tli.should_walsender_stop(replica_id) {
|
||||
// Shut down, timeline is suspended.
|
||||
// TODO create proper error type for this
|
||||
bail!("end streaming to {:?}", spg.appname);
|
||||
|
||||
@@ -1,21 +1,19 @@
|
||||
//! This module contains timeline id -> safekeeper state map with file-backed
|
||||
//! persistence and support for interaction between sending and receiving wal.
|
||||
//! This module implements Timeline lifecycle management and has all neccessary code
|
||||
//! to glue together SafeKeeper and all other background services.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use anyhow::{bail, Result};
|
||||
|
||||
use etcd_broker::subscription_value::SkTimelineInfo;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres_ffi::XLogSegNo;
|
||||
|
||||
use serde::Serialize;
|
||||
use tokio::sync::watch;
|
||||
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::{self};
|
||||
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::*;
|
||||
@@ -23,13 +21,13 @@ use tracing::*;
|
||||
use utils::{
|
||||
lsn::Lsn,
|
||||
pq_proto::ReplicationFeedback,
|
||||
zid::{NodeId, ZTenantId, ZTenantTimelineId},
|
||||
zid::{NodeId, ZTenantTimelineId},
|
||||
};
|
||||
|
||||
use crate::control_file;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
|
||||
SafekeeperMemState,
|
||||
SafekeeperMemState, ServerInfo,
|
||||
};
|
||||
use crate::send_wal::HotStandbyFeedback;
|
||||
|
||||
@@ -73,7 +71,7 @@ impl ReplicaState {
|
||||
}
|
||||
|
||||
/// Shared state associated with database instance
|
||||
struct SharedState {
|
||||
pub struct SharedState {
|
||||
/// Safekeeper object
|
||||
sk: SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>,
|
||||
/// State of replicas
|
||||
@@ -95,17 +93,21 @@ struct SharedState {
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
/// Initialize timeline state, creating control file
|
||||
fn create(
|
||||
/// Initialize fresh timeline state without persisting anything to disk.
|
||||
fn create_new(
|
||||
conf: &SafeKeeperConf,
|
||||
zttid: &ZTenantTimelineId,
|
||||
peer_ids: Vec<NodeId>,
|
||||
state: SafeKeeperState,
|
||||
) -> Result<Self> {
|
||||
let state = SafeKeeperState::new(zttid, peer_ids);
|
||||
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
|
||||
if state.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*zttid));
|
||||
}
|
||||
|
||||
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
|
||||
let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?;
|
||||
// We don't want to write anything to disk, because we may have existing timeline there.
|
||||
// These functions should not change anything on disk.
|
||||
let control_store = control_file::FileStorage::create_new(zttid, conf, state)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf, &control_store)?;
|
||||
let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?;
|
||||
|
||||
Ok(Self {
|
||||
sk,
|
||||
@@ -117,16 +119,17 @@ impl SharedState {
|
||||
})
|
||||
}
|
||||
|
||||
/// Restore SharedState from control file.
|
||||
/// If file doesn't exist, bails out.
|
||||
/// Restore SharedState from control file. If file doesn't exist, bails out.
|
||||
fn restore(conf: &SafeKeeperConf, zttid: &ZTenantTimelineId) -> Result<Self> {
|
||||
let control_store = control_file::FileStorage::restore_new(zttid, conf)?;
|
||||
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf);
|
||||
if control_store.server.wal_seg_size == 0 {
|
||||
bail!(TimelineError::UninitializedWalSegSize(*zttid));
|
||||
}
|
||||
|
||||
info!("timeline {} restored", zttid.timeline_id);
|
||||
let wal_store = wal_storage::PhysicalStorage::new(zttid, conf, &control_store)?;
|
||||
|
||||
Ok(Self {
|
||||
sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?,
|
||||
sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?,
|
||||
replicas: Vec::new(),
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
@@ -134,6 +137,7 @@ impl SharedState {
|
||||
last_removed_segno: 0,
|
||||
})
|
||||
}
|
||||
|
||||
fn is_active(&self) -> bool {
|
||||
self.is_wal_backup_required()
|
||||
// FIXME: add tracking of relevant pageservers and check them here individually,
|
||||
@@ -254,48 +258,189 @@ impl SharedState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Database instance (tenant)
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum TimelineError {
|
||||
#[error("Timeline {0} was cancelled and cannot be used anymore")]
|
||||
Cancelled(ZTenantTimelineId),
|
||||
#[error("Timeline {0} was not found in global map")]
|
||||
NotFound(ZTenantTimelineId),
|
||||
#[error("Timeline {0} exists on disk, but wasn't loaded on startup")]
|
||||
Invalid(ZTenantTimelineId),
|
||||
#[error("Timeline {0} is already exists")]
|
||||
AlreadyExists(ZTenantTimelineId),
|
||||
#[error("Timeline {0} is not initialized, wal_seg_size is zero")]
|
||||
UninitializedWalSegSize(ZTenantTimelineId),
|
||||
}
|
||||
|
||||
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.
|
||||
/// It also holds SharedState and provides mutually exclusive access to it.
|
||||
pub struct Timeline {
|
||||
pub zttid: ZTenantTimelineId,
|
||||
|
||||
/// Sending here asks for wal backup launcher attention (start/stop
|
||||
/// offloading). Sending zttid instead of concrete command allows to do
|
||||
/// sending without timeline lock.
|
||||
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
|
||||
|
||||
/// Used to broadcast commit_lsn updates to all background jobs.
|
||||
commit_lsn_watch_tx: watch::Sender<Lsn>,
|
||||
/// For breeding receivers.
|
||||
commit_lsn_watch_rx: watch::Receiver<Lsn>,
|
||||
|
||||
/// Safekeeper and other state, that should remain consistent and synchronized
|
||||
/// with the disk.
|
||||
mutex: Mutex<SharedState>,
|
||||
|
||||
/// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal.
|
||||
cancellation_tx: watch::Sender<bool>,
|
||||
|
||||
/// Timeline should not be used after cancellation. Background tasks should
|
||||
/// monitor this channel and stop eventually after receiving `true` from this channel.
|
||||
cancellation_rx: watch::Receiver<bool>,
|
||||
|
||||
/// Directory where timeline state is stored.
|
||||
timeline_dir: PathBuf,
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
fn new(
|
||||
/// Load existing timeline from disk.
|
||||
pub fn load_timeline(
|
||||
conf: SafeKeeperConf,
|
||||
zttid: ZTenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
|
||||
shared_state: SharedState,
|
||||
) -> Timeline {
|
||||
) -> Result<Timeline> {
|
||||
let shared_state = SharedState::restore(&conf, &zttid)?;
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
|
||||
watch::channel(shared_state.sk.inmem.commit_lsn);
|
||||
Timeline {
|
||||
watch::channel(shared_state.sk.state.commit_lsn);
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
|
||||
Ok(Timeline {
|
||||
zttid,
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
mutex: Mutex::new(shared_state),
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&zttid),
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new timeline, which is not yet persisted to disk.
|
||||
pub fn create_empty(
|
||||
conf: SafeKeeperConf,
|
||||
zttid: ZTenantTimelineId,
|
||||
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
|
||||
server_info: ServerInfo,
|
||||
) -> Result<Timeline> {
|
||||
let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID);
|
||||
let (cancellation_tx, cancellation_rx) = watch::channel(false);
|
||||
let state = SafeKeeperState::new(&zttid, server_info, vec![]);
|
||||
|
||||
Ok(Timeline {
|
||||
zttid,
|
||||
wal_backup_launcher_tx,
|
||||
commit_lsn_watch_tx,
|
||||
commit_lsn_watch_rx,
|
||||
mutex: Mutex::new(SharedState::create_new(&conf, &zttid, state)?),
|
||||
cancellation_rx,
|
||||
cancellation_tx,
|
||||
timeline_dir: conf.timeline_dir(&zttid),
|
||||
})
|
||||
}
|
||||
|
||||
/// Initialize fresh timeline on disk and start background tasks. If bootstrap
|
||||
/// fails, timeline is cancelled and cannot be used anymore.
|
||||
///
|
||||
/// Bootstrap is transactional, so if it fails, created files will be deleted,
|
||||
/// and state on disk should remain unchanged.
|
||||
pub fn bootstrap(&self, shared_state: &mut MutexGuard<SharedState>) -> Result<()> {
|
||||
match std::fs::metadata(&self.timeline_dir) {
|
||||
Ok(_) => {
|
||||
// Timeline directory exists on disk, we should leave state unchanged
|
||||
// and return error.
|
||||
bail!(TimelineError::Invalid(self.zttid));
|
||||
}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
|
||||
// Create timeline directory.
|
||||
std::fs::create_dir_all(&self.timeline_dir)?;
|
||||
|
||||
// Write timeline to disk and TODO: start background tasks.
|
||||
match || -> Result<()> {
|
||||
shared_state.sk.persist()?;
|
||||
// TODO: add more initialization steps here
|
||||
Ok(())
|
||||
}() {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Bootstrap failed, cancel timeline and remove timeline directory.
|
||||
self.cancel();
|
||||
|
||||
if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) {
|
||||
warn!(
|
||||
"failed to remove timeline {} directory after bootstrap failure: {}",
|
||||
self.zttid, fs_err
|
||||
);
|
||||
}
|
||||
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Delete timeline from disk completely, by removing timeline directory. Background
|
||||
/// timeline activities will stop eventually.
|
||||
pub fn delete_from_disk(
|
||||
&self,
|
||||
shared_state: &mut MutexGuard<SharedState>,
|
||||
) -> Result<(bool, bool)> {
|
||||
let was_active = shared_state.active;
|
||||
self.cancel();
|
||||
let dir_existed = delete_dir(&self.timeline_dir)?;
|
||||
Ok((dir_existed, was_active))
|
||||
}
|
||||
|
||||
/// Cancel timeline to prevent further usage. Background tasks will stop
|
||||
/// eventually after receiving cancellation signal.
|
||||
fn cancel(&self) {
|
||||
info!("Timeline {} is cancelled", self.zttid);
|
||||
let _ = self.cancellation_tx.send(true);
|
||||
let res = self.wal_backup_launcher_tx.blocking_send(self.zttid);
|
||||
if let Err(e) = res {
|
||||
error!("Failed to send stop signal to wal_backup_launcher: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns if timeline is cancelled.
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
*self.cancellation_rx.borrow()
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock()
|
||||
}
|
||||
|
||||
/// Register compute connection, starting timeline-related activity if it is
|
||||
/// not running yet.
|
||||
/// Can fail only if channel to a static thread got closed, which is not normal at all.
|
||||
pub fn on_compute_connect(&self) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.zttid));
|
||||
}
|
||||
|
||||
let is_wal_backup_action_pending: bool;
|
||||
{
|
||||
let mut shared_state = self.mutex.lock().unwrap();
|
||||
let mut shared_state = self.write_shared_state();
|
||||
shared_state.num_computes += 1;
|
||||
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
|
||||
}
|
||||
// Wake up wal backup launcher, if offloading not started yet.
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
@@ -303,25 +448,33 @@ impl Timeline {

/// De-register compute connection, shutting down timeline activity if
/// pageserver doesn't need catchup.
/// Can fail only if channel to a static thread got closed, which is not normal at all.
pub fn on_compute_disconnect(&self) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}

let is_wal_backup_action_pending: bool;
{
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
shared_state.num_computes -= 1;
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
}
// Wake up wal backup launcher, if it is time to stop the offloading.
if is_wal_backup_action_pending {
// Can fail only if channel to a static thread got closed, which is not normal at all.
self.wal_backup_launcher_tx.blocking_send(self.zttid)?;
}
Ok(())
}

/// Whether we still need this walsender running?
/// Returns true if walsender should stop sending WAL to pageserver.
/// TODO: check this pageserver is actually interested in this timeline.
pub fn stop_walsender(&self, replica_id: usize) -> Result<bool> {
let mut shared_state = self.mutex.lock().unwrap();
pub fn should_walsender_stop(&self, replica_id: usize) -> bool {
if self.is_cancelled() {
return true;
}

let mut shared_state = self.write_shared_state();
if shared_state.num_computes == 0 {
let replica_state = shared_state.replicas[replica_id].unwrap();
let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet
@@ -329,73 +482,65 @@ impl Timeline {
replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn);
if stop {
shared_state.update_status(self.zttid);
return Ok(true);
return true;
}
}
Ok(false)
false
}

/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub fn wal_backup_attend(&self) -> bool {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.wal_backup_attend()
}

// Can this safekeeper offload to s3? Recently joined safekeepers might not
// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
self.mutex.lock().unwrap().can_wal_backup()
}

/// Deactivates the timeline, assuming it is being deleted.
/// Returns whether the timeline was already active.
///
/// We assume all threads will stop by themselves eventually (possibly with errors, but no panics).
/// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but
/// we're deleting the timeline anyway.
pub async fn deactivate_for_delete(&self) -> Result<bool> {
let was_active: bool;
{
let shared_state = self.mutex.lock().unwrap();
was_active = shared_state.active;
if self.is_cancelled() {
return false;
}
self.wal_backup_launcher_tx.send(self.zttid).await?;
Ok(was_active)

self.write_shared_state().wal_backup_attend()
}

fn is_active(&self) -> bool {
let shared_state = self.mutex.lock().unwrap();
shared_state.active
/// Can this safekeeper offload to s3? Recently joined safekeepers might not
/// have necessary WAL.
pub fn can_wal_backup(&self) -> bool {
if self.is_cancelled() {
return false;
}

let shared_state = self.write_shared_state();
shared_state.can_wal_backup()
}

/// Returns full timeline info, required for the metrics.
/// If the timeline is not active, returns None instead.
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {
let shared_state = self.mutex.lock().unwrap();
if !shared_state.active {
if self.is_cancelled() {
return None;
}

Some(FullTimelineInfo {
zttid: self.zttid,
replicas: shared_state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: shared_state.wal_backup_active,
timeline_is_active: shared_state.active,
num_computes: shared_state.num_computes,
last_removed_segno: shared_state.last_removed_segno,
epoch_start_lsn: shared_state.sk.epoch_start_lsn,
mem_state: shared_state.sk.inmem.clone(),
persisted_state: shared_state.sk.state.clone(),
flush_lsn: shared_state.sk.wal_store.flush_lsn(),
})
let state = self.write_shared_state();
if state.active {
Some(FullTimelineInfo {
zttid: self.zttid,
replicas: state
.replicas
.iter()
.filter_map(|r| r.as_ref())
.copied()
.collect(),
wal_backup_active: state.wal_backup_active,
timeline_is_active: state.active,
num_computes: state.num_computes,
last_removed_segno: state.last_removed_segno,
epoch_start_lsn: state.sk.epoch_start_lsn,
mem_state: state.sk.inmem.clone(),
persisted_state: state.sk.state.clone(),
flush_lsn: state.sk.wal_store.flush_lsn(),
})
} else {
None
}
}

/// Returns commit_lsn watch channel.
pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver<Lsn> {
self.commit_lsn_watch_rx.clone()
}
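The hunks above and below replace direct `self.mutex.lock().unwrap()` calls with a `write_shared_state()` helper whose definition is not shown in this comparison. A minimal sketch of what such a wrapper could look like, assuming `SharedState` is now guarded by the `parking_lot` mutex added to Cargo.toml (names here are illustrative, not taken from the diff):

    // Hypothetical sketch, not part of this commit: hide locking details behind one helper.
    // Assumes a field `mutex: parking_lot::Mutex<SharedState>` on Timeline.
    fn write_shared_state(&self) -> parking_lot::MutexGuard<'_, SharedState> {
        // parking_lot mutexes are never poisoned, so no unwrap() is needed here.
        self.mutex.lock()
    }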
@@ -405,10 +550,14 @@ impl Timeline {
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}

let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
rmsg = shared_state.sk.process_msg(msg)?;

// if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn
@@ -426,28 +575,46 @@ impl Timeline {
Ok(rmsg)
}

/// Returns wal_seg_size.
pub fn get_wal_seg_size(&self) -> usize {
self.mutex.lock().unwrap().get_wal_seg_size()
self.write_shared_state().get_wal_seg_size()
}

/// Returns true only if the timeline is loaded and active.
pub fn is_active(&self) -> bool {
if self.is_cancelled() {
return false;
}

self.write_shared_state().active
}

/// Returns state of the timeline.
pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) {
let shared_state = self.mutex.lock().unwrap();
(shared_state.sk.inmem.clone(), shared_state.sk.state.clone())
let state = self.write_shared_state();
(state.sk.inmem.clone(), state.sk.state.clone())
}

/// Returns latest backup_lsn.
pub fn get_wal_backup_lsn(&self) -> Lsn {
self.mutex.lock().unwrap().sk.inmem.backup_lsn
self.write_shared_state().sk.inmem.backup_lsn
}

pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) {
self.mutex.lock().unwrap().sk.inmem.backup_lsn = backup_lsn;
/// Sets backup_lsn to the given value.
pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}

self.write_shared_state().sk.inmem.backup_lsn = backup_lsn;
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
Ok(())
}

/// Prepare public safekeeper info for reporting.
/// Return public safekeeper info for broadcasting to broker and other peers.
pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
let shared_state = self.mutex.lock().unwrap();
let shared_state = self.write_shared_state();
SkTimelineInfo {
last_log_term: Some(shared_state.sk.get_epoch()),
flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()),
@@ -473,12 +640,7 @@ impl Timeline {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet (no message from compute ever
// received), can't do much without it.
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
is_wal_backup_action_pending = shared_state.update_status(self.zttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;
@@ -491,36 +653,40 @@ impl Timeline {
Ok(())
}

/// Add send_wal replica to the in-memory vector of replicas.
pub fn add_replica(&self, state: ReplicaState) -> usize {
let mut shared_state = self.mutex.lock().unwrap();
shared_state.add_replica(state)
self.write_shared_state().add_replica(state)
}

/// Update replication replica state.
pub fn update_replica_state(&self, id: usize, state: ReplicaState) {
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
shared_state.replicas[id] = Some(state);
}

/// Remove send_wal replica from the in-memory vector of replicas.
pub fn remove_replica(&self, id: usize) {
let mut shared_state = self.mutex.lock().unwrap();
let mut shared_state = self.write_shared_state();
assert!(shared_state.replicas[id].is_some());
shared_state.replicas[id] = None;
}

pub fn get_end_of_wal(&self) -> Lsn {
let shared_state = self.mutex.lock().unwrap();
shared_state.sk.wal_store.flush_lsn()
/// Returns flush_lsn.
pub fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().sk.wal_store.flush_lsn()
}

/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.zttid));
}

let horizon_segno: XLogSegNo;
let remover: Box<dyn Fn(u64) -> Result<(), anyhow::Error>>;
{
let shared_state = self.mutex.lock().unwrap();
// WAL seg size not initialized yet, no WAL exists.
if shared_state.get_wal_seg_size() == 0 {
return Ok(());
}
let shared_state = self.write_shared_state();
horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled);
remover = shared_state.sk.wal_store.remove_up_to();
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
@@ -528,244 +694,22 @@ impl Timeline {
}
// release the lock before removing
}
let _enter =
info_span!("", tenant = %self.zttid.tenant_id, timeline = %self.zttid.timeline_id)
.entered();

// delete old WAL files
remover(horizon_segno - 1)?;
self.mutex.lock().unwrap().last_removed_segno = horizon_segno;

// update last_removed_segno
let mut shared_state = self.write_shared_state();
shared_state.last_removed_segno = horizon_segno;
Ok(())
}
}
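The rewritten `remove_old_wal` keeps a pattern worth calling out: everything needed for deletion is decided while holding the shared-state lock, the guard is dropped, the slow file removal runs unlocked, and only then is the lock re-taken to record `last_removed_segno`. Condensed from the hunk above (a sketch only, error handling elided):

    // Decide under the lock, do I/O without it.
    let remover = {
        let shared_state = self.write_shared_state();
        shared_state.sk.wal_store.remove_up_to()
    }; // guard dropped here
    remover(horizon_segno - 1)?; // file removal happens with the lock released
    self.write_shared_state().last_removed_segno = horizon_segno;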

// Utilities needed by various Connection-like objects
pub trait TimelineTools {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>;

fn get(&self) -> &Arc<Timeline>;
}

impl TimelineTools for Option<Arc<Timeline>> {
fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()> {
*self = Some(GlobalTimelines::get(conf, zttid, create)?);
Ok(())
}

fn get(&self) -> &Arc<Timeline> {
self.as_ref().unwrap()
}
}

struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
}

static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
})
});

#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
pub dir_existed: bool,
pub was_active: bool,
}

/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;

impl GlobalTimelines {
pub fn init(wal_backup_launcher_tx: Sender<ZTenantTimelineId>) {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
}

fn create_internal(
mut state: MutexGuard<GlobalTimelinesState>,
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
match state.timelines.get(&zttid) {
Some(_) => bail!("timeline {} already exists", zttid),
None => {
// TODO: check directory existence
let dir = conf.timeline_dir(&zttid);
fs::create_dir_all(dir)?;

let shared_state = SharedState::create(conf, &zttid, peer_ids)
.context("failed to create shared state")?;

let new_tli = Arc::new(Timeline::new(
zttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
}

pub fn create(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
peer_ids: Vec<NodeId>,
) -> Result<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
GlobalTimelines::create_internal(state, conf, zttid, peer_ids)
}

/// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map.
/// If control file doesn't exist and create=false, bails out.
pub fn get(
conf: &SafeKeeperConf,
zttid: ZTenantTimelineId,
create: bool,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("", timeline = %zttid.timeline_id).entered();

let mut state = TIMELINES_STATE.lock().unwrap();

match state.timelines.get(&zttid) {
Some(result) => Ok(Arc::clone(result)),
None => {
let shared_state = SharedState::restore(conf, &zttid);

let shared_state = match shared_state {
Ok(shared_state) => shared_state,
Err(error) => {
// TODO: always create timeline explicitly
if error
.root_cause()
.to_string()
.contains("No such file or directory")
&& create
{
return GlobalTimelines::create_internal(state, conf, zttid, vec![]);
} else {
return Err(error);
}
}
};

let new_tli = Arc::new(Timeline::new(
zttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
shared_state,
));
state.timelines.insert(zttid, Arc::clone(&new_tli));
Ok(new_tli)
}
}
}

/// Get loaded timeline, if it exists.
pub fn get_loaded(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
let state = TIMELINES_STATE.lock().unwrap();
state.timelines.get(&zttid).map(Arc::clone)
}

/// Get ZTenantTimelineIDs of all active timelines.
pub fn get_active_timelines() -> HashSet<ZTenantTimelineId> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines
.iter()
.filter(|&(_, tli)| tli.is_active())
.map(|(zttid, _)| *zttid)
.collect()
}

/// Return FullTimelineInfo for all active timelines.
pub fn active_timelines_metrics() -> Vec<FullTimelineInfo> {
let state = TIMELINES_STATE.lock().unwrap();
state
.timelines
.iter()
.filter_map(|(_, tli)| tli.info_for_metrics())
.collect()
}

fn delete_force_internal(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
was_active: bool,
) -> Result<TimelineDeleteForceResult> {
match std::fs::remove_dir_all(conf.timeline_dir(zttid)) {
Ok(_) => Ok(TimelineDeleteForceResult {
dir_existed: true,
was_active,
}),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult {
dir_existed: false,
was_active,
}),
Err(e) => Err(e.into()),
}
}

/// Deactivates and deletes the timeline, see `Timeline::deactivate_for_delete()`, then deletes
/// the corresponding data directory.
/// We assume all timeline threads do not care about `GlobalTimelines` not containing the timeline
/// anymore, and they will eventually terminate without panics.
///
/// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two
/// `Timeline` objects in memory):
/// a) a compute node connects after this method is called, or
/// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or
/// c) an HTTP POST request for timeline creation is made after the timeline is already deleted.
/// TODO: ensure all of the above never happens.
pub async fn delete_force(
conf: &SafeKeeperConf,
zttid: &ZTenantTimelineId,
) -> Result<TimelineDeleteForceResult> {
info!("deleting timeline {}", zttid);
let timeline = TIMELINES_STATE.lock().unwrap().timelines.remove(zttid);
let mut was_active = false;
if let Some(tli) = timeline {
was_active = tli.deactivate_for_delete().await?;
}
GlobalTimelines::delete_force_internal(conf, zttid, was_active)
}

/// Deactivates and deletes all timelines for the tenant, see `delete()`.
/// Returns map of all timelines which the tenant had, `true` if a timeline was active.
/// There may be a race if new timelines are created simultaneously.
pub async fn delete_force_all_for_tenant(
conf: &SafeKeeperConf,
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let mut to_delete = HashMap::new();
{
// Keep mutex in this scope.
let timelines = &mut TIMELINES_STATE.lock().unwrap().timelines;
for (&zttid, tli) in timelines.iter() {
if zttid.tenant_id == *tenant_id {
to_delete.insert(zttid, tli.clone());
}
}
// TODO: test that the correct subset of timelines is removed. It's complicated because they are implicitly created currently.
timelines.retain(|zttid, _| !to_delete.contains_key(zttid));
}
let mut deleted = HashMap::new();
for (zttid, timeline) in to_delete {
let was_active = timeline.deactivate_for_delete().await?;
deleted.insert(
zttid,
GlobalTimelines::delete_force_internal(conf, &zttid, was_active)?,
);
}
// There may be inactive timelines, so delete the whole tenant dir as well.
match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) {
Ok(_) => (),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
e => e?,
};
Ok(deleted)
/// Deletes directory and its contents. Returns false if directory does not exist.
fn delete_dir(path: &PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()),
}
}

348
safekeeper/src/timelines_global_map.rs
Normal file
@@ -0,0 +1,348 @@
//! This module contains global (tenant_id, timeline_id) -> Arc<Timeline> mapping.
//! All timelines should always be present in this map, this is done by loading them
//! all from the disk on startup and keeping them in memory.

use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::SafeKeeperConf;
use anyhow::{anyhow, bail, Context, Result};
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};

struct GlobalTimelinesState {
timelines: HashMap<ZTenantTimelineId, Arc<Timeline>>,
wal_backup_launcher_tx: Option<Sender<ZTenantTimelineId>>,
conf: SafeKeeperConf,
}

impl GlobalTimelinesState {
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (SafeKeeperConf, Sender<ZTenantTimelineId>) {
(
self.conf.clone(),
self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
)
}

/// Insert timeline into the map. Returns error if timeline with the same id already exists.
fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
let zttid = timeline.zttid;
if self.timelines.contains_key(&zttid) {
bail!(TimelineError::AlreadyExists(zttid));
}
self.timelines.insert(zttid, timeline);
Ok(())
}

/// Get timeline from the map. Returns error if timeline doesn't exist.
fn get(&self, zttid: &ZTenantTimelineId) -> Result<Arc<Timeline>> {
self.timelines
.get(zttid)
.cloned()
.ok_or_else(|| anyhow!(TimelineError::NotFound(*zttid)))
}
}

static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
wal_backup_launcher_tx: None,
conf: SafeKeeperConf::default(),
})
});

/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;

impl GlobalTimelines {
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
pub fn init(
conf: SafeKeeperConf,
wal_backup_launcher_tx: Sender<ZTenantTimelineId>,
) -> Result<()> {
let mut state = TIMELINES_STATE.lock().unwrap();
assert!(state.wal_backup_launcher_tx.is_none());
state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
state.conf = conf;

// Iterate through all directories and load tenants from every directory
// whose name is a valid tenant_id.
let mut tenant_count = 0;
let tenants_dir = state.conf.workdir.clone();
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
if let Ok(tenant_id) =
ZTenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
{
tenant_count += 1;
GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?;
}
}
Err(e) => error!(
"failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
tenants_dir_entry,
tenants_dir.display(),
e
),
}
}

info!(
"found {} tenants directories, successfully loaded {} timelines",
tenant_count,
state.timelines.len()
);
Ok(())
}

/// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any.
fn load_tenant_timelines(
state: &mut MutexGuard<GlobalTimelinesState>,
tenant_id: ZTenantId,
) -> Result<()> {
let timelines_dir = state.conf.tenant_dir(&tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
{
match &timelines_dir_entry {
Ok(timeline_dir_entry) => {
if let Ok(timeline_id) =
ZTimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let zttid = ZTenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(
state.conf.clone(),
zttid,
state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
) {
Ok(timeline) => {
state.timelines.insert(zttid, Arc::new(timeline));
}
// If we can't load a timeline, it's most likely because of a corrupted
// directory. We will log an error and won't allow deleting/recreating
// this timeline. The only way to fix this timeline is to repair manually
// and restart the safekeeper.
Err(e) => error!(
"failed to load timeline {} for tenant {}, reason: {:?}",
timeline_id, tenant_id, e
),
}
}
}
Err(e) => error!(
"failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
timelines_dir_entry,
timelines_dir.display(),
e
),
}
}

Ok(())
}
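`init` and `load_tenant_timelines` rely on directory names parsing into valid IDs and simply skip anything else. The same filtering could be expressed with iterator adapters; this is only a sketch, assuming the `FromStr` import already present in this file:

    // Illustrative alternative: collect tenant ids, ignoring entries whose names
    // are not valid ZTenantId values (e.g. temporary files).
    let tenant_ids: Vec<ZTenantId> = std::fs::read_dir(&tenants_dir)?
        .filter_map(|entry| entry.ok())
        .filter_map(|entry| ZTenantId::from_str(entry.file_name().to_str()?).ok())
        .collect();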

/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub fn create(zttid: ZTenantTimelineId, server_info: ServerInfo) -> Result<Arc<Timeline>> {
let (conf, wal_backup_launcher_tx) = {
let state = TIMELINES_STATE.lock().unwrap();
if let Ok(timeline) = state.get(&zttid) {
// Timeline already exists, return it.
return Ok(timeline);
}
state.get_dependencies()
};

info!("creating new timeline {}", zttid);

let timeline = Arc::new(Timeline::create_empty(
conf,
zttid,
wal_backup_launcher_tx,
server_info,
)?);

// Take a lock and finish the initialization holding this mutex. No other threads
// can interfere with creation after we insert the timeline into the map.
let mut shared_state = timeline.write_shared_state();

// We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return a valid timeline on the next try.
TIMELINES_STATE
.lock()
.unwrap()
.try_insert(timeline.clone())?;

// Write the new timeline to the disk and start background workers.
// Bootstrap is transactional, so if it fails, the timeline will be deleted,
// and the state on disk should remain unchanged.
match timeline.bootstrap(&mut shared_state) {
Ok(_) => {
// We are done with bootstrap, release the lock, return the timeline.
drop(shared_state);
Ok(timeline)
}
Err(e) => {
// Note: the most likely reason for bootstrap failure is that the timeline
// directory already exists on disk. This happens when timeline is corrupted
// and wasn't loaded from disk on startup because of that. We want to preserve
// the timeline directory in this case, for further inspection.

// TODO: this is an unusual error, perhaps we should send it to sentry
// TODO: compute will try to create timeline every second, we should add backoff
error!("failed to bootstrap timeline {}: {}", zttid, e);

// Timeline failed to bootstrap, it cannot be used. Remove it from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(&zttid);
Err(e)
}
}
}

/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
/// i.e. loaded in memory and not cancelled.
pub fn get(zttid: ZTenantTimelineId) -> Result<Arc<Timeline>> {
let res = TIMELINES_STATE.lock().unwrap().get(&zttid);

match res {
Ok(tli) => {
if tli.is_cancelled() {
anyhow::bail!(TimelineError::Cancelled(zttid));
}
Ok(tli)
}
Err(e) => Err(e),
}
}

/// Returns all timelines. This is used for background timeline processes.
pub fn get_all() -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap();
global_lock
.timelines
.values()
.cloned()
.filter(|t| !t.is_cancelled())
.collect()
}

/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
/// and that's why it can return cancelled timelines, to retry deleting them.
fn get_all_for_tenant(tenant_id: ZTenantId) -> Vec<Arc<Timeline>> {
let global_lock = TIMELINES_STATE.lock().unwrap();
global_lock
.timelines
.values()
.filter(|t| t.zttid.tenant_id == tenant_id)
.cloned()
.collect()
}

/// Cancels timeline, then deletes the corresponding data directory.
pub fn delete_force(zttid: &ZTenantTimelineId) -> Result<TimelineDeleteForceResult> {
let tli_res = TIMELINES_STATE.lock().unwrap().get(zttid);
match tli_res {
Ok(timeline) => {
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state();

info!("deleting timeline {}", zttid);
let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?;

// Remove timeline from the map.
TIMELINES_STATE.lock().unwrap().timelines.remove(zttid);

Ok(TimelineDeleteForceResult {
dir_existed,
was_active,
})
}
Err(_) => {
// Timeline is not in memory, but it may still exist on disk in broken state.
let dir_path = TIMELINES_STATE.lock().unwrap().conf.timeline_dir(zttid);
let dir_existed = delete_dir(dir_path)?;

Ok(TimelineDeleteForceResult {
dir_existed,
was_active: false,
})
}
}
}

/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
/// created simultaneously. In that case the function will return error and the caller should
/// retry tenant deletion again later.
pub fn delete_force_all_for_tenant(
tenant_id: &ZTenantId,
) -> Result<HashMap<ZTenantTimelineId, TimelineDeleteForceResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
let to_delete = Self::get_all_for_tenant(*tenant_id);

let mut err = None;

let mut deleted = HashMap::new();
for tli in &to_delete {
match Self::delete_force(&tli.zttid) {
Ok(result) => {
deleted.insert(tli.zttid, result);
}
Err(e) => {
error!("failed to delete timeline {}: {}", tli.zttid, e);
// Save error to return later.
err = Some(e);
}
}
}

// If there was an error, return it.
if let Some(e) = err {
return Err(e);
}

// There may be broken timelines on disk, so delete the whole tenant dir as well.
// Note that we could concurrently create new timelines while we were deleting them,
// so the directory may be not empty. In this case timelines will have bad state
// and timeline background jobs can panic.
delete_dir(TIMELINES_STATE.lock().unwrap().conf.tenant_dir(tenant_id))?;

let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
if !tlis_after_delete.is_empty() {
// Some timelines were created while we were deleting them, returning error
// to the caller, so it can retry later.
bail!(
"failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
tenant_id
);
}

Ok(deleted)
}
}

#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
pub dir_existed: bool,
pub was_active: bool,
}

/// Deletes directory and its contents. Returns false if directory does not exist.
fn delete_dir(path: PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
Err(e) => Err(e.into()),
}
}
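Taken together, the new module exposes a small surface: `init` once at startup, `create` for new timelines, `get` for existing ones, plus the two delete entry points. A sketch of typical call sites (error handling simplified, not taken verbatim from this commit):

    // Once, at startup (as main.rs now does):
    GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?;

    // Later, e.g. in a request handler: fails if the timeline is missing or cancelled.
    let tli = GlobalTimelines::get(zttid)?;
    let flush_lsn = tli.get_flush_lsn();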
@@ -26,8 +26,8 @@ use tracing::*;
use utils::{lsn::Lsn, zid::ZTenantTimelineId};

use crate::broker::{Election, ElectionLeader};
use crate::timeline::{GlobalTimelines, Timeline};
use crate::{broker, SafeKeeperConf};
use crate::timeline::Timeline;
use crate::{broker, GlobalTimelines, SafeKeeperConf};

use once_cell::sync::OnceCell;

@@ -54,7 +54,9 @@ pub fn wal_backup_launcher_thread_main(
/// Check whether wal backup is required for timeline. If yes, mark that launcher is
/// aware of current status and return the timeline.
fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option<Arc<Timeline>> {
GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend())
GlobalTimelines::get(zttid)
.ok()
.filter(|tli| tli.wal_backup_attend())
}

struct WalBackupTaskHandle {
@@ -191,7 +193,8 @@ struct WalBackupTask {
election: Election,
}

/// Offload single timeline.
/// Offload single timeline. Called only after we checked that backup
/// is required (wal_backup_attend) and possible (can_wal_backup).
async fn backup_task_main(
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
@@ -199,18 +202,17 @@ async fn backup_task_main(
election: Election,
) {
info!("started");
let timeline: Arc<Timeline> = if let Some(tli) = GlobalTimelines::get_loaded(zttid) {
tli
} else {
/* Timeline could get deleted while task was starting, just exit then. */
info!("no timeline, exiting");
let res = GlobalTimelines::get(zttid);
if let Err(e) = res {
error!("backup error for timeline {}: {}", zttid, e);
return;
};
}
let tli = res.unwrap();

let mut wb = WalBackupTask {
wal_seg_size: timeline.get_wal_seg_size(),
commit_lsn_watch_rx: timeline.get_commit_lsn_watch_rx(),
timeline,
wal_seg_size: tli.get_wal_seg_size(),
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
leader: None,
election,
@@ -322,7 +324,11 @@ impl WalBackupTask {
{
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
self.timeline.set_wal_backup_lsn(backup_lsn_result);
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("backup error: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {

@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.

use anyhow::{anyhow, bail, Context, Result};
use anyhow::{bail, Context, Result};
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
@@ -17,7 +17,7 @@ use postgres_ffi::v14::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
};
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::min;
use std::cmp::{max, min};

use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
@@ -101,9 +101,6 @@ pub trait Storage {
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;

/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;

/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;

@@ -119,7 +116,7 @@ pub trait Storage {
}

/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage must be initialized before use.
/// for better performance. Storage is initialized in the constructor.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
@@ -127,16 +124,14 @@ pub trait Storage {
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
/// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
timeline_dir: PathBuf,
conf: SafeKeeperConf,

// fields below are filled upon initialization
/// None if uninitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,
/// Size of WAL segment in bytes.
wal_seg_size: usize,

/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
@@ -161,25 +156,47 @@ pub struct PhysicalStorage {
}

impl PhysicalStorage {
pub fn new(zttid: &ZTenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage {
/// Create new storage. If commit_lsn is not zero, we try to restore flush_lsn from
/// the disk. Otherwise, all LSNs are set to zero.
pub fn new(
zttid: &ZTenantTimelineId,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
) -> Result<PhysicalStorage> {
let timeline_dir = conf.timeline_dir(zttid);
PhysicalStorage {
let wal_seg_size = state.server.wal_seg_size as usize;

// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)?
};

// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;

info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
zttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", zttid.timeline_id);
}

Ok(PhysicalStorage {
metrics: WalStorageMetrics::new(zttid),
zttid: *zttid,
timeline_dir,
conf: conf.clone(),
wal_seg_size: None,
write_lsn: Lsn(0),
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
wal_seg_size,
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn),
file: None,
}
}

/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
})
}

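Because the constructor now derives all positions up front, the documented ordering `write_lsn` >= `write_record_lsn` >= `flush_record_lsn` holds from the moment a `PhysicalStorage` is created. A hypothetical in-module sanity check of that invariant, shown only as an illustration:

    // Not part of this commit: assert the documented LSN ordering.
    fn debug_check_lsn_order(storage: &PhysicalStorage) {
        debug_assert!(storage.write_lsn >= storage.write_record_lsn);
        debug_assert!(storage.write_record_lsn >= storage.flush_record_lsn);
    }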
/// Call fdatasync if config requires so.
@@ -204,9 +221,9 @@ impl PhysicalStorage {

/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;

// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
@@ -222,24 +239,18 @@ impl PhysicalStorage {
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;

write_zeroes(&mut file, wal_seg_size)?;
write_zeroes(&mut file, self.wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}

/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let (mut file, is_partial) = self.open_or_create(segno)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
@@ -247,13 +258,13 @@ impl PhysicalStorage {

file.write_all(buf)?;

if xlogoff + buf.len() == wal_seg_size {
if xlogoff + buf.len() == self.wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;

// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
@@ -269,10 +280,6 @@ impl PhysicalStorage {
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;

if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
@@ -284,17 +291,17 @@ impl PhysicalStorage {

while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);
let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(self.wal_seg_size);

// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
self.wal_seg_size - xlogoff
} else {
buf.len()
};

self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
@@ -309,53 +316,6 @@ impl Storage for PhysicalStorage {
self.flush_record_lsn
}

/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows postponing its initialization.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown. This is dead path normally, should
// be used only in tests.
return Ok(());
}

if let Some(wal_seg_size) = self.wal_seg_size {
// physical storage is already initialized
assert_eq!(wal_seg_size, state.server.wal_seg_size as usize);
return Ok(());
}

// initialize physical storage
let wal_seg_size = state.server.wal_seg_size as usize;
self.wal_seg_size = Some(wal_seg_size);

// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
self.write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
find_end_of_wal(&self.timeline_dir, wal_seg_size, state.commit_lsn)?
};

self.write_record_lsn = self.write_lsn;

// TODO: do we really know that write_lsn is fully flushed to disk?
// If not, maybe it's better to call fsync() here to be sure?
self.update_flush_lsn();

info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn,
);
if self.flush_record_lsn < state.commit_lsn
|| self.flush_record_lsn < state.peer_horizon_lsn
{
warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id);
}

Ok(())
}

/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
@@ -419,80 +379,83 @@ impl Storage for PhysicalStorage {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
// but haven't updated flush_lsn yet.
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
bail!(
"unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
self.write_lsn,
self.flush_record_lsn
);
}
}

// everything is flushed now, let's update flush_lsn
self.update_flush_lsn();
self.flush_record_lsn = self.write_record_lsn;
Ok(())
}

/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;

// Streaming must not create a hole, so truncate cannot be called on non-written lsn
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
bail!(
"truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
self.write_lsn,
end_pos
);
}

// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}

let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize;
let segno = end_pos.segment_number(self.wal_seg_size);

// Remove all segments after the given LSN.
remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;

let (mut file, is_partial) = self.open_or_create(segno)?;

// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
write_zeroes(&mut file, self.wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;

if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}

// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better use fs::try_exists which is currently available only in nightly build
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
} else if wal_file_partial_path.exists() {
fs::remove_file(&wal_file_partial_path)?;
} else {
break;
}
}

// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.update_flush_lsn();
self.flush_record_lsn = end_pos;
Ok(())
}

fn remove_up_to(&self) -> Box<dyn Fn(XLogSegNo) -> Result<()>> {
let timeline_dir = self.timeline_dir.clone();
let wal_seg_size = self.wal_seg_size.unwrap();
let wal_seg_size = self.wal_seg_size;
Box::new(move |segno_up_to: XLogSegNo| {
remove_up_to(&timeline_dir, wal_seg_size, segno_up_to)
remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
})
}
}

/// Remove all WAL segments in timeline_dir <= given segno.
fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> {
/// Remove all WAL segments in timeline_dir that match the given predicate.
fn remove_segments_from_disk(
timeline_dir: &Path,
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
let mut n_removed = 0;
let mut min_removed = u64::MAX;
let mut max_removed = u64::MIN;

for entry in fs::read_dir(&timeline_dir)? {
let entry = entry?;
let entry_path = entry.path();
@@ -504,19 +467,21 @@ fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if segno <= segno_up_to {
if remove_predicate(segno) {
remove_file(entry_path)?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
}
}
}
let segno_from = segno_up_to - n_removed + 1;
info!(
"removed {} WAL segments [{}; {}]",
n_removed,
XLogFileName(PG_TLI, segno_from, wal_seg_size),
XLogFileName(PG_TLI, segno_up_to, wal_seg_size)
);

if n_removed > 0 {
info!(
"removed {} WAL segments [{}; {}]",
n_removed, min_removed, max_removed
);
}
Ok(())
}

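Generalizing `remove_up_to` into a predicate lets both call sites share one directory scan. As the hunks above show, the two uses reduce to choosing a predicate over segment numbers:

    // Old-WAL cleanup closure returned by remove_up_to():
    remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to)
    // truncate_wal(): drop everything past the truncation segment.
    remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?;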
@@ -526,8 +491,10 @@ pub struct WalReader {
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead>>>,

enable_remote_read: bool,
// S3 will be used to read WAL if LSN is not available locally
enable_remote_read: bool,

// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
}
