diff --git a/Cargo.lock b/Cargo.lock index ca169dc0c8..2f4a57b698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2723,6 +2723,7 @@ dependencies = [ "hyper", "metrics", "once_cell", + "parking_lot 0.12.1", "postgres", "postgres-protocol", "postgres_ffi", @@ -2733,6 +2734,7 @@ dependencies = [ "serde_with", "signal-hook", "tempfile", + "thiserror", "tokio", "tokio-postgres", "toml_edit", diff --git a/libs/utils/src/postgres_backend.rs b/libs/utils/src/postgres_backend.rs index 0498e0887b..adee46c2dd 100644 --- a/libs/utils/src/postgres_backend.rs +++ b/libs/utils/src/postgres_backend.rs @@ -429,8 +429,22 @@ impl PostgresBackend { // full cause of the error, not just the top-level context + its trace. // We don't want to send that in the ErrorResponse though, // because it's not relevant to the compute node logs. - error!("query handler for '{}' failed: {:?}", query_string, e); - self.write_message_noflush(&BeMessage::ErrorResponse(&e.to_string()))?; + // + // We also don't want to log full stacktrace when the error is primitive, + // such as usual connection closed. + let short_error = format!("{:#}", e); + let root_cause = e.root_cause().to_string(); + if root_cause.contains("connection closed unexpectedly") + || root_cause.contains("Broken pipe (os error 32)") + { + error!( + "query handler for '{}' failed: {}", + query_string, short_error + ); + } else { + error!("query handler for '{}' failed: {:?}", query_string, e); + } + self.write_message_noflush(&BeMessage::ErrorResponse(&short_error))?; // TODO: untangle convoluted control flow if e.to_string().contains("failed to run") { return Ok(ProcessMsgResult::Break); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 959942aa12..acd37161a0 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -105,7 +105,7 @@ fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds } /// A newtype to store arbitrary data grouped by tenant and timeline ids. -/// One could use [`utils::zid::TenantTimelineId`] for grouping, but that would +/// One could use [`utils::id::TenantTimelineId`] for grouping, but that would /// not include the cases where a certain tenant has zero timelines. /// This is sometimes important: a tenant could be registered during initial load from FS, /// even if he has no timelines on disk. diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index cae095c3c2..87ee63d1df 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -30,6 +30,8 @@ git-version = "0.3.5" async-trait = "0.1" once_cell = "1.13.0" toml_edit = { version = "0.13", features = ["easy"] } +thiserror = "1" +parking_lot = "0.12.1" postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index d518ac01cc..7726f25a2d 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -24,9 +24,9 @@ use safekeeper::defaults::{ }; use safekeeper::http; use safekeeper::remove_wal; -use safekeeper::timeline::GlobalTimelines; use safekeeper::wal_backup; use safekeeper::wal_service; +use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; use utils::auth::JwtAuth; use utils::{ @@ -298,7 +298,9 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let signals = signals::install_shutdown_handlers()?; let mut threads = vec![]; let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); - GlobalTimelines::init(wal_backup_launcher_tx); + + // Load all timelines from disk to memory. + GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; let conf_ = conf.clone(); threads.push( diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index f276fad613..6a2456ecda 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -10,6 +10,7 @@ use etcd_broker::LeaseKeeper; use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::collections::HashSet; use std::time::Duration; use tokio::spawn; use tokio::task::JoinHandle; @@ -17,7 +18,8 @@ use tokio::{runtime, time::sleep}; use tracing::*; use url::Url; -use crate::{timeline::GlobalTimelines, SafeKeeperConf}; +use crate::GlobalTimelines; +use crate::SafeKeeperConf; use etcd_broker::{ subscription_key::{OperationKind, SkOperationKind, SubscriptionKey}, Client, PutOptions, @@ -45,12 +47,12 @@ pub fn thread_main(conf: SafeKeeperConf) { /// Key to per timeline per safekeeper data. fn timeline_safekeeper_path( broker_etcd_prefix: String, - zttid: TenantTimelineId, + ttid: TenantTimelineId, sk_id: NodeId, ) -> String { format!( "{}/{sk_id}", - SubscriptionKey::sk_timeline_info(broker_etcd_prefix, zttid).watch_key() + SubscriptionKey::sk_timeline_info(broker_etcd_prefix, ttid).watch_key() ) } @@ -162,7 +164,7 @@ pub fn get_candiate_name(system_id: NodeId) -> String { } async fn push_sk_info( - zttid: TenantTimelineId, + ttid: TenantTimelineId, mut client: Client, key: String, sk_info: SkTimelineInfo, @@ -190,7 +192,7 @@ async fn push_sk_info( .await .context("failed to receive LeaseKeepAliveResponse")?; - Ok((zttid, lease)) + Ok((ttid, lease)) } struct Lease { @@ -210,11 +212,15 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // is under plain mutex. That's ok, all this code is not performance // sensitive and there is no risk of deadlock as we don't await while // lock is held. - let active_tlis = GlobalTimelines::get_active_timelines(); + let mut active_tlis = GlobalTimelines::get_all(); + active_tlis.retain(|tli| tli.is_active()); + + let active_tlis_set: HashSet = + active_tlis.iter().map(|tli| tli.ttid).collect(); // // Get and maintain (if not yet) per timeline lease to automatically delete obsolete data. - for zttid in active_tlis.iter() { - if let Entry::Vacant(v) = leases.entry(*zttid) { + for tli in &active_tlis { + if let Entry::Vacant(v) = leases.entry(tli.ttid) { let lease = client.lease_grant(LEASE_TTL_SEC, None).await?; let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?; v.insert(Lease { @@ -224,30 +230,26 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { }); } } - leases.retain(|zttid, _| active_tlis.contains(zttid)); + leases.retain(|ttid, _| active_tlis_set.contains(ttid)); // Push data concurrently to not suffer from latency, with many timelines it can be slow. let handles = active_tlis .iter() - .filter_map(|zttid| GlobalTimelines::get_loaded(*zttid)) .map(|tli| { let sk_info = tli.get_public_info(&conf); - let key = timeline_safekeeper_path( - conf.broker_etcd_prefix.clone(), - tli.zttid, - conf.my_id, - ); - let lease = leases.remove(&tli.zttid).unwrap(); - tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease)) + let key = + timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id); + let lease = leases.remove(&tli.ttid).unwrap(); + tokio::spawn(push_sk_info(tli.ttid, client.clone(), key, sk_info, lease)) }) .collect::>(); for h in handles { - let (zttid, lease) = h.await??; + let (ttid, lease) = h.await??; // It is ugly to pull leases from hash and then put it back, but // otherwise we have to resort to long living per tli tasks (which // would generate a lot of errors when etcd is down) as task wants to // have 'static objects, we can't borrow to it. - leases.insert(zttid, lease); + leases.insert(ttid, lease); } sleep(push_interval).await; @@ -279,7 +281,7 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { match subscription.value_updates.recv().await { Some(new_info) => { // note: there are blocking operations below, but it's considered fine for now - if let Ok(tli) = GlobalTimelines::get(&conf, new_info.key.id, false) { + if let Ok(tli) = GlobalTimelines::get(new_info.key.id) { tli.record_safekeeper_info(&new_info.value, new_info.key.node_id) .await? } diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index ff23f0360f..22ed34cc00 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -9,8 +9,6 @@ use std::io::{Read, Write}; use std::ops::Deref; use std::path::{Path, PathBuf}; -use tracing::*; - use crate::control_file_upgrade::upgrade_control_file; use crate::safekeeper::{SafeKeeperState, SK_FORMAT_VERSION, SK_MAGIC}; use metrics::{register_histogram_vec, Histogram, HistogramVec, DISK_WRITE_SECONDS_BUCKETS}; @@ -55,12 +53,13 @@ pub struct FileStorage { } impl FileStorage { - pub fn restore_new(zttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result { - let timeline_dir = conf.timeline_dir(zttid); - let tenant_id = zttid.tenant_id.to_string(); - let timeline_id = zttid.timeline_id.to_string(); + /// Initialize storage by loading state from disk. + pub fn restore_new(ttid: &TenantTimelineId, conf: &SafeKeeperConf) -> Result { + let timeline_dir = conf.timeline_dir(ttid); + let tenant_id = ttid.tenant_id.to_string(); + let timeline_id = ttid.timeline_id.to_string(); - let state = Self::load_control_file_conf(conf, zttid)?; + let state = Self::load_control_file_conf(conf, ttid)?; Ok(FileStorage { timeline_dir, @@ -71,28 +70,28 @@ impl FileStorage { }) } + /// Create file storage for a new timeline, but don't persist it yet. pub fn create_new( - zttid: &TenantTimelineId, + ttid: &TenantTimelineId, conf: &SafeKeeperConf, state: SafeKeeperState, ) -> Result { - let timeline_dir = conf.timeline_dir(zttid); - let tenant_id = zttid.tenant_id.to_string(); - let timeline_id = zttid.timeline_id.to_string(); + let timeline_dir = conf.timeline_dir(ttid); + let tenant_id = ttid.tenant_id.to_string(); + let timeline_id = ttid.timeline_id.to_string(); - let mut store = FileStorage { + let store = FileStorage { timeline_dir, conf: conf.clone(), persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS .with_label_values(&[&tenant_id, &timeline_id]), - state: state.clone(), + state, }; - store.persist(&state)?; Ok(store) } - // Check the magic/version in the on-disk data and deserialize it, if possible. + /// Check the magic/version in the on-disk data and deserialize it, if possible. fn deser_sk_state(buf: &mut &[u8]) -> Result { // Read the version independent part let magic = buf.read_u32::()?; @@ -112,23 +111,17 @@ impl FileStorage { upgrade_control_file(buf, version) } - // Load control file for given zttid at path specified by conf. + /// Load control file for given ttid at path specified by conf. pub fn load_control_file_conf( conf: &SafeKeeperConf, - zttid: &TenantTimelineId, + ttid: &TenantTimelineId, ) -> Result { - let path = conf.timeline_dir(zttid).join(CONTROL_FILE_NAME); + let path = conf.timeline_dir(ttid).join(CONTROL_FILE_NAME); Self::load_control_file(path) } /// Read in the control file. - /// If create=false and file doesn't exist, bails out. pub fn load_control_file>(control_file_path: P) -> Result { - info!( - "loading control file {}", - control_file_path.as_ref().display(), - ); - let mut control_file = OpenOptions::new() .read(true) .write(true) @@ -179,8 +172,8 @@ impl Deref for FileStorage { } impl Storage for FileStorage { - // persists state durably to underlying storage - // for description see https://lwn.net/Articles/457667/ + /// persists state durably to underlying storage + /// for description see https://lwn.net/Articles/457667/ fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { let _timer = &self.persist_control_file_seconds.start_timer(); @@ -264,57 +257,57 @@ mod test { fn load_from_control_file( conf: &SafeKeeperConf, - zttid: &TenantTimelineId, + ttid: &TenantTimelineId, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); + fs::create_dir_all(&conf.timeline_dir(ttid)).expect("failed to create timeline dir"); Ok(( - FileStorage::restore_new(zttid, conf)?, - FileStorage::load_control_file_conf(conf, zttid)?, + FileStorage::restore_new(ttid, conf)?, + FileStorage::load_control_file_conf(conf, ttid)?, )) } fn create( conf: &SafeKeeperConf, - zttid: &TenantTimelineId, + ttid: &TenantTimelineId, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(&conf.timeline_dir(zttid)).expect("failed to create timeline dir"); + fs::create_dir_all(&conf.timeline_dir(ttid)).expect("failed to create timeline dir"); let state = SafeKeeperState::empty(); - let storage = FileStorage::create_new(zttid, conf, state.clone())?; + let storage = FileStorage::create_new(ttid, conf, state.clone())?; Ok((storage, state)) } #[test] fn test_read_write_safekeeper_state() { let conf = stub_conf(); - let zttid = TenantTimelineId::generate(); + let ttid = TenantTimelineId::generate(); { - let (mut storage, mut state) = create(&conf, &zttid).expect("failed to create state"); + let (mut storage, mut state) = create(&conf, &ttid).expect("failed to create state"); // change something state.commit_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); } - let (_, state) = load_from_control_file(&conf, &zttid).expect("failed to read state"); + let (_, state) = load_from_control_file(&conf, &ttid).expect("failed to read state"); assert_eq!(state.commit_lsn, Lsn(42)); } #[test] fn test_safekeeper_state_checksum_mismatch() { let conf = stub_conf(); - let zttid = TenantTimelineId::generate(); + let ttid = TenantTimelineId::generate(); { - let (mut storage, mut state) = create(&conf, &zttid).expect("failed to read state"); + let (mut storage, mut state) = create(&conf, &ttid).expect("failed to read state"); // change something state.commit_lsn = Lsn(42); storage.persist(&state).expect("failed to persist state"); } - let control_path = conf.timeline_dir(&zttid).join(CONTROL_FILE_NAME); + let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME); let mut data = fs::read(&control_path).unwrap(); data[0] += 1; // change the first byte of the file to fail checksum validation fs::write(&control_path, &data).expect("failed to write control file"); - match load_from_control_file(&conf, &zttid) { + match load_from_control_file(&conf, &ttid) { Err(err) => assert!(err .to_string() .contains("safekeeper control file checksum mismatch")), diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index ad2c0ec8bf..ca887399e1 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -3,15 +3,15 @@ use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::receive_wal::ReceiveWalConn; -use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage}; + use crate::send_wal::ReplicationConn; -use crate::timeline::{Timeline, TimelineTools}; -use crate::SafeKeeperConf; + +use crate::{GlobalTimelines, SafeKeeperConf}; use anyhow::{bail, Context, Result}; use postgres_ffi::PG_TLI; use regex::Regex; -use std::sync::Arc; + use tracing::info; use utils::{ id::{TenantId, TenantTimelineId, TimelineId}, @@ -27,7 +27,7 @@ pub struct SafekeeperPostgresHandler { pub appname: Option, pub tenant_id: Option, pub timeline_id: Option, - pub timeline: Option>, + pub ttid: TenantTimelineId, } /// Parsed Postgres command. @@ -101,30 +101,21 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { query_string, self.timeline_id ); - let create = !(matches!(cmd, SafekeeperPostgresCommand::StartReplication { .. }) - || matches!(cmd, SafekeeperPostgresCommand::IdentifySystem)); - - let tenant_id = self.tenant_id.context("tenant_id is required")?; - let timeline_id = self.timeline_id.context("timeline_id is required")?; - if self.timeline.is_none() { - self.timeline.set( - &self.conf, - TenantTimelineId::new(tenant_id, timeline_id), - create, - )?; - } + let tenant_id = self.tenant_id.context("tenantid is required")?; + let timeline_id = self.timeline_id.context("timelineid is required")?; + self.ttid = TenantTimelineId::new(tenant_id, timeline_id); match cmd { - SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb) - .run(self) - .context("failed to run ReceiveWalConn"), - SafekeeperPostgresCommand::StartReplication { start_lsn } => ReplicationConn::new(pgb) - .run(self, pgb, start_lsn) - .context("failed to run ReplicationConn"), + SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb).run(self), + SafekeeperPostgresCommand::StartReplication { start_lsn } => { + ReplicationConn::new(pgb).run(self, pgb, start_lsn) + } SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb), SafekeeperPostgresCommand::JSONCtrl { ref cmd } => handle_json_ctrl(self, pgb, cmd), } - .context(format!("timeline {timeline_id}"))?; + .context(format!( + "Failed to process query for timeline {timeline_id}" + ))?; Ok(()) } @@ -137,42 +128,26 @@ impl SafekeeperPostgresHandler { appname: None, tenant_id: None, timeline_id: None, - timeline: None, + ttid: TenantTimelineId::empty(), } } - /// Shortcut for calling `process_msg` in the timeline. - pub fn process_safekeeper_msg( - &self, - msg: &ProposerAcceptorMessage, - ) -> Result> { - self.timeline - .get() - .process_msg(msg) - .context("failed to process ProposerAcceptorMessage") - } - /// /// Handle IDENTIFY_SYSTEM replication command /// fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> { + let tli = GlobalTimelines::get(self.ttid)?; + let lsn = if self.is_walproposer_recovery() { // walproposer should get all local WAL until flush_lsn - self.timeline.get().get_end_of_wal() + tli.get_flush_lsn() } else { // other clients shouldn't get any uncommitted WAL - self.timeline.get().get_state().0.commit_lsn + tli.get_state().0.commit_lsn } .to_string(); - let sysid = self - .timeline - .get() - .get_state() - .1 - .server - .system_id - .to_string(); + let sysid = tli.get_state().1.server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); let tli = PG_TLI.to_string(); let tli_bytes = tli.as_bytes(); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 14c9414c09..244325368b 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,3 +1,4 @@ +use anyhow::anyhow; use hyper::{Body, Request, Response, StatusCode, Uri}; use once_cell::sync::Lazy; @@ -9,7 +10,9 @@ use std::sync::Arc; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; -use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult}; + +use crate::timelines_global_map::TimelineDeleteForceResult; +use crate::GlobalTimelines; use crate::SafeKeeperConf; use etcd_broker::subscription_value::SkTimelineInfo; use utils::{ @@ -90,15 +93,15 @@ struct TimelineStatus { /// Report info about timeline. async fn timeline_status_handler(request: Request) -> Result, ApiError> { - let zttid = TenantTimelineId::new( + let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, parse_request_param(&request, "timeline_id")?, ); - check_permission(&request, Some(zttid.tenant_id))?; + check_permission(&request, Some(ttid.tenant_id))?; - let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?; + let tli = GlobalTimelines::get(ttid)?; let (inmem, state) = tli.get_state(); - let flush_lsn = tli.get_end_of_wal(); + let flush_lsn = tli.get_flush_lsn(); let acc_state = AcceptorStateStatus { term: state.acceptor_state.term, @@ -108,8 +111,8 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result) -> Result, ApiError> { let request_data: TimelineCreateRequest = json_request(&mut request).await?; - let zttid = TenantTimelineId { + let ttid = TenantTimelineId { tenant_id: parse_request_param(&request, "tenant_id")?, timeline_id: request_data.timeline_id, }; - check_permission(&request, Some(zttid.tenant_id))?; - GlobalTimelines::create(get_conf(&request), zttid, request_data.peer_ids) - .map_err(ApiError::from_err)?; + check_permission(&request, Some(ttid.tenant_id))?; - json_response(StatusCode::CREATED, ()) + Err(ApiError::from_err(anyhow!("not implemented"))) } /// Deactivates the timeline and removes its data directory. -/// -/// It does not try to stop any processing of the timeline; there is no such code at the time of writing. -/// However, it tries to check whether the timeline was active and report it to caller just in case. -/// Note that this information is inaccurate: -/// 1. There is a race condition between checking the timeline for activity and actual directory deletion. -/// 2. At the time of writing Safekeeper rarely marks a timeline inactive. E.g. disconnecting the compute node does nothing. async fn timeline_delete_force_handler( mut request: Request, ) -> Result, ApiError> { - let zttid = TenantTimelineId::new( + let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, parse_request_param(&request, "timeline_id")?, ); - check_permission(&request, Some(zttid.tenant_id))?; + check_permission(&request, Some(ttid.tenant_id))?; ensure_no_body(&mut request).await?; - json_response( - StatusCode::OK, - GlobalTimelines::delete_force(get_conf(&request), &zttid) - .await - .map_err(ApiError::from_err)?, - ) + let resp = tokio::task::spawn_blocking(move || GlobalTimelines::delete_force(&ttid)) + .await + .map_err(ApiError::from_err)??; + json_response(StatusCode::OK, resp) } /// Deactivates all timelines for the tenant and removes its data directory. @@ -168,27 +161,30 @@ async fn tenant_delete_force_handler( let tenant_id = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; ensure_no_body(&mut request).await?; + let delete_info = tokio::task::spawn_blocking(move || { + GlobalTimelines::delete_force_all_for_tenant(&tenant_id) + }) + .await + .map_err(ApiError::from_err)??; json_response( StatusCode::OK, - GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id) - .await - .map_err(ApiError::from_err)? + delete_info .iter() - .map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp)) + .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp)) .collect::>(), ) } /// Used only in tests to hand craft required data. async fn record_safekeeper_info(mut request: Request) -> Result, ApiError> { - let zttid = TenantTimelineId::new( + let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, parse_request_param(&request, "timeline_id")?, ); - check_permission(&request, Some(zttid.tenant_id))?; + check_permission(&request, Some(ttid.tenant_id))?; let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?; - let tli = GlobalTimelines::get(get_conf(&request), zttid, false).map_err(ApiError::from_err)?; + let tli = GlobalTimelines::get(ttid)?; tli.record_safekeeper_info(&safekeeper_info, NodeId(1)) .await?; diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 00fc43521b..2456eb0752 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -6,18 +6,22 @@ //! modifications in tests. //! +use std::sync::Arc; + use anyhow::Result; use bytes::Bytes; use serde::{Deserialize, Serialize}; use tracing::*; +use utils::id::TenantTimelineId; use crate::handler::SafekeeperPostgresHandler; -use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; +use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo}; use crate::safekeeper::{ - AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, ProposerGreeting, + AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerElected, }; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; -use crate::timeline::TimelineTools; +use crate::timeline::Timeline; +use crate::GlobalTimelines; use postgres_ffi::v14::xlog_utils; use postgres_ffi::WAL_SEGMENT_SIZE; use utils::{ @@ -57,23 +61,23 @@ struct AppendResult { /// content, and then append it with specified term and lsn. This /// function is used to test safekeepers in different scenarios. pub fn handle_json_ctrl( - spg: &mut SafekeeperPostgresHandler, + spg: &SafekeeperPostgresHandler, pgb: &mut PostgresBackend, append_request: &AppendLogicalMessage, ) -> Result<()> { info!("JSON_CTRL request: {:?}", append_request); // need to init safekeeper state before AppendRequest - prepare_safekeeper(spg)?; + let tli = prepare_safekeeper(spg.ttid)?; // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { - send_proposer_elected(spg, append_request.term, append_request.epoch_start_lsn)?; + send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?; } - let inserted_wal = append_logical_message(spg, append_request)?; + let inserted_wal = append_logical_message(&tli, append_request)?; let response = AppendResult { - state: spg.timeline.get().get_state().1, + state: tli.get_state().1, inserted_wal, }; let response_data = serde_json::to_vec(&response)?; @@ -91,28 +95,20 @@ pub fn handle_json_ctrl( /// Prepare safekeeper to process append requests without crashes, /// by sending ProposerGreeting with default server.wal_seg_size. -fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> { - let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting { - protocol_version: 2, // current protocol - pg_version: 0, // unknown - proposer_id: [0u8; 16], - system_id: 0, - timeline_id: spg.timeline_id.unwrap(), - tenant_id: spg.tenant_id.unwrap(), - tli: 0, - wal_seg_size: WAL_SEGMENT_SIZE as u32, // 16MB, default for tests - }); - - let response = spg.timeline.get().process_msg(&greeting_request)?; - match response { - Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), - _ => anyhow::bail!("not GreetingResponse"), - } +fn prepare_safekeeper(ttid: TenantTimelineId) -> Result> { + GlobalTimelines::create( + ttid, + ServerInfo { + pg_version: 0, // unknown + wal_seg_size: WAL_SEGMENT_SIZE as u32, + system_id: 0, + }, + ) } -fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> { +fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> Result<()> { // add new term to existing history - let history = spg.timeline.get().get_state().1.acceptor_state.term_history; + let history = tli.get_state().1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; history_entries.push(TermSwitchEntry { term, lsn }); @@ -125,7 +121,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L timeline_start_lsn: lsn, }); - spg.timeline.get().process_msg(&proposer_elected_request)?; + tli.process_msg(&proposer_elected_request)?; Ok(()) } @@ -138,12 +134,9 @@ struct InsertedWAL { /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. -fn append_logical_message( - spg: &mut SafekeeperPostgresHandler, - msg: &AppendLogicalMessage, -) -> Result { +fn append_logical_message(tli: &Arc, msg: &AppendLogicalMessage) -> Result { let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message); - let sk_state = spg.timeline.get().get_state().1; + let sk_state = tli.get_state().1; let begin_lsn = msg.begin_lsn; let end_lsn = begin_lsn + wal_data.len() as u64; @@ -167,7 +160,7 @@ fn append_logical_message( wal_data: Bytes::from(wal_data), }); - let response = spg.timeline.get().process_msg(&append_request)?; + let response = tli.process_msg(&append_request)?; let append_response = match response { Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index b466d5aab5..58a237a5d3 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -23,6 +23,9 @@ pub mod wal_backup; pub mod wal_service; pub mod wal_storage; +mod timelines_global_map; +pub use timelines_global_map::GlobalTimelines; + pub mod defaults { use const_format::formatcp; use std::time::Duration; @@ -65,9 +68,9 @@ impl SafeKeeperConf { self.workdir.join(tenant_id.to_string()) } - pub fn timeline_dir(&self, zttid: &TenantTimelineId) -> PathBuf { - self.tenant_dir(&zttid.tenant_id) - .join(zttid.timeline_id.to_string()) + pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> PathBuf { + self.tenant_dir(&ttid.tenant_id) + .join(ttid.timeline_id.to_string()) } } diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 3fa3916266..851a568aec 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -12,11 +12,12 @@ use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::{ safekeeper::{SafeKeeperState, SafekeeperMemState}, - timeline::{GlobalTimelines, ReplicaState}, + timeline::ReplicaState, + GlobalTimelines, }; pub struct FullTimelineInfo { - pub zttid: TenantTimelineId, + pub ttid: TenantTimelineId, pub replicas: Vec, pub wal_backup_active: bool, pub timeline_is_active: bool, @@ -235,11 +236,17 @@ impl Collector for TimelineCollector { self.disk_usage.reset(); self.acceptor_term.reset(); - let timelines = GlobalTimelines::active_timelines_metrics(); + let timelines = GlobalTimelines::get_all(); - for tli in timelines { - let tenant_id = tli.zttid.tenant_id.to_string(); - let timeline_id = tli.zttid.timeline_id.to_string(); + for arc_tli in timelines { + let tli = arc_tli.info_for_metrics(); + if tli.is_none() { + continue; + } + let tli = tli.unwrap(); + + let tenant_id = tli.ttid.tenant_id.to_string(); + let timeline_id = tli.ttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; let mut most_advanced: Option = None; diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index b0b6a73621..e28caa2f19 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -7,7 +7,9 @@ use anyhow::{anyhow, bail, Result}; use bytes::BytesMut; use tracing::*; +use crate::safekeeper::ServerInfo; use crate::timeline::Timeline; +use crate::GlobalTimelines; use std::net::SocketAddr; use std::sync::mpsc::channel; @@ -20,7 +22,6 @@ use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; use crate::handler::SafekeeperPostgresHandler; -use crate::timeline::TimelineTools; use utils::{ postgres_backend::PostgresBackend, pq_proto::{BeMessage, FeMessage}, @@ -67,15 +68,21 @@ impl<'pg> ReceiveWalConn<'pg> { // Receive information about server let next_msg = poll_reader.recv_msg()?; - match next_msg { + let tli = match next_msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( "start handshake with wal proposer {} sysid {} timeline {}", self.peer_addr, greeting.system_id, greeting.tli, ); + let server_info = ServerInfo { + pg_version: greeting.pg_version, + system_id: greeting.system_id, + wal_seg_size: greeting.wal_seg_size, + }; + GlobalTimelines::create(spg.ttid, server_info)? } _ => bail!("unexpected message {:?} instead of greeting", next_msg), - } + }; let mut next_msg = Some(next_msg); @@ -88,7 +95,7 @@ impl<'pg> ReceiveWalConn<'pg> { while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg { let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); - let reply = spg.process_safekeeper_msg(&msg)?; + let reply = tli.process_msg(&msg)?; if let Some(reply) = reply { self.write_msg(&reply)?; } @@ -97,13 +104,13 @@ impl<'pg> ReceiveWalConn<'pg> { } // flush all written WAL to the disk - let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?; + let reply = tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?; if let Some(reply) = reply { self.write_msg(&reply)?; } } else if let Some(msg) = next_msg.take() { // process other message - let reply = spg.process_safekeeper_msg(&msg)?; + let reply = tli.process_msg(&msg)?; if let Some(reply) = reply { self.write_msg(&reply)?; } @@ -112,9 +119,9 @@ impl<'pg> ReceiveWalConn<'pg> { // Register the connection and defer unregister. Do that only // after processing first message, as it sets wal_seg_size, // wanted by many. - spg.timeline.get().on_compute_connect()?; + tli.on_compute_connect()?; _guard = Some(ComputeConnectionGuard { - timeline: Arc::clone(spg.timeline.get()), + timeline: Arc::clone(&tli), }); first_time_through = false; } @@ -190,6 +197,8 @@ struct ComputeConnectionGuard { impl Drop for ComputeConnectionGuard { fn drop(&mut self) { - self.timeline.on_compute_disconnect().unwrap(); + if let Err(e) = self.timeline.on_compute_disconnect() { + error!("failed to unregister compute connection: {}", e); + } } } diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index 004c0243f9..b6d497f34e 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -4,20 +4,21 @@ use std::{thread, time::Duration}; use tracing::*; -use crate::{timeline::GlobalTimelines, SafeKeeperConf}; +use crate::{GlobalTimelines, SafeKeeperConf}; pub fn thread_main(conf: SafeKeeperConf) { let wal_removal_interval = Duration::from_millis(5000); loop { - let active_tlis = GlobalTimelines::get_active_timelines(); - for zttid in &active_tlis { - if let Ok(tli) = GlobalTimelines::get(&conf, *zttid, false) { - if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { - warn!( - "failed to remove WAL for tenant {} timeline {}: {}", - tli.zttid.tenant_id, tli.zttid.timeline_id, e - ); - } + let tlis = GlobalTimelines::get_all(); + for tli in &tlis { + if !tli.is_active() { + continue; + } + let ttid = tli.ttid; + let _enter = + info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); + if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { + warn!("failed to remove WAL: {}", e); } } thread::sleep(wal_removal_interval) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index fa045eed90..d34a77e02b 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -218,19 +218,19 @@ pub struct SafekeeperMemState { } impl SafeKeeperState { - pub fn new(zttid: &TenantTimelineId, peers: Vec) -> SafeKeeperState { + pub fn new( + ttid: &TenantTimelineId, + server_info: ServerInfo, + peers: Vec, + ) -> SafeKeeperState { SafeKeeperState { - tenant_id: zttid.tenant_id, - timeline_id: zttid.timeline_id, + tenant_id: ttid.tenant_id, + timeline_id: ttid.timeline_id, acceptor_state: AcceptorState { term: 0, term_history: TermHistory::empty(), }, - server: ServerInfo { - pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ - system_id: 0, /* Postgres system identifier */ - wal_seg_size: 0, - }, + server: server_info, proposer_uuid: [0; 16], timeline_start_lsn: Lsn(0), local_start_lsn: Lsn(0), @@ -244,7 +244,15 @@ impl SafeKeeperState { #[cfg(test)] pub fn empty() -> Self { - SafeKeeperState::new(&TenantTimelineId::empty(), vec![]) + SafeKeeperState::new( + &TenantTimelineId::empty(), + ServerInfo { + pg_version: UNKNOWN_SERVER_VERSION, /* Postgres server version */ + system_id: 0, /* Postgres system identifier */ + wal_seg_size: 0, + }, + vec![], + ) } } @@ -479,8 +487,12 @@ impl AcceptorProposerMessage { } } -/// SafeKeeper which consumes events (messages from compute) and provides -/// replies. +/// Safekeeper implements consensus to reliably persist WAL across nodes. +/// It controls all WAL disk writes and updates of control file. +/// +/// Currently safekeeper processes: +/// - messages from compute (proposers) and provides replies +/// - messages from broker peers pub struct SafeKeeper { /// Maximum commit_lsn between all nodes, can be ahead of local flush_lsn. /// Note: be careful to set only if we are sure our WAL (term history) matches @@ -503,20 +515,20 @@ where CTRL: control_file::Storage, WAL: wal_storage::Storage, { - // constructor - pub fn new( - timeline_id: TimelineId, - state: CTRL, - mut wal_store: WAL, - node_id: NodeId, - ) -> Result> { - if state.timeline_id != TimelineId::from([0u8; 16]) && timeline_id != state.timeline_id { - bail!("Calling SafeKeeper::new with inconsistent timeline_id ({}) and SafeKeeperState.server.timeline_id ({})", timeline_id, state.timeline_id); + /// Accepts a control file storage containing the safekeeper state. + /// State must be initialized, i.e. contain filled `tenant_id`, `timeline_id` + /// and `server` (`wal_seg_size` inside it) fields. + pub fn new(state: CTRL, wal_store: WAL, node_id: NodeId) -> Result> { + if state.tenant_id == TenantId::from([0u8; 16]) + || state.timeline_id == TimelineId::from([0u8; 16]) + { + bail!( + "Calling SafeKeeper::new with empty tenant_id ({}) or timeline_id ({})", + state.tenant_id, + state.timeline_id + ); } - // initialize wal_store, if state is already initialized - wal_store.init_storage(&state)?; - Ok(SafeKeeper { global_commit_lsn: state.commit_lsn, epoch_start_lsn: Lsn(0), @@ -574,7 +586,7 @@ where &mut self, msg: &ProposerGreeting, ) -> Result> { - /* Check protocol compatibility */ + // Check protocol compatibility if msg.protocol_version != SK_PROTOCOL_VERSION { bail!( "incompatible protocol version {}, expected {}", @@ -582,11 +594,11 @@ where SK_PROTOCOL_VERSION ); } - /* Postgres upgrade is not treated as fatal error */ + // Postgres upgrade is not treated as fatal error if msg.pg_version != self.state.server.pg_version && self.state.server.pg_version != UNKNOWN_SERVER_VERSION { - info!( + warn!( "incompatible server version {}, expected {}", msg.pg_version, self.state.server.pg_version ); @@ -605,17 +617,25 @@ where self.state.timeline_id ); } - - // set basic info about server, if not yet - // TODO: verify that is doesn't change after - { - let mut state = self.state.clone(); - state.server.system_id = msg.system_id; - state.server.wal_seg_size = msg.wal_seg_size; - self.state.persist(&state)?; + if self.state.server.wal_seg_size != msg.wal_seg_size { + bail!( + "invalid wal_seg_size, got {}, expected {}", + msg.wal_seg_size, + self.state.server.wal_seg_size + ); } - self.wal_store.init_storage(&self.state)?; + // system_id will be updated on mismatch + if self.state.server.system_id != msg.system_id { + warn!( + "unexpected system ID arrived, got {}, expected {}", + msg.system_id, self.state.server.system_id + ); + + let mut state = self.state.clone(); + state.server.system_id = msg.system_id; + self.state.persist(&state)?; + } info!( "processed greeting from proposer {:?}, sending term {:?}", @@ -665,16 +685,6 @@ where Ok(Some(AcceptorProposerMessage::VoteResponse(resp))) } - /// Bump our term if received a note from elected proposer with higher one - fn bump_if_higher(&mut self, term: Term) -> Result<()> { - if self.state.acceptor_state.term < term { - let mut state = self.state.clone(); - state.acceptor_state.term = term; - self.state.persist(&state)?; - } - Ok(()) - } - /// Form AppendResponse from current state. fn append_response(&self) -> AppendResponse { let ar = AppendResponse { @@ -691,7 +701,12 @@ where fn handle_elected(&mut self, msg: &ProposerElected) -> Result> { info!("received ProposerElected {:?}", msg); - self.bump_if_higher(msg.term)?; + if self.state.acceptor_state.term < msg.term { + let mut state = self.state.clone(); + state.acceptor_state.term = msg.term; + self.state.persist(&state)?; + } + // If our term is higher, ignore the message (next feedback will inform the compute) if self.state.acceptor_state.term > msg.term { return Ok(None); @@ -748,7 +763,7 @@ where } /// Advance commit_lsn taking into account what we have locally - pub fn update_commit_lsn(&mut self) -> Result<()> { + fn update_commit_lsn(&mut self) -> Result<()> { let commit_lsn = min(self.global_commit_lsn, self.flush_lsn()); assert!(commit_lsn >= self.inmem.commit_lsn); @@ -768,6 +783,11 @@ where Ok(()) } + /// Persist control file to disk, called only after timeline creation (bootstrap). + pub fn persist(&mut self) -> Result<()> { + self.persist_control_file(self.state.clone()) + } + /// Persist in-memory state to the disk, taking other data from state. fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { state.commit_lsn = self.inmem.commit_lsn; @@ -918,6 +938,8 @@ where #[cfg(test)] mod tests { + use postgres_ffi::WAL_SEGMENT_SIZE; + use super::*; use crate::wal_storage::Storage; use std::ops::Deref; @@ -942,6 +964,14 @@ mod tests { } } + fn test_sk_state() -> SafeKeeperState { + let mut state = SafeKeeperState::empty(); + state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32; + state.tenant_id = TenantId::from([1u8; 16]); + state.timeline_id = TimelineId::from([1u8; 16]); + state + } + struct DummyWalStore { lsn: Lsn, } @@ -951,10 +981,6 @@ mod tests { self.lsn } - fn init_storage(&mut self, _state: &SafeKeeperState) -> Result<()> { - Ok(()) - } - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { self.lsn = startpos + buf.len() as u64; Ok(()) @@ -977,12 +1003,10 @@ mod tests { #[test] fn test_voting() { let storage = InMemoryState { - persisted_state: SafeKeeperState::empty(), + persisted_state: test_sk_state(), }; let wal_store = DummyWalStore { lsn: Lsn(0) }; - let timeline_id = TimelineId::from([0u8; 16]); - - let mut sk = SafeKeeper::new(timeline_id, storage, wal_store, NodeId(0)).unwrap(); + let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap(); // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); @@ -998,7 +1022,7 @@ mod tests { persisted_state: state, }; - sk = SafeKeeper::new(timeline_id, storage, sk.wal_store, NodeId(0)).unwrap(); + sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap(); // and ensure voting second time for 1 is not ok vote_resp = sk.process_msg(&vote_request); @@ -1011,12 +1035,11 @@ mod tests { #[test] fn test_epoch_switch() { let storage = InMemoryState { - persisted_state: SafeKeeperState::empty(), + persisted_state: test_sk_state(), }; let wal_store = DummyWalStore { lsn: Lsn(0) }; - let timeline_id = TimelineId::from([0u8; 16]); - let mut sk = SafeKeeper::new(timeline_id, storage, wal_store, NodeId(0)).unwrap(); + let mut sk = SafeKeeper::new(storage, wal_store, NodeId(0)).unwrap(); let mut ar_hdr = AppendRequestHeader { term: 1, diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 375b6eea18..5a38558e9c 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -2,8 +2,9 @@ //! with the "START_REPLICATION" message. use crate::handler::SafekeeperPostgresHandler; -use crate::timeline::{ReplicaState, Timeline, TimelineTools}; +use crate::timeline::{ReplicaState, Timeline}; use crate::wal_storage::WalReader; +use crate::GlobalTimelines; use anyhow::{bail, Context, Result}; use bytes::Bytes; @@ -167,8 +168,10 @@ impl ReplicationConn { ) -> Result<()> { let _enter = info_span!("WAL sender", timeline = %spg.timeline_id.unwrap()).entered(); + let tli = GlobalTimelines::get(spg.ttid)?; + // spawn the background thread which receives HotStandbyFeedback messages. - let bg_timeline = Arc::clone(spg.timeline.get()); + let bg_timeline = Arc::clone(&tli); let bg_stream_in = self.stream_in.take().unwrap(); let bg_timeline_id = spg.timeline_id.unwrap(); @@ -201,11 +204,8 @@ impl ReplicationConn { .build()?; runtime.block_on(async move { - let (inmem_state, persisted_state) = spg.timeline.get().get_state(); + let (inmem_state, persisted_state) = tli.get_state(); // add persisted_state.timeline_start_lsn == Lsn(0) check - if persisted_state.server.wal_seg_size == 0 { - bail!("Cannot start replication before connecting to walproposer"); - } // Walproposer gets special handling: safekeeper must give proposer all // local WAL till the end, whether committed or not (walproposer will @@ -217,7 +217,7 @@ impl ReplicationConn { // on this safekeeper itself. That's ok as (old) proposer will never be // able to commit such WAL. let stop_pos: Option = if spg.is_walproposer_recovery() { - let wal_end = spg.timeline.get().get_end_of_wal(); + let wal_end = tli.get_flush_lsn(); Some(wal_end) } else { None @@ -231,7 +231,7 @@ impl ReplicationConn { let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn); let mut wal_reader = WalReader::new( - spg.conf.timeline_dir(&spg.timeline.get().zttid), + spg.conf.timeline_dir(&tli.ttid), &persisted_state, start_pos, spg.conf.wal_backup_enabled, @@ -241,7 +241,7 @@ impl ReplicationConn { let mut send_buf = vec![0u8; MAX_SEND_SIZE]; // watcher for commit_lsn updates - let mut commit_lsn_watch_rx = spg.timeline.get().get_commit_lsn_watch_rx(); + let mut commit_lsn_watch_rx = tli.get_commit_lsn_watch_rx(); loop { if let Some(stop_pos) = stop_pos { @@ -258,7 +258,7 @@ impl ReplicationConn { } else { // TODO: also check once in a while whether we are walsender // to right pageserver. - if spg.timeline.get().stop_walsender(replica_id)? { + if tli.should_walsender_stop(replica_id) { // Shut down, timeline is suspended. // TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index cf317c41c3..4000815857 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -1,27 +1,25 @@ -//! This module contains timeline id -> safekeeper state map with file-backed -//! persistence and support for interaction between sending and receiving wal. +//! This module implements Timeline lifecycle management and has all neccessary code +//! to glue together SafeKeeper and all other background services. -use anyhow::{bail, Context, Result}; +use anyhow::{bail, Result}; use etcd_broker::subscription_value::SkTimelineInfo; -use once_cell::sync::Lazy; use postgres_ffi::XLogSegNo; -use serde::Serialize; use tokio::sync::watch; use std::cmp::{max, min}; -use std::collections::{HashMap, HashSet}; -use std::fs::{self}; -use std::sync::{Arc, Mutex, MutexGuard}; +use parking_lot::{Mutex, MutexGuard}; + +use std::path::PathBuf; use tokio::sync::mpsc::Sender; use tracing::*; use utils::{ - id::{NodeId, TenantId, TenantTimelineId}, + id::{NodeId, TenantTimelineId}, lsn::Lsn, pq_proto::ReplicationFeedback, }; @@ -29,7 +27,7 @@ use utils::{ use crate::control_file; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, - SafekeeperMemState, + SafekeeperMemState, ServerInfo, }; use crate::send_wal::HotStandbyFeedback; @@ -73,7 +71,7 @@ impl ReplicaState { } /// Shared state associated with database instance -struct SharedState { +pub struct SharedState { /// Safekeeper object sk: SafeKeeper, /// State of replicas @@ -95,17 +93,21 @@ struct SharedState { } impl SharedState { - /// Initialize timeline state, creating control file - fn create( + /// Initialize fresh timeline state without persisting anything to disk. + fn create_new( conf: &SafeKeeperConf, - zttid: &TenantTimelineId, - peer_ids: Vec, + ttid: &TenantTimelineId, + state: SafeKeeperState, ) -> Result { - let state = SafeKeeperState::new(zttid, peer_ids); - let control_store = control_file::FileStorage::create_new(zttid, conf, state)?; + if state.server.wal_seg_size == 0 { + bail!(TimelineError::UninitializedWalSegSize(*ttid)); + } - let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); - let sk = SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?; + // We don't want to write anything to disk, because we may have existing timeline there. + // These functions should not change anything on disk. + let control_store = control_file::FileStorage::create_new(ttid, conf, state)?; + let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; + let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; Ok(Self { sk, @@ -117,16 +119,17 @@ impl SharedState { }) } - /// Restore SharedState from control file. - /// If file doesn't exist, bails out. - fn restore(conf: &SafeKeeperConf, zttid: &TenantTimelineId) -> Result { - let control_store = control_file::FileStorage::restore_new(zttid, conf)?; - let wal_store = wal_storage::PhysicalStorage::new(zttid, conf); + /// Restore SharedState from control file. If file doesn't exist, bails out. + fn restore(conf: &SafeKeeperConf, ttid: &TenantTimelineId) -> Result { + let control_store = control_file::FileStorage::restore_new(ttid, conf)?; + if control_store.server.wal_seg_size == 0 { + bail!(TimelineError::UninitializedWalSegSize(*ttid)); + } - info!("timeline {} restored", zttid.timeline_id); + let wal_store = wal_storage::PhysicalStorage::new(ttid, conf, &control_store)?; Ok(Self { - sk: SafeKeeper::new(zttid.timeline_id, control_store, wal_store, conf.my_id)?, + sk: SafeKeeper::new(control_store, wal_store, conf.my_id)?, replicas: Vec::new(), wal_backup_active: false, active: false, @@ -134,6 +137,7 @@ impl SharedState { last_removed_segno: 0, }) } + fn is_active(&self) -> bool { self.is_wal_backup_required() // FIXME: add tracking of relevant pageservers and check them here individually, @@ -254,148 +258,289 @@ impl SharedState { } } -/// Database instance (tenant) +#[derive(Debug, thiserror::Error)] +pub enum TimelineError { + #[error("Timeline {0} was cancelled and cannot be used anymore")] + Cancelled(TenantTimelineId), + #[error("Timeline {0} was not found in global map")] + NotFound(TenantTimelineId), + #[error("Timeline {0} exists on disk, but wasn't loaded on startup")] + Invalid(TenantTimelineId), + #[error("Timeline {0} is already exists")] + AlreadyExists(TenantTimelineId), + #[error("Timeline {0} is not initialized, wal_seg_size is zero")] + UninitializedWalSegSize(TenantTimelineId), +} + +/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline. +/// It also holds SharedState and provides mutually exclusive access to it. pub struct Timeline { - pub zttid: TenantTimelineId, + pub ttid: TenantTimelineId, + /// Sending here asks for wal backup launcher attention (start/stop - /// offloading). Sending zttid instead of concrete command allows to do + /// offloading). Sending ttid instead of concrete command allows to do /// sending without timeline lock. wal_backup_launcher_tx: Sender, + + /// Used to broadcast commit_lsn updates to all background jobs. commit_lsn_watch_tx: watch::Sender, - /// For breeding receivers. commit_lsn_watch_rx: watch::Receiver, + + /// Safekeeper and other state, that should remain consistent and synchronized + /// with the disk. mutex: Mutex, + + /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal. + cancellation_tx: watch::Sender, + + /// Timeline should not be used after cancellation. Background tasks should + /// monitor this channel and stop eventually after receiving `true` from this channel. + cancellation_rx: watch::Receiver, + + /// Directory where timeline state is stored. + timeline_dir: PathBuf, } impl Timeline { - fn new( - zttid: TenantTimelineId, + /// Load existing timeline from disk. + pub fn load_timeline( + conf: SafeKeeperConf, + ttid: TenantTimelineId, wal_backup_launcher_tx: Sender, - shared_state: SharedState, - ) -> Timeline { + ) -> Result { + let shared_state = SharedState::restore(&conf, &ttid)?; let (commit_lsn_watch_tx, commit_lsn_watch_rx) = - watch::channel(shared_state.sk.inmem.commit_lsn); - Timeline { - zttid, + watch::channel(shared_state.sk.state.commit_lsn); + let (cancellation_tx, cancellation_rx) = watch::channel(false); + + Ok(Timeline { + ttid, wal_backup_launcher_tx, commit_lsn_watch_tx, commit_lsn_watch_rx, mutex: Mutex::new(shared_state), + cancellation_rx, + cancellation_tx, + timeline_dir: conf.timeline_dir(&ttid), + }) + } + + /// Create a new timeline, which is not yet persisted to disk. + pub fn create_empty( + conf: SafeKeeperConf, + ttid: TenantTimelineId, + wal_backup_launcher_tx: Sender, + server_info: ServerInfo, + ) -> Result { + let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID); + let (cancellation_tx, cancellation_rx) = watch::channel(false); + let state = SafeKeeperState::new(&ttid, server_info, vec![]); + + Ok(Timeline { + ttid, + wal_backup_launcher_tx, + commit_lsn_watch_tx, + commit_lsn_watch_rx, + mutex: Mutex::new(SharedState::create_new(&conf, &ttid, state)?), + cancellation_rx, + cancellation_tx, + timeline_dir: conf.timeline_dir(&ttid), + }) + } + + /// Initialize fresh timeline on disk and start background tasks. If bootstrap + /// fails, timeline is cancelled and cannot be used anymore. + /// + /// Bootstrap is transactional, so if it fails, created files will be deleted, + /// and state on disk should remain unchanged. + pub fn bootstrap(&self, shared_state: &mut MutexGuard) -> Result<()> { + match std::fs::metadata(&self.timeline_dir) { + Ok(_) => { + // Timeline directory exists on disk, we should leave state unchanged + // and return error. + bail!(TimelineError::Invalid(self.ttid)); + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => { + return Err(e.into()); + } } + + // Create timeline directory. + std::fs::create_dir_all(&self.timeline_dir)?; + + // Write timeline to disk and TODO: start background tasks. + match || -> Result<()> { + shared_state.sk.persist()?; + // TODO: add more initialization steps here + Ok(()) + }() { + Ok(_) => Ok(()), + Err(e) => { + // Bootstrap failed, cancel timeline and remove timeline directory. + self.cancel(); + + if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) { + warn!( + "failed to remove timeline {} directory after bootstrap failure: {}", + self.ttid, fs_err + ); + } + + Err(e) + } + } + } + + /// Delete timeline from disk completely, by removing timeline directory. Background + /// timeline activities will stop eventually. + pub fn delete_from_disk( + &self, + shared_state: &mut MutexGuard, + ) -> Result<(bool, bool)> { + let was_active = shared_state.active; + self.cancel(); + let dir_existed = delete_dir(&self.timeline_dir)?; + Ok((dir_existed, was_active)) + } + + /// Cancel timeline to prevent further usage. Background tasks will stop + /// eventually after receiving cancellation signal. + fn cancel(&self) { + info!("Timeline {} is cancelled", self.ttid); + let _ = self.cancellation_tx.send(true); + let res = self.wal_backup_launcher_tx.blocking_send(self.ttid); + if let Err(e) = res { + error!("Failed to send stop signal to wal_backup_launcher: {}", e); + } + } + + /// Returns if timeline is cancelled. + pub fn is_cancelled(&self) -> bool { + *self.cancellation_rx.borrow() + } + + /// Take a writing mutual exclusive lock on timeline shared_state. + pub fn write_shared_state(&self) -> MutexGuard { + self.mutex.lock() } /// Register compute connection, starting timeline-related activity if it is /// not running yet. - /// Can fail only if channel to a static thread got closed, which is not normal at all. pub fn on_compute_connect(&self) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } + let is_wal_backup_action_pending: bool; { - let mut shared_state = self.mutex.lock().unwrap(); + let mut shared_state = self.write_shared_state(); shared_state.num_computes += 1; - is_wal_backup_action_pending = shared_state.update_status(self.zttid); + is_wal_backup_action_pending = shared_state.update_status(self.ttid); } // Wake up wal backup launcher, if offloading not started yet. if is_wal_backup_action_pending { - self.wal_backup_launcher_tx.blocking_send(self.zttid)?; + // Can fail only if channel to a static thread got closed, which is not normal at all. + self.wal_backup_launcher_tx.blocking_send(self.ttid)?; } Ok(()) } /// De-register compute connection, shutting down timeline activity if /// pageserver doesn't need catchup. - /// Can fail only if channel to a static thread got closed, which is not normal at all. pub fn on_compute_disconnect(&self) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } + let is_wal_backup_action_pending: bool; { - let mut shared_state = self.mutex.lock().unwrap(); + let mut shared_state = self.write_shared_state(); shared_state.num_computes -= 1; - is_wal_backup_action_pending = shared_state.update_status(self.zttid); + is_wal_backup_action_pending = shared_state.update_status(self.ttid); } // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { - self.wal_backup_launcher_tx.blocking_send(self.zttid)?; + // Can fail only if channel to a static thread got closed, which is not normal at all. + self.wal_backup_launcher_tx.blocking_send(self.ttid)?; } Ok(()) } - /// Whether we still need this walsender running? + /// Returns true if walsender should stop sending WAL to pageserver. /// TODO: check this pageserver is actually interested in this timeline. - pub fn stop_walsender(&self, replica_id: usize) -> Result { - let mut shared_state = self.mutex.lock().unwrap(); + pub fn should_walsender_stop(&self, replica_id: usize) -> bool { + if self.is_cancelled() { + return true; + } + + let mut shared_state = self.write_shared_state(); if shared_state.num_computes == 0 { let replica_state = shared_state.replicas[replica_id].unwrap(); let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet (replica_state.remote_consistent_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); if stop { - shared_state.update_status(self.zttid); - return Ok(true); + shared_state.update_status(self.ttid); + return true; } } - Ok(false) + false } /// Returns whether s3 offloading is required and sets current status as /// matching it. pub fn wal_backup_attend(&self) -> bool { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.wal_backup_attend() - } - - // Can this safekeeper offload to s3? Recently joined safekeepers might not - // have necessary WAL. - pub fn can_wal_backup(&self) -> bool { - self.mutex.lock().unwrap().can_wal_backup() - } - - /// Deactivates the timeline, assuming it is being deleted. - /// Returns whether the timeline was already active. - /// - /// We assume all threads will stop by themselves eventually (possibly with errors, but no panics). - /// There should be no compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but - /// we're deleting the timeline anyway. - pub async fn deactivate_for_delete(&self) -> Result { - let was_active: bool; - { - let shared_state = self.mutex.lock().unwrap(); - was_active = shared_state.active; + if self.is_cancelled() { + return false; } - self.wal_backup_launcher_tx.send(self.zttid).await?; - Ok(was_active) + + self.write_shared_state().wal_backup_attend() } - fn is_active(&self) -> bool { - let shared_state = self.mutex.lock().unwrap(); - shared_state.active + /// Can this safekeeper offload to s3? Recently joined safekeepers might not + /// have necessary WAL. + pub fn can_wal_backup(&self) -> bool { + if self.is_cancelled() { + return false; + } + + let shared_state = self.write_shared_state(); + shared_state.can_wal_backup() } - /// Returns full timeline info, required for the metrics. - /// If the timeline is not active, returns None instead. + /// Returns full timeline info, required for the metrics. If the timeline is + /// not active, returns None instead. pub fn info_for_metrics(&self) -> Option { - let shared_state = self.mutex.lock().unwrap(); - if !shared_state.active { + if self.is_cancelled() { return None; } - Some(FullTimelineInfo { - zttid: self.zttid, - replicas: shared_state - .replicas - .iter() - .filter_map(|r| r.as_ref()) - .copied() - .collect(), - wal_backup_active: shared_state.wal_backup_active, - timeline_is_active: shared_state.active, - num_computes: shared_state.num_computes, - last_removed_segno: shared_state.last_removed_segno, - epoch_start_lsn: shared_state.sk.epoch_start_lsn, - mem_state: shared_state.sk.inmem.clone(), - persisted_state: shared_state.sk.state.clone(), - flush_lsn: shared_state.sk.wal_store.flush_lsn(), - }) + let state = self.write_shared_state(); + if state.active { + Some(FullTimelineInfo { + ttid: self.ttid, + replicas: state + .replicas + .iter() + .filter_map(|r| r.as_ref()) + .copied() + .collect(), + wal_backup_active: state.wal_backup_active, + timeline_is_active: state.active, + num_computes: state.num_computes, + last_removed_segno: state.last_removed_segno, + epoch_start_lsn: state.sk.epoch_start_lsn, + mem_state: state.sk.inmem.clone(), + persisted_state: state.sk.state.clone(), + flush_lsn: state.sk.wal_store.flush_lsn(), + }) + } else { + None + } } + /// Returns commit_lsn watch channel. pub fn get_commit_lsn_watch_rx(&self) -> watch::Receiver { self.commit_lsn_watch_rx.clone() } @@ -405,10 +550,14 @@ impl Timeline { &self, msg: &ProposerAcceptorMessage, ) -> Result> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } + let mut rmsg: Option; let commit_lsn: Lsn; { - let mut shared_state = self.mutex.lock().unwrap(); + let mut shared_state = self.write_shared_state(); rmsg = shared_state.sk.process_msg(msg)?; // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn @@ -426,28 +575,46 @@ impl Timeline { Ok(rmsg) } + /// Returns wal_seg_size. pub fn get_wal_seg_size(&self) -> usize { - self.mutex.lock().unwrap().get_wal_seg_size() + self.write_shared_state().get_wal_seg_size() } + /// Returns true only if the timeline is loaded and active. + pub fn is_active(&self) -> bool { + if self.is_cancelled() { + return false; + } + + self.write_shared_state().active + } + + /// Returns state of the timeline. pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { - let shared_state = self.mutex.lock().unwrap(); - (shared_state.sk.inmem.clone(), shared_state.sk.state.clone()) + let state = self.write_shared_state(); + (state.sk.inmem.clone(), state.sk.state.clone()) } + /// Returns latest backup_lsn. pub fn get_wal_backup_lsn(&self) -> Lsn { - self.mutex.lock().unwrap().sk.inmem.backup_lsn + self.write_shared_state().sk.inmem.backup_lsn } - pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) { - self.mutex.lock().unwrap().sk.inmem.backup_lsn = backup_lsn; + /// Sets backup_lsn to the given value. + pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } + + self.write_shared_state().sk.inmem.backup_lsn = backup_lsn; // we should check whether to shut down offloader, but this will be done // soon by peer communication anyway. + Ok(()) } - /// Prepare public safekeeper info for reporting. + /// Return public safekeeper info for broadcasting to broker and other peers. pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { - let shared_state = self.mutex.lock().unwrap(); + let shared_state = self.write_shared_state(); SkTimelineInfo { last_log_term: Some(shared_state.sk.get_epoch()), flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), @@ -473,54 +640,53 @@ impl Timeline { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { - let mut shared_state = self.mutex.lock().unwrap(); - // WAL seg size not initialized yet (no message from compute ever - // received), can't do much without it. - if shared_state.get_wal_seg_size() == 0 { - return Ok(()); - } + let mut shared_state = self.write_shared_state(); shared_state.sk.record_safekeeper_info(sk_info)?; - is_wal_backup_action_pending = shared_state.update_status(self.zttid); + is_wal_backup_action_pending = shared_state.update_status(self.ttid); commit_lsn = shared_state.sk.inmem.commit_lsn; } self.commit_lsn_watch_tx.send(commit_lsn)?; // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { - self.wal_backup_launcher_tx.send(self.zttid).await?; + self.wal_backup_launcher_tx.send(self.ttid).await?; } Ok(()) } + /// Add send_wal replica to the in-memory vector of replicas. pub fn add_replica(&self, state: ReplicaState) -> usize { - let mut shared_state = self.mutex.lock().unwrap(); - shared_state.add_replica(state) + self.write_shared_state().add_replica(state) } + /// Update replication replica state. pub fn update_replica_state(&self, id: usize, state: ReplicaState) { - let mut shared_state = self.mutex.lock().unwrap(); + let mut shared_state = self.write_shared_state(); shared_state.replicas[id] = Some(state); } + /// Remove send_wal replica from the in-memory vector of replicas. pub fn remove_replica(&self, id: usize) { - let mut shared_state = self.mutex.lock().unwrap(); + let mut shared_state = self.write_shared_state(); assert!(shared_state.replicas[id].is_some()); shared_state.replicas[id] = None; } - pub fn get_end_of_wal(&self) -> Lsn { - let shared_state = self.mutex.lock().unwrap(); - shared_state.sk.wal_store.flush_lsn() + /// Returns flush_lsn. + pub fn get_flush_lsn(&self) -> Lsn { + self.write_shared_state().sk.wal_store.flush_lsn() } + /// Delete WAL segments from disk that are no longer needed. This is determined + /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn. pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.ttid)); + } + let horizon_segno: XLogSegNo; let remover: Box Result<(), anyhow::Error>>; { - let shared_state = self.mutex.lock().unwrap(); - // WAL seg size not initialized yet, no WAL exists. - if shared_state.get_wal_seg_size() == 0 { - return Ok(()); - } + let shared_state = self.write_shared_state(); horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled); remover = shared_state.sk.wal_store.remove_up_to(); if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { @@ -528,243 +694,22 @@ impl Timeline { } // release the lock before removing } - let _enter = - info_span!("", tenant = %self.zttid.tenant_id, timeline = %self.zttid.timeline_id) - .entered(); + + // delete old WAL files remover(horizon_segno - 1)?; - self.mutex.lock().unwrap().last_removed_segno = horizon_segno; + + // update last_removed_segno + let mut shared_state = self.write_shared_state(); + shared_state.last_removed_segno = horizon_segno; Ok(()) } } -// Utilities needed by various Connection-like objects -pub trait TimelineTools { - fn set(&mut self, conf: &SafeKeeperConf, zttid: TenantTimelineId, create: bool) -> Result<()>; - - fn get(&self) -> &Arc; -} - -impl TimelineTools for Option> { - fn set(&mut self, conf: &SafeKeeperConf, zttid: TenantTimelineId, create: bool) -> Result<()> { - *self = Some(GlobalTimelines::get(conf, zttid, create)?); - Ok(()) - } - - fn get(&self) -> &Arc { - self.as_ref().unwrap() - } -} - -struct GlobalTimelinesState { - timelines: HashMap>, - wal_backup_launcher_tx: Option>, -} - -static TIMELINES_STATE: Lazy> = Lazy::new(|| { - Mutex::new(GlobalTimelinesState { - timelines: HashMap::new(), - wal_backup_launcher_tx: None, - }) -}); - -#[derive(Clone, Copy, Serialize)] -pub struct TimelineDeleteForceResult { - pub dir_existed: bool, - pub was_active: bool, -} - -/// A zero-sized struct used to manage access to the global timelines map. -pub struct GlobalTimelines; - -impl GlobalTimelines { - pub fn init(wal_backup_launcher_tx: Sender) { - let mut state = TIMELINES_STATE.lock().unwrap(); - assert!(state.wal_backup_launcher_tx.is_none()); - state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); - } - - fn create_internal( - mut state: MutexGuard, - conf: &SafeKeeperConf, - zttid: TenantTimelineId, - peer_ids: Vec, - ) -> Result> { - match state.timelines.get(&zttid) { - Some(_) => bail!("timeline {} already exists", zttid), - None => { - // TODO: check directory existence - let dir = conf.timeline_dir(&zttid); - fs::create_dir_all(dir)?; - - let shared_state = SharedState::create(conf, &zttid, peer_ids) - .context("failed to create shared state")?; - - let new_tli = Arc::new(Timeline::new( - zttid, - state.wal_backup_launcher_tx.as_ref().unwrap().clone(), - shared_state, - )); - state.timelines.insert(zttid, Arc::clone(&new_tli)); - Ok(new_tli) - } - } - } - - pub fn create( - conf: &SafeKeeperConf, - zttid: TenantTimelineId, - peer_ids: Vec, - ) -> Result> { - let state = TIMELINES_STATE.lock().unwrap(); - GlobalTimelines::create_internal(state, conf, zttid, peer_ids) - } - - /// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map. - /// If control file doesn't exist and create=false, bails out. - pub fn get( - conf: &SafeKeeperConf, - zttid: TenantTimelineId, - create: bool, - ) -> Result> { - let _enter = info_span!("", timeline = %zttid.timeline_id).entered(); - - let mut state = TIMELINES_STATE.lock().unwrap(); - - match state.timelines.get(&zttid) { - Some(result) => Ok(Arc::clone(result)), - None => { - let shared_state = SharedState::restore(conf, &zttid); - - let shared_state = match shared_state { - Ok(shared_state) => shared_state, - Err(error) => { - // TODO: always create timeline explicitly - if error - .root_cause() - .to_string() - .contains("No such file or directory") - && create - { - return GlobalTimelines::create_internal(state, conf, zttid, vec![]); - } else { - return Err(error); - } - } - }; - - let new_tli = Arc::new(Timeline::new( - zttid, - state.wal_backup_launcher_tx.as_ref().unwrap().clone(), - shared_state, - )); - state.timelines.insert(zttid, Arc::clone(&new_tli)); - Ok(new_tli) - } - } - } - - /// Get loaded timeline, if it exists. - pub fn get_loaded(zttid: TenantTimelineId) -> Option> { - let state = TIMELINES_STATE.lock().unwrap(); - state.timelines.get(&zttid).map(Arc::clone) - } - - pub fn get_active_timelines() -> HashSet { - let state = TIMELINES_STATE.lock().unwrap(); - state - .timelines - .iter() - .filter(|&(_, tli)| tli.is_active()) - .map(|(zttid, _)| *zttid) - .collect() - } - - /// Return FullTimelineInfo for all active timelines. - pub fn active_timelines_metrics() -> Vec { - let state = TIMELINES_STATE.lock().unwrap(); - state - .timelines - .iter() - .filter_map(|(_, tli)| tli.info_for_metrics()) - .collect() - } - - fn delete_force_internal( - conf: &SafeKeeperConf, - zttid: &TenantTimelineId, - was_active: bool, - ) -> Result { - match std::fs::remove_dir_all(conf.timeline_dir(zttid)) { - Ok(_) => Ok(TimelineDeleteForceResult { - dir_existed: true, - was_active, - }), - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult { - dir_existed: false, - was_active, - }), - Err(e) => Err(e.into()), - } - } - - /// Deactivates and deletes the timeline, see `Timeline::deactivate_for_delete()`, the deletes - /// the corresponding data directory. - /// We assume all timeline threads do not care about `GlobalTimelines` not containing the timeline - /// anymore, and they will eventually terminate without panics. - /// - /// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two - /// `Timeline` objects in memory): - /// a) a compute node connects after this method is called, or - /// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or - /// c) an HTTP POST request for timeline creation is made after the timeline is already deleted. - /// TODO: ensure all of the above never happens. - pub async fn delete_force( - conf: &SafeKeeperConf, - zttid: &TenantTimelineId, - ) -> Result { - info!("deleting timeline {}", zttid); - let timeline = TIMELINES_STATE.lock().unwrap().timelines.remove(zttid); - let mut was_active = false; - if let Some(tli) = timeline { - was_active = tli.deactivate_for_delete().await?; - } - GlobalTimelines::delete_force_internal(conf, zttid, was_active) - } - - /// Deactivates and deletes all timelines for the tenant, see `delete()`. - /// Returns map of all timelines which the tenant had, `true` if a timeline was active. - /// There may be a race if new timelines are created simultaneously. - pub async fn delete_force_all_for_tenant( - conf: &SafeKeeperConf, - tenant_id: &TenantId, - ) -> Result> { - info!("deleting all timelines for tenant {}", tenant_id); - let mut to_delete = HashMap::new(); - { - // Keep mutex in this scope. - let timelines = &mut TIMELINES_STATE.lock().unwrap().timelines; - for (&zttid, tli) in timelines.iter() { - if zttid.tenant_id == *tenant_id { - to_delete.insert(zttid, tli.clone()); - } - } - // TODO: test that the correct subset of timelines is removed. It's complicated because they are implicitly created currently. - timelines.retain(|zttid, _| !to_delete.contains_key(zttid)); - } - let mut deleted = HashMap::new(); - for (zttid, timeline) in to_delete { - let was_active = timeline.deactivate_for_delete().await?; - deleted.insert( - zttid, - GlobalTimelines::delete_force_internal(conf, &zttid, was_active)?, - ); - } - // There may be inactive timelines, so delete the whole tenant dir as well. - match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) { - Ok(_) => (), - Err(e) if e.kind() == std::io::ErrorKind::NotFound => (), - e => e?, - }; - Ok(deleted) +/// Deletes directory and it's contents. Returns false if directory does not exist. +fn delete_dir(path: &PathBuf) -> Result { + match std::fs::remove_dir_all(path) { + Ok(_) => Ok(true), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), + Err(e) => Err(e.into()), } } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs new file mode 100644 index 0000000000..cf99a243d7 --- /dev/null +++ b/safekeeper/src/timelines_global_map.rs @@ -0,0 +1,348 @@ +//! This module contains global (tenant_id, timeline_id) -> Arc mapping. +//! All timelines should always be present in this map, this is done by loading them +//! all from the disk on startup and keeping them in memory. + +use crate::safekeeper::ServerInfo; +use crate::timeline::{Timeline, TimelineError}; +use crate::SafeKeeperConf; +use anyhow::{anyhow, bail, Context, Result}; +use once_cell::sync::Lazy; +use serde::Serialize; +use std::collections::HashMap; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::{Arc, Mutex, MutexGuard}; +use tokio::sync::mpsc::Sender; +use tracing::*; +use utils::id::{TenantId, TenantTimelineId, TimelineId}; + +struct GlobalTimelinesState { + timelines: HashMap>, + wal_backup_launcher_tx: Option>, + conf: SafeKeeperConf, +} + +impl GlobalTimelinesState { + /// Get dependencies for a timeline constructor. + fn get_dependencies(&self) -> (SafeKeeperConf, Sender) { + ( + self.conf.clone(), + self.wal_backup_launcher_tx.as_ref().unwrap().clone(), + ) + } + + /// Insert timeline into the map. Returns error if timeline with the same id already exists. + fn try_insert(&mut self, timeline: Arc) -> Result<()> { + let ttid = timeline.ttid; + if self.timelines.contains_key(&ttid) { + bail!(TimelineError::AlreadyExists(ttid)); + } + self.timelines.insert(ttid, timeline); + Ok(()) + } + + /// Get timeline from the map. Returns error if timeline doesn't exist. + fn get(&self, ttid: &TenantTimelineId) -> Result> { + self.timelines + .get(ttid) + .cloned() + .ok_or_else(|| anyhow!(TimelineError::NotFound(*ttid))) + } +} + +static TIMELINES_STATE: Lazy> = Lazy::new(|| { + Mutex::new(GlobalTimelinesState { + timelines: HashMap::new(), + wal_backup_launcher_tx: None, + conf: SafeKeeperConf::default(), + }) +}); + +/// A zero-sized struct used to manage access to the global timelines map. +pub struct GlobalTimelines; + +impl GlobalTimelines { + /// Inject dependencies needed for the timeline constructors and load all timelines to memory. + pub fn init( + conf: SafeKeeperConf, + wal_backup_launcher_tx: Sender, + ) -> Result<()> { + let mut state = TIMELINES_STATE.lock().unwrap(); + assert!(state.wal_backup_launcher_tx.is_none()); + state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); + state.conf = conf; + + // Iterate through all directories and load tenants for all directories + // named as a valid tenant_id. + let mut tenant_count = 0; + let tenants_dir = state.conf.workdir.clone(); + for tenants_dir_entry in std::fs::read_dir(&tenants_dir) + .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))? + { + match &tenants_dir_entry { + Ok(tenants_dir_entry) => { + if let Ok(tenant_id) = + TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or("")) + { + tenant_count += 1; + GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?; + } + } + Err(e) => error!( + "failed to list tenants dir entry {:?} in directory {}, reason: {:?}", + tenants_dir_entry, + tenants_dir.display(), + e + ), + } + } + + info!( + "found {} tenants directories, successfully loaded {} timelines", + tenant_count, + state.timelines.len() + ); + Ok(()) + } + + /// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any. + fn load_tenant_timelines( + state: &mut MutexGuard, + tenant_id: TenantId, + ) -> Result<()> { + let timelines_dir = state.conf.tenant_dir(&tenant_id); + for timelines_dir_entry in std::fs::read_dir(&timelines_dir) + .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))? + { + match &timelines_dir_entry { + Ok(timeline_dir_entry) => { + if let Ok(timeline_id) = + TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) + { + let ttid = TenantTimelineId::new(tenant_id, timeline_id); + match Timeline::load_timeline( + state.conf.clone(), + ttid, + state.wal_backup_launcher_tx.as_ref().unwrap().clone(), + ) { + Ok(timeline) => { + state.timelines.insert(ttid, Arc::new(timeline)); + } + // If we can't load a timeline, it's most likely because of a corrupted + // directory. We will log an error and won't allow to delete/recreate + // this timeline. The only way to fix this timeline is to repair manually + // and restart the safekeeper. + Err(e) => error!( + "failed to load timeline {} for tenant {}, reason: {:?}", + timeline_id, tenant_id, e + ), + } + } + } + Err(e) => error!( + "failed to list timelines dir entry {:?} in directory {}, reason: {:?}", + timelines_dir_entry, + timelines_dir.display(), + e + ), + } + } + + Ok(()) + } + + /// Create a new timeline with the given id. If the timeline already exists, returns + /// an existing timeline. + pub fn create(ttid: TenantTimelineId, server_info: ServerInfo) -> Result> { + let (conf, wal_backup_launcher_tx) = { + let state = TIMELINES_STATE.lock().unwrap(); + if let Ok(timeline) = state.get(&ttid) { + // Timeline already exists, return it. + return Ok(timeline); + } + state.get_dependencies() + }; + + info!("creating new timeline {}", ttid); + + let timeline = Arc::new(Timeline::create_empty( + conf, + ttid, + wal_backup_launcher_tx, + server_info, + )?); + + // Take a lock and finish the initialization holding this mutex. No other threads + // can interfere with creation after we will insert timeline into the map. + let mut shared_state = timeline.write_shared_state(); + + // We can get a race condition here in case of concurrent create calls, but only + // in theory. create() will return valid timeline on the next try. + TIMELINES_STATE + .lock() + .unwrap() + .try_insert(timeline.clone())?; + + // Write the new timeline to the disk and start background workers. + // Bootstrap is transactional, so if it fails, the timeline will be deleted, + // and the state on disk should remain unchanged. + match timeline.bootstrap(&mut shared_state) { + Ok(_) => { + // We are done with bootstrap, release the lock, return the timeline. + drop(shared_state); + Ok(timeline) + } + Err(e) => { + // Note: the most likely reason for bootstrap failure is that the timeline + // directory already exists on disk. This happens when timeline is corrupted + // and wasn't loaded from disk on startup because of that. We want to preserve + // the timeline directory in this case, for further inspection. + + // TODO: this is an unusual error, perhaps we should send it to sentry + // TODO: compute will try to create timeline every second, we should add backoff + error!("failed to bootstrap timeline {}: {}", ttid, e); + + // Timeline failed to bootstrap, it cannot be used. Remove it from the map. + TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid); + Err(e) + } + } + } + + /// Get a timeline from the global map. If it's not present, it doesn't exist on disk, + /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid, + /// i.e. loaded in memory and not cancelled. + pub fn get(ttid: TenantTimelineId) -> Result> { + let res = TIMELINES_STATE.lock().unwrap().get(&ttid); + + match res { + Ok(tli) => { + if tli.is_cancelled() { + anyhow::bail!(TimelineError::Cancelled(ttid)); + } + Ok(tli) + } + Err(e) => Err(e), + } + } + + /// Returns all timelines. This is used for background timeline proccesses. + pub fn get_all() -> Vec> { + let global_lock = TIMELINES_STATE.lock().unwrap(); + global_lock + .timelines + .values() + .cloned() + .filter(|t| !t.is_cancelled()) + .collect() + } + + /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant, + /// and that's why it can return cancelled timelines, to retry deleting them. + fn get_all_for_tenant(tenant_id: TenantId) -> Vec> { + let global_lock = TIMELINES_STATE.lock().unwrap(); + global_lock + .timelines + .values() + .filter(|t| t.ttid.tenant_id == tenant_id) + .cloned() + .collect() + } + + /// Cancels timeline, then deletes the corresponding data directory. + pub fn delete_force(ttid: &TenantTimelineId) -> Result { + let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid); + match tli_res { + Ok(timeline) => { + // Take a lock and finish the deletion holding this mutex. + let mut shared_state = timeline.write_shared_state(); + + info!("deleting timeline {}", ttid); + let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?; + + // Remove timeline from the map. + TIMELINES_STATE.lock().unwrap().timelines.remove(ttid); + + Ok(TimelineDeleteForceResult { + dir_existed, + was_active, + }) + } + Err(_) => { + // Timeline is not memory, but it may still exist on disk in broken state. + let dir_path = TIMELINES_STATE.lock().unwrap().conf.timeline_dir(ttid); + let dir_existed = delete_dir(dir_path)?; + + Ok(TimelineDeleteForceResult { + dir_existed, + was_active: false, + }) + } + } + } + + /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which + /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are + /// created simultaneously. In that case the function will return error and the caller should + /// retry tenant deletion again later. + pub fn delete_force_all_for_tenant( + tenant_id: &TenantId, + ) -> Result> { + info!("deleting all timelines for tenant {}", tenant_id); + let to_delete = Self::get_all_for_tenant(*tenant_id); + + let mut err = None; + + let mut deleted = HashMap::new(); + for tli in &to_delete { + match Self::delete_force(&tli.ttid) { + Ok(result) => { + deleted.insert(tli.ttid, result); + } + Err(e) => { + error!("failed to delete timeline {}: {}", tli.ttid, e); + // Save error to return later. + err = Some(e); + } + } + } + + // If there was an error, return it. + if let Some(e) = err { + return Err(e); + } + + // There may be broken timelines on disk, so delete the whole tenant dir as well. + // Note that we could concurrently create new timelines while we were deleting them, + // so the directory may be not empty. In this case timelines will have bad state + // and timeline background jobs can panic. + delete_dir(TIMELINES_STATE.lock().unwrap().conf.tenant_dir(tenant_id))?; + + let tlis_after_delete = Self::get_all_for_tenant(*tenant_id); + if !tlis_after_delete.is_empty() { + // Some timelines were created while we were deleting them, returning error + // to the caller, so it can retry later. + bail!( + "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them", + tenant_id + ); + } + + Ok(deleted) + } +} + +#[derive(Clone, Copy, Serialize)] +pub struct TimelineDeleteForceResult { + pub dir_existed: bool, + pub was_active: bool, +} + +/// Deletes directory and it's contents. Returns false if directory does not exist. +fn delete_dir(path: PathBuf) -> Result { + match std::fs::remove_dir_all(path) { + Ok(_) => Ok(true), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), + Err(e) => Err(e.into()), + } +} diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 85e967e218..0d5321fb3a 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -26,8 +26,8 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; use crate::broker::{Election, ElectionLeader}; -use crate::timeline::{GlobalTimelines, Timeline}; -use crate::{broker, SafeKeeperConf}; +use crate::timeline::Timeline; +use crate::{broker, GlobalTimelines, SafeKeeperConf}; use once_cell::sync::OnceCell; @@ -53,8 +53,10 @@ pub fn wal_backup_launcher_thread_main( /// Check whether wal backup is required for timeline. If yes, mark that launcher is /// aware of current status and return the timeline. -fn is_wal_backup_required(zttid: TenantTimelineId) -> Option> { - GlobalTimelines::get_loaded(zttid).filter(|t| t.wal_backup_attend()) +fn is_wal_backup_required(ttid: TenantTimelineId) -> Option> { + GlobalTimelines::get(ttid) + .ok() + .filter(|tli| tli.wal_backup_attend()) } struct WalBackupTaskHandle { @@ -70,20 +72,20 @@ struct WalBackupTimelineEntry { /// Start per timeline task, if it makes sense for this safekeeper to offload. fn consider_start_task( conf: &SafeKeeperConf, - zttid: TenantTimelineId, + ttid: TenantTimelineId, task: &mut WalBackupTimelineEntry, ) { if !task.timeline.can_wal_backup() { return; } - info!("starting WAL backup task for {}", zttid); + info!("starting WAL backup task for {}", ttid); // TODO: decide who should offload right here by simply checking current // state instead of running elections in offloading task. let election_name = SubscriptionKey { cluster_prefix: conf.broker_etcd_prefix.clone(), kind: SubscriptionKind::Operation( - zttid, + ttid, NodeKind::Safekeeper, OperationKind::Safekeeper(SkOperationKind::WalBackup), ), @@ -97,11 +99,11 @@ fn consider_start_task( ); let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - let timeline_dir = conf.timeline_dir(&zttid); + let timeline_dir = conf.timeline_dir(&ttid); let handle = tokio::spawn( - backup_task_main(zttid, timeline_dir, shutdown_rx, election) - .instrument(info_span!("WAL backup task", zttid = %zttid)), + backup_task_main(ttid, timeline_dir, shutdown_rx, election) + .instrument(info_span!("WAL backup task", ttid = %ttid)), ); task.handle = Some(WalBackupTaskHandle { @@ -140,33 +142,33 @@ async fn wal_backup_launcher_main_loop( let mut ticker = tokio::time::interval(Duration::from_millis(CHECK_TASKS_INTERVAL_MSEC)); loop { tokio::select! { - zttid = wal_backup_launcher_rx.recv() => { + ttid = wal_backup_launcher_rx.recv() => { // channel is never expected to get closed - let zttid = zttid.unwrap(); + let ttid = ttid.unwrap(); if conf.remote_storage.is_none() || !conf.wal_backup_enabled { continue; /* just drain the channel and do nothing */ } - let timeline = is_wal_backup_required(zttid); + let timeline = is_wal_backup_required(ttid); // do we need to do anything at all? - if timeline.is_some() != tasks.contains_key(&zttid) { + if timeline.is_some() != tasks.contains_key(&ttid) { if let Some(timeline) = timeline { // need to start the task - let entry = tasks.entry(zttid).or_insert(WalBackupTimelineEntry { + let entry = tasks.entry(ttid).or_insert(WalBackupTimelineEntry { timeline, handle: None, }); - consider_start_task(&conf, zttid, entry); + consider_start_task(&conf, ttid, entry); } else { // need to stop the task - info!("stopping WAL backup task for {}", zttid); + info!("stopping WAL backup task for {}", ttid); - let entry = tasks.remove(&zttid).unwrap(); + let entry = tasks.remove(&ttid).unwrap(); if let Some(wb_handle) = entry.handle { // Tell the task to shutdown. Error means task exited earlier, that's ok. let _ = wb_handle.shutdown_tx.send(()).await; // Await the task itself. TODO: restart panicked tasks earlier. if let Err(e) = wb_handle.handle.await { - warn!("WAL backup task for {} panicked: {}", zttid, e); + warn!("WAL backup task for {} panicked: {}", ttid, e); } } } @@ -174,8 +176,8 @@ async fn wal_backup_launcher_main_loop( } // Start known tasks, if needed and possible. _ = ticker.tick() => { - for (zttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { - consider_start_task(&conf, *zttid, entry); + for (ttid, entry) in tasks.iter_mut().filter(|(_, entry)| entry.handle.is_none()) { + consider_start_task(&conf, *ttid, entry); } } } @@ -191,26 +193,26 @@ struct WalBackupTask { election: Election, } -/// Offload single timeline. +/// Offload single timeline. Called only after we checked that backup +/// is required (wal_backup_attend) and possible (can_wal_backup). async fn backup_task_main( - zttid: TenantTimelineId, + ttid: TenantTimelineId, timeline_dir: PathBuf, mut shutdown_rx: Receiver<()>, election: Election, ) { info!("started"); - let timeline: Arc = if let Some(tli) = GlobalTimelines::get_loaded(zttid) { - tli - } else { - /* Timeline could get deleted while task was starting, just exit then. */ - info!("no timeline, exiting"); + let res = GlobalTimelines::get(ttid); + if let Err(e) = res { + error!("backup error for timeline {}: {}", ttid, e); return; - }; + } + let tli = res.unwrap(); let mut wb = WalBackupTask { - wal_seg_size: timeline.get_wal_seg_size(), - commit_lsn_watch_rx: timeline.get_commit_lsn_watch_rx(), - timeline, + wal_seg_size: tli.get_wal_seg_size(), + commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), + timeline: tli, timeline_dir, leader: None, election, @@ -322,7 +324,11 @@ impl WalBackupTask { { Ok(backup_lsn_result) => { backup_lsn = backup_lsn_result; - self.timeline.set_wal_backup_lsn(backup_lsn_result); + let res = self.timeline.set_wal_backup_lsn(backup_lsn_result); + if let Err(e) = res { + error!("backup error: {}", e); + return; + } retry_attempt = 0; } Err(e) => { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 58b69f06e7..ea613dd0f1 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -7,7 +7,7 @@ //! //! Note that last file has `.partial` suffix, that's different from postgres. -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use std::io::{self, Seek, SeekFrom}; use std::pin::Pin; use tokio::io::AsyncRead; @@ -17,7 +17,7 @@ use postgres_ffi::v14::xlog_utils::{ find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName, }; use postgres_ffi::{XLogSegNo, PG_TLI}; -use std::cmp::min; +use std::cmp::{max, min}; use std::fs::{self, remove_file, File, OpenOptions}; use std::io::Write; @@ -86,9 +86,9 @@ struct WalStorageMetrics { } impl WalStorageMetrics { - fn new(zttid: &TenantTimelineId) -> Self { - let tenant_id = zttid.tenant_id.to_string(); - let timeline_id = zttid.timeline_id.to_string(); + fn new(ttid: &TenantTimelineId) -> Self { + let tenant_id = ttid.tenant_id.to_string(); + let timeline_id = ttid.timeline_id.to_string(); Self { write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]), write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]), @@ -101,9 +101,6 @@ pub trait Storage { /// LSN of last durably stored WAL record. fn flush_lsn(&self) -> Lsn; - /// Init storage with wal_seg_size and read WAL from disk to get latest LSN. - fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>; - /// Write piece of WAL from buf to disk, but not necessarily sync it. fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; @@ -119,7 +116,7 @@ pub trait Storage { } /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes -/// for better performance. Storage must be initialized before use. +/// for better performance. Storage is initialized in the constructor. /// /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in /// its filename and may be not fully flushed. @@ -127,16 +124,14 @@ pub trait Storage { /// Relationship of LSNs: /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn` /// -/// When storage is just created, all LSNs are zeroes and there are no segments on disk. +/// When storage is created first time, all LSNs are zeroes and there are no segments on disk. pub struct PhysicalStorage { metrics: WalStorageMetrics, - zttid: TenantTimelineId, timeline_dir: PathBuf, conf: SafeKeeperConf, - // fields below are filled upon initialization - /// None if uninitialized, Some(usize) if storage is initialized. - wal_seg_size: Option, + /// Size of WAL segment in bytes. + wal_seg_size: usize, /// Written to disk, but possibly still in the cache and not fully persisted. /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record. @@ -161,25 +156,47 @@ pub struct PhysicalStorage { } impl PhysicalStorage { - pub fn new(zttid: &TenantTimelineId, conf: &SafeKeeperConf) -> PhysicalStorage { - let timeline_dir = conf.timeline_dir(zttid); - PhysicalStorage { - metrics: WalStorageMetrics::new(zttid), - zttid: *zttid, + /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from + /// the disk. Otherwise, all LSNs are set to zero. + pub fn new( + ttid: &TenantTimelineId, + conf: &SafeKeeperConf, + state: &SafeKeeperState, + ) -> Result { + let timeline_dir = conf.timeline_dir(ttid); + let wal_seg_size = state.server.wal_seg_size as usize; + + // Find out where stored WAL ends, starting at commit_lsn which is a + // known recent record boundary (unless we don't have WAL at all). + let write_lsn = if state.commit_lsn == Lsn(0) { + Lsn(0) + } else { + find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)? + }; + + // TODO: do we really know that write_lsn is fully flushed to disk? + // If not, maybe it's better to call fsync() here to be sure? + let flush_lsn = write_lsn; + + info!( + "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}", + ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn, + ); + if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn { + warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id); + } + + Ok(PhysicalStorage { + metrics: WalStorageMetrics::new(ttid), timeline_dir, conf: conf.clone(), - wal_seg_size: None, - write_lsn: Lsn(0), - write_record_lsn: Lsn(0), - flush_record_lsn: Lsn(0), - decoder: WalStreamDecoder::new(Lsn(0)), + wal_seg_size, + write_lsn, + write_record_lsn: write_lsn, + flush_record_lsn: flush_lsn, + decoder: WalStreamDecoder::new(write_lsn), file: None, - } - } - - /// Wrapper for flush_lsn updates that also updates metrics. - fn update_flush_lsn(&mut self) { - self.flush_record_lsn = self.write_record_lsn; + }) } /// Call fdatasync if config requires so. @@ -204,9 +221,9 @@ impl PhysicalStorage { /// Open or create WAL segment file. Caller must call seek to the wanted position. /// Returns `file` and `is_partial`. - fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> { + fn open_or_create(&self, segno: XLogSegNo) -> Result<(File, bool)> { let (wal_file_path, wal_file_partial_path) = - wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?; + wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; // Try to open already completed segment if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { @@ -222,24 +239,18 @@ impl PhysicalStorage { .open(&wal_file_partial_path) .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?; - write_zeroes(&mut file, wal_seg_size)?; + write_zeroes(&mut file, self.wal_seg_size)?; self.fsync_file(&mut file)?; Ok((file, true)) } } /// Write WAL bytes, which are known to be located in a single WAL segment. - fn write_in_segment( - &mut self, - segno: u64, - xlogoff: usize, - buf: &[u8], - wal_seg_size: usize, - ) -> Result<()> { + fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { let mut file = if let Some(file) = self.file.take() { file } else { - let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?; + let (mut file, is_partial) = self.open_or_create(segno)?; assert!(is_partial, "unexpected write into non-partial segment file"); file.seek(SeekFrom::Start(xlogoff as u64))?; file @@ -247,13 +258,13 @@ impl PhysicalStorage { file.write_all(buf)?; - if xlogoff + buf.len() == wal_seg_size { + if xlogoff + buf.len() == self.wal_seg_size { // If we reached the end of a WAL segment, flush and close it. self.fdatasync_file(&mut file)?; // Rename partial file to completed file let (wal_file_path, wal_file_partial_path) = - wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?; + wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; fs::rename(&wal_file_partial_path, &wal_file_path)?; } else { // otherwise, file can be reused later @@ -269,10 +280,6 @@ impl PhysicalStorage { /// /// Updates `write_lsn`. fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { - let wal_seg_size = self - .wal_seg_size - .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; - if self.write_lsn != pos { // need to flush the file before discarding it if let Some(mut file) = self.file.take() { @@ -284,17 +291,17 @@ impl PhysicalStorage { while !buf.is_empty() { // Extract WAL location for this block - let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize; - let segno = self.write_lsn.segment_number(wal_seg_size); + let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size) as usize; + let segno = self.write_lsn.segment_number(self.wal_seg_size); // If crossing a WAL boundary, only write up until we reach wal segment size. - let bytes_write = if xlogoff + buf.len() > wal_seg_size { - wal_seg_size - xlogoff + let bytes_write = if xlogoff + buf.len() > self.wal_seg_size { + self.wal_seg_size - xlogoff } else { buf.len() }; - self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?; + self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?; self.write_lsn += bytes_write as u64; buf = &buf[bytes_write..]; } @@ -309,53 +316,6 @@ impl Storage for PhysicalStorage { self.flush_record_lsn } - /// Storage needs to know wal_seg_size to know which segment to read/write, but - /// wal_seg_size is not always known at the moment of storage creation. This method - /// allows to postpone its initialization. - fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> { - if state.server.wal_seg_size == 0 { - // wal_seg_size is still unknown. This is dead path normally, should - // be used only in tests. - return Ok(()); - } - - if let Some(wal_seg_size) = self.wal_seg_size { - // physical storage is already initialized - assert_eq!(wal_seg_size, state.server.wal_seg_size as usize); - return Ok(()); - } - - // initialize physical storage - let wal_seg_size = state.server.wal_seg_size as usize; - self.wal_seg_size = Some(wal_seg_size); - - // Find out where stored WAL ends, starting at commit_lsn which is a - // known recent record boundary (unless we don't have WAL at all). - self.write_lsn = if state.commit_lsn == Lsn(0) { - Lsn(0) - } else { - find_end_of_wal(&self.timeline_dir, wal_seg_size, state.commit_lsn)? - }; - - self.write_record_lsn = self.write_lsn; - - // TODO: do we really know that write_lsn is fully flushed to disk? - // If not, maybe it's better to call fsync() here to be sure? - self.update_flush_lsn(); - - info!( - "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}", - self.zttid.timeline_id, self.flush_record_lsn, state.commit_lsn, state.peer_horizon_lsn, - ); - if self.flush_record_lsn < state.commit_lsn - || self.flush_record_lsn < state.peer_horizon_lsn - { - warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", self.zttid.timeline_id); - } - - Ok(()) - } - /// Write WAL to disk. fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { // Disallow any non-sequential writes, which can result in gaps or overwrites. @@ -419,80 +379,83 @@ impl Storage for PhysicalStorage { // We have unflushed data (write_lsn != flush_lsn), but no file. // This should only happen if last file was fully written and flushed, // but haven't updated flush_lsn yet. - assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0); + if self.write_lsn.segment_offset(self.wal_seg_size) != 0 { + bail!( + "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}", + self.write_lsn, + self.flush_record_lsn + ); + } } // everything is flushed now, let's update flush_lsn - self.update_flush_lsn(); + self.flush_record_lsn = self.write_record_lsn; Ok(()) } /// Truncate written WAL by removing all WAL segments after the given LSN. /// end_pos must point to the end of the WAL record. fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { - let wal_seg_size = self - .wal_seg_size - .ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?; - // Streaming must not create a hole, so truncate cannot be called on non-written lsn - assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos); + if self.write_lsn != Lsn(0) && end_pos > self.write_lsn { + bail!( + "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}", + self.write_lsn, + end_pos + ); + } // Close previously opened file, if any if let Some(mut unflushed_file) = self.file.take() { self.fdatasync_file(&mut unflushed_file)?; } - let xlogoff = end_pos.segment_offset(wal_seg_size) as usize; - let segno = end_pos.segment_number(wal_seg_size); - let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?; + let xlogoff = end_pos.segment_offset(self.wal_seg_size) as usize; + let segno = end_pos.segment_number(self.wal_seg_size); + + // Remove all segments after the given LSN. + remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?; + + let (mut file, is_partial) = self.open_or_create(segno)?; // Fill end with zeroes file.seek(SeekFrom::Start(xlogoff as u64))?; - write_zeroes(&mut file, wal_seg_size - xlogoff)?; + write_zeroes(&mut file, self.wal_seg_size - xlogoff)?; self.fdatasync_file(&mut file)?; if !is_partial { // Make segment partial once again let (wal_file_path, wal_file_partial_path) = - wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?; + wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; fs::rename(&wal_file_path, &wal_file_partial_path)?; } - // Remove all subsequent segments - let mut segno = segno; - loop { - segno += 1; - let (wal_file_path, wal_file_partial_path) = - wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?; - // TODO: better use fs::try_exists which is currently available only in nightly build - if wal_file_path.exists() { - fs::remove_file(&wal_file_path)?; - } else if wal_file_partial_path.exists() { - fs::remove_file(&wal_file_partial_path)?; - } else { - break; - } - } - // Update LSNs self.write_lsn = end_pos; self.write_record_lsn = end_pos; - self.update_flush_lsn(); + self.flush_record_lsn = end_pos; Ok(()) } fn remove_up_to(&self) -> Box Result<()>> { let timeline_dir = self.timeline_dir.clone(); - let wal_seg_size = self.wal_seg_size.unwrap(); + let wal_seg_size = self.wal_seg_size; Box::new(move |segno_up_to: XLogSegNo| { - remove_up_to(&timeline_dir, wal_seg_size, segno_up_to) + remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to) }) } } -/// Remove all WAL segments in timeline_dir <= given segno. -fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo) -> Result<()> { +/// Remove all WAL segments in timeline_dir that match the given predicate. +fn remove_segments_from_disk( + timeline_dir: &Path, + wal_seg_size: usize, + remove_predicate: impl Fn(XLogSegNo) -> bool, +) -> Result<()> { let mut n_removed = 0; + let mut min_removed = u64::MAX; + let mut max_removed = u64::MIN; + for entry in fs::read_dir(&timeline_dir)? { let entry = entry?; let entry_path = entry.path(); @@ -504,19 +467,21 @@ fn remove_up_to(timeline_dir: &Path, wal_seg_size: usize, segno_up_to: XLogSegNo continue; } let (segno, _) = XLogFromFileName(fname_str, wal_seg_size); - if segno <= segno_up_to { + if remove_predicate(segno) { remove_file(entry_path)?; n_removed += 1; + min_removed = min(min_removed, segno); + max_removed = max(max_removed, segno); } } } - let segno_from = segno_up_to - n_removed + 1; - info!( - "removed {} WAL segments [{}; {}]", - n_removed, - XLogFileName(PG_TLI, segno_from, wal_seg_size), - XLogFileName(PG_TLI, segno_up_to, wal_seg_size) - ); + + if n_removed > 0 { + info!( + "removed {} WAL segments [{}; {}]", + n_removed, min_removed, max_removed + ); + } Ok(()) } @@ -526,8 +491,10 @@ pub struct WalReader { pos: Lsn, wal_segment: Option>>, - enable_remote_read: bool, // S3 will be used to read WAL if LSN is not available locally + enable_remote_read: bool, + + // We don't have WAL locally if LSN is less than local_start_lsn local_start_lsn: Lsn, }