diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index edb1af20ea..4300c6dc8c 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -298,7 +298,9 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let signals = signals::install_shutdown_handlers()?; let mut threads = vec![]; let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); - GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx); + + // Load all timelines from disk to memory. + GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; let conf_ = conf.clone(); threads.push( diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 17a96a5bcc..5fbbc147be 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -235,21 +235,15 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // Push data concurrently to not suffer from latency, with many timelines it can be slow. let handles = active_tlis .iter() - .filter_map(|tli| { - let sk_info = tli.get_public_info(&conf).ok()?; + .map(|tli| { + let sk_info = tli.get_public_info(&conf); let key = timeline_safekeeper_path( conf.broker_etcd_prefix.clone(), tli.zttid, conf.my_id, ); let lease = leases.remove(&tli.zttid).unwrap(); - Some(tokio::spawn(push_sk_info( - tli.zttid, - client.clone(), - key, - sk_info, - lease, - ))) + tokio::spawn(push_sk_info(tli.zttid, client.clone(), key, sk_info, lease)) }) .collect::>(); for h in handles { @@ -290,10 +284,10 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { match subscription.value_updates.recv().await { Some(new_info) => { // note: there are blocking operations below, but it's considered fine for now - let tli = GlobalTimelines::get(new_info.key.id); - let _ = tli - .record_safekeeper_info(&new_info.value, new_info.key.node_id) - .await; + if let Ok(tli) = GlobalTimelines::get(new_info.key.id) { + tli.record_safekeeper_info(&new_info.value, new_info.key.node_id) + .await? + } } None => { // XXX it means we lost connection with etcd, error is consumed inside sub object diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index 5017104dbd..d1af9032b7 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -69,6 +69,7 @@ impl FileStorage { }) } + /// Create file storage for a new timeline, but don't persist it yet. pub fn create_new( zttid: &ZTenantTimelineId, conf: &SafeKeeperConf, @@ -78,19 +79,18 @@ impl FileStorage { let tenant_id = zttid.tenant_id.to_string(); let timeline_id = zttid.timeline_id.to_string(); - let mut store = FileStorage { + let store = FileStorage { timeline_dir, conf: conf.clone(), persist_control_file_seconds: PERSIST_CONTROL_FILE_SECONDS .with_label_values(&[&tenant_id, &timeline_id]), - state: state.clone(), + state, }; - store.persist(&state)?; Ok(store) } - // Check the magic/version in the on-disk data and deserialize it, if possible. + /// Check the magic/version in the on-disk data and deserialize it, if possible. fn deser_sk_state(buf: &mut &[u8]) -> Result { // Read the version independent part let magic = buf.read_u32::()?; @@ -110,7 +110,7 @@ impl FileStorage { upgrade_control_file(buf, version) } - // Load control file for given zttid at path specified by conf. + /// Load control file for given zttid at path specified by conf. pub fn load_control_file_conf( conf: &SafeKeeperConf, zttid: &ZTenantTimelineId, @@ -171,8 +171,8 @@ impl Deref for FileStorage { } impl Storage for FileStorage { - // persists state durably to underlying storage - // for description see https://lwn.net/Articles/457667/ + /// persists state durably to underlying storage + /// for description see https://lwn.net/Articles/457667/ fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { let _timer = &self.persist_control_file_seconds.start_timer(); diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index c64ed84e3b..48faba0fb1 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -3,15 +3,15 @@ use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; use crate::receive_wal::ReceiveWalConn; -use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage}; + use crate::send_wal::ReplicationConn; -use crate::timeline::Timeline; + use crate::{GlobalTimelines, SafeKeeperConf}; use anyhow::{bail, Context, Result}; use postgres_ffi::PG_TLI; use regex::Regex; -use std::sync::Arc; + use tracing::info; use utils::{ lsn::Lsn, @@ -27,7 +27,7 @@ pub struct SafekeeperPostgresHandler { pub appname: Option, pub ztenantid: Option, pub ztimelineid: Option, - pub timeline: Option>, + pub zttid: ZTenantTimelineId, } /// Parsed Postgres command. @@ -100,11 +100,7 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { let tenantid = self.ztenantid.context("tenantid is required")?; let timelineid = self.ztimelineid.context("timelineid is required")?; - if self.timeline.is_none() { - self.timeline = Some(GlobalTimelines::get(ZTenantTimelineId::new( - tenantid, timelineid, - ))); - } + self.zttid = ZTenantTimelineId::new(tenantid, timelineid); match cmd { SafekeeperPostgresCommand::StartWalPush => ReceiveWalConn::new(pgb) @@ -129,38 +125,26 @@ impl SafekeeperPostgresHandler { appname: None, ztenantid: None, ztimelineid: None, - timeline: None, + zttid: ZTenantTimelineId::empty(), } } - /// Shortcut for calling `process_msg` in the timeline. - pub fn process_safekeeper_msg( - &self, - msg: &ProposerAcceptorMessage, - ) -> Result> { - self.timeline - .as_ref() - .unwrap() - .process_msg(msg) - .context("failed to process ProposerAcceptorMessage") - } - /// /// Handle IDENTIFY_SYSTEM replication command /// fn handle_identify_system(&mut self, pgb: &mut PostgresBackend) -> Result<()> { - let tli = self.timeline.as_ref().unwrap(); + let tli = GlobalTimelines::get(self.zttid)?; let lsn = if self.is_walproposer_recovery() { // walproposer should get all local WAL until flush_lsn - tli.get_flush_lsn()? + tli.get_flush_lsn() } else { // other clients shouldn't get any uncommitted WAL - tli.get_state()?.0.commit_lsn + tli.get_state().0.commit_lsn } .to_string(); - let sysid = tli.get_state()?.1.server.system_id.to_string(); + let sysid = tli.get_state().1.server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); let tli = PG_TLI.to_string(); let tli_bytes = tli.as_bytes(); diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 5d843585d1..a7d1d75732 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -10,8 +10,8 @@ use std::sync::Arc; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; -use crate::timeline; -use crate::timeline::TimelineDeleteForceResult; + +use crate::timelines_global_map::TimelineDeleteForceResult; use crate::GlobalTimelines; use crate::SafeKeeperConf; use etcd_broker::subscription_value::SkTimelineInfo; @@ -99,9 +99,9 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result, ) -> Result, ApiError> { @@ -153,12 +147,8 @@ async fn timeline_delete_force_handler( ); check_permission(&request, Some(zttid.tenant_id))?; ensure_no_body(&mut request).await?; - json_response( - StatusCode::OK, - timeline::delete_force(get_conf(&request), &zttid) - .await - .map_err(ApiError::from_err)?, - ) + let resp = GlobalTimelines::delete_force(&zttid).map_err(ApiError::from_err)?; + json_response(StatusCode::OK, resp) } /// Deactivates all timelines for the tenant and removes its data directory. @@ -171,8 +161,7 @@ async fn tenant_delete_force_handler( ensure_no_body(&mut request).await?; json_response( StatusCode::OK, - timeline::delete_force_all_for_tenant(get_conf(&request), &tenant_id) - .await + GlobalTimelines::delete_force_all_for_tenant(&tenant_id) .map_err(ApiError::from_err)? .iter() .map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp)) @@ -189,7 +178,7 @@ async fn record_safekeeper_info(mut request: Request) -> Result Result<()> { info!("JSON_CTRL request: {:?}", append_request); + let tli = GlobalTimelines::get(spg.zttid)?; // need to init safekeeper state before AppendRequest - prepare_safekeeper(spg)?; + prepare_safekeeper(&tli)?; // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { - send_proposer_elected(spg, append_request.term, append_request.epoch_start_lsn)?; + send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?; } - let inserted_wal = append_logical_message(spg, append_request)?; + let inserted_wal = append_logical_message(&tli, append_request)?; let response = AppendResult { - state: spg.timeline.as_ref().unwrap().get_state()?.1, + state: tli.get_state().1, inserted_wal, }; let response_data = serde_json::to_vec(&response)?; @@ -92,39 +95,28 @@ pub fn handle_json_ctrl( /// Prepare safekeeper to process append requests without crashes, /// by sending ProposerGreeting with default server.wal_seg_size. -fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> { +fn prepare_safekeeper(tli: &Arc) -> Result<()> { let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting { protocol_version: 2, // current protocol pg_version: 0, // unknown proposer_id: [0u8; 16], system_id: 0, - ztli: spg.ztimelineid.unwrap(), - tenant_id: spg.ztenantid.unwrap(), + ztli: tli.zttid.timeline_id, + tenant_id: tli.zttid.tenant_id, tli: 0, wal_seg_size: WAL_SEGMENT_SIZE as u32, // 16MB, default for tests }); - let response = spg - .timeline - .as_ref() - .unwrap() - .process_msg(&greeting_request)?; + let response = tli.process_msg(&greeting_request)?; match response { Some(AcceptorProposerMessage::Greeting(_)) => Ok(()), _ => anyhow::bail!("not GreetingResponse"), } } -fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: Lsn) -> Result<()> { +fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> Result<()> { // add new term to existing history - let history = spg - .timeline - .as_ref() - .unwrap() - .get_state()? - .1 - .acceptor_state - .term_history; + let history = tli.get_state().1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; history_entries.push(TermSwitchEntry { term, lsn }); @@ -137,10 +129,7 @@ fn send_proposer_elected(spg: &mut SafekeeperPostgresHandler, term: Term, lsn: L timeline_start_lsn: lsn, }); - spg.timeline - .as_ref() - .unwrap() - .process_msg(&proposer_elected_request)?; + tli.process_msg(&proposer_elected_request)?; Ok(()) } @@ -153,12 +142,9 @@ struct InsertedWAL { /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. -fn append_logical_message( - spg: &mut SafekeeperPostgresHandler, - msg: &AppendLogicalMessage, -) -> Result { +fn append_logical_message(tli: &Arc, msg: &AppendLogicalMessage) -> Result { let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message); - let sk_state = spg.timeline.as_ref().unwrap().get_state()?.1; + let sk_state = tli.get_state().1; let begin_lsn = msg.begin_lsn; let end_lsn = begin_lsn + wal_data.len() as u64; @@ -182,11 +168,7 @@ fn append_logical_message( wal_data: Bytes::from(wal_data), }); - let response = spg - .timeline - .as_ref() - .unwrap() - .process_msg(&append_request)?; + let response = tli.process_msg(&append_request)?; let append_response = match response { Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 9f3daa55d2..2039b64724 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -7,7 +7,9 @@ use anyhow::{anyhow, bail, Result}; use bytes::BytesMut; use tracing::*; +use crate::safekeeper::ServerInfo; use crate::timeline::Timeline; +use crate::GlobalTimelines; use std::net::SocketAddr; use std::sync::mpsc::channel; @@ -66,15 +68,21 @@ impl<'pg> ReceiveWalConn<'pg> { // Receive information about server let next_msg = poll_reader.recv_msg()?; - match next_msg { + let tli = match next_msg { ProposerAcceptorMessage::Greeting(ref greeting) => { info!( "start handshake with wal proposer {} sysid {} timeline {}", self.peer_addr, greeting.system_id, greeting.tli, ); + let server_info = ServerInfo { + pg_version: greeting.pg_version, + system_id: greeting.system_id, + wal_seg_size: greeting.wal_seg_size, + }; + GlobalTimelines::create(spg.zttid, server_info)? } _ => bail!("unexpected message {:?} instead of greeting", next_msg), - } + }; let mut next_msg = Some(next_msg); @@ -87,7 +95,7 @@ impl<'pg> ReceiveWalConn<'pg> { while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg { let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); - let reply = spg.process_safekeeper_msg(&msg)?; + let reply = tli.process_msg(&msg)?; if let Some(reply) = reply { self.write_msg(&reply)?; } @@ -96,13 +104,13 @@ impl<'pg> ReceiveWalConn<'pg> { } // flush all written WAL to the disk - let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?; + let reply = tli.process_msg(&ProposerAcceptorMessage::FlushWAL)?; if let Some(reply) = reply { self.write_msg(&reply)?; } } else if let Some(msg) = next_msg.take() { // process other message - let reply = spg.process_safekeeper_msg(&msg)?; + let reply = tli.process_msg(&msg)?; if let Some(reply) = reply { self.write_msg(&reply)?; } @@ -111,9 +119,9 @@ impl<'pg> ReceiveWalConn<'pg> { // Register the connection and defer unregister. Do that only // after processing first message, as it sets wal_seg_size, // wanted by many. - spg.timeline.as_ref().unwrap().on_compute_connect()?; + tli.on_compute_connect()?; _guard = Some(ComputeConnectionGuard { - timeline: Arc::clone(spg.timeline.as_ref().unwrap()), + timeline: Arc::clone(&tli), }); first_time_through = false; } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 39858e34c9..600b1e8572 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -530,9 +530,6 @@ where state.timeline_id ); } - if state.server.wal_seg_size == 0 { - bail!("Calling SafeKeeper::new with empty wal_seg_size"); - } Ok(SafeKeeper { global_commit_lsn: state.commit_lsn, @@ -788,6 +785,11 @@ where Ok(()) } + /// Persist control file to disk, called only after timeline creation (bootstrap). + pub fn persist(&mut self) -> Result<()> { + self.persist_control_file(self.state.clone()) + } + /// Persist in-memory state to the disk, taking other data from state. fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { state.commit_lsn = self.inmem.commit_lsn; @@ -938,7 +940,7 @@ where #[cfg(test)] mod tests { - use postgres_ffi::v14::pg_constants; + use postgres_ffi::WAL_SEGMENT_SIZE; use super::*; use crate::wal_storage::Storage; @@ -966,7 +968,7 @@ mod tests { fn test_sk_state() -> SafeKeeperState { let mut state = SafeKeeperState::empty(); - state.server.wal_seg_size = pg_constants::WAL_SEGMENT_SIZE as u32; + state.server.wal_seg_size = WAL_SEGMENT_SIZE as u32; state.tenant_id = ZTenantId::from([1u8; 16]); state.timeline_id = ZTimelineId::from([1u8; 16]); state diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index db3b931f08..3b90b5f27f 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -4,6 +4,7 @@ use crate::handler::SafekeeperPostgresHandler; use crate::timeline::{ReplicaState, Timeline}; use crate::wal_storage::WalReader; +use crate::GlobalTimelines; use anyhow::{bail, Context, Result}; use bytes::Bytes; @@ -78,13 +79,7 @@ struct ReplicationConnGuard { impl Drop for ReplicationConnGuard { fn drop(&mut self) { - let res = self.timeline.remove_replica(self.replica); - if let Err(e) = res { - warn!( - "Failed to remove replica {} from timeline {}: {}", - self.replica, self.timeline.zttid, e - ); - } + self.timeline.remove_replica(self.replica); } } @@ -118,7 +113,7 @@ impl ReplicationConn { // Note: deserializing is on m[1..] because we skip the tag byte. state.hs_feedback = HotStandbyFeedback::des(&m[1..]) .context("failed to deserialize HotStandbyFeedback")?; - timeline.update_replica_state(replica_id, state)?; + timeline.update_replica_state(replica_id, state); } Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => { let _reply = StandbyReply::des(&m[1..]) @@ -140,7 +135,7 @@ impl ReplicationConn { // This replica is the source of information to resend to compute. state.pageserver_feedback = Some(reply); - timeline.update_replica_state(replica_id, state)?; + timeline.update_replica_state(replica_id, state); } _ => warn!("unexpected message {:?}", msg), } @@ -173,7 +168,7 @@ impl ReplicationConn { ) -> Result<()> { let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered(); - let tli = Arc::clone(spg.timeline.as_ref().unwrap()); + let tli = GlobalTimelines::get(spg.zttid)?; // spawn the background thread which receives HotStandbyFeedback messages. let bg_timeline = Arc::clone(&tli); @@ -182,7 +177,7 @@ impl ReplicationConn { let state = ReplicaState::new(); // This replica_id is used below to check if it's time to stop replication. - let replica_id = bg_timeline.add_replica(state)?; + let replica_id = bg_timeline.add_replica(state); // Use a guard object to remove our entry from the timeline, when the background // thread and us have both finished using it. @@ -209,7 +204,7 @@ impl ReplicationConn { .build()?; runtime.block_on(async move { - let (inmem_state, persisted_state) = tli.get_state()?; + let (inmem_state, persisted_state) = tli.get_state(); // add persisted_state.timeline_start_lsn == Lsn(0) check // Walproposer gets special handling: safekeeper must give proposer all @@ -222,7 +217,7 @@ impl ReplicationConn { // on this safekeeper itself. That's ok as (old) proposer will never be // able to commit such WAL. let stop_pos: Option = if spg.is_walproposer_recovery() { - let wal_end = tli.get_flush_lsn()?; + let wal_end = tli.get_flush_lsn(); Some(wal_end) } else { None @@ -263,7 +258,7 @@ impl ReplicationConn { } else { // TODO: also check once in a while whether we are walsender // to right pageserver. - if tli.should_walsender_stop(replica_id)? { + if tli.should_walsender_stop(replica_id) { // Shut down, timeline is suspended. // TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index e081cc4f29..2aba45f3cf 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -1,23 +1,19 @@ //! This module implements Timeline lifecycle management and has all neccessary code //! to glue together SafeKeeper and all other background services. -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, bail, Result}; use etcd_broker::subscription_value::SkTimelineInfo; -use postgres_ffi::v14::xlog_utils::XLogSegNo; +use postgres_ffi::XLogSegNo; -use serde::Serialize; use tokio::sync::watch; use std::cmp::{max, min}; -use std::collections::HashMap; -use std::fs; -use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; +use parking_lot::{Mutex, MutexGuard}; use std::io; use std::path::PathBuf; -use std::time::Instant; use tokio::sync::mpsc::Sender; use tracing::*; @@ -25,15 +21,15 @@ use tracing::*; use utils::{ lsn::Lsn, pq_proto::ReplicationFeedback, - zid::{NodeId, ZTenantId, ZTenantTimelineId}, + zid::{NodeId, ZTenantTimelineId}, }; +use crate::control_file; use crate::safekeeper::{ AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, SafekeeperMemState, ServerInfo, }; use crate::send_wal::HotStandbyFeedback; -use crate::{control_file, GlobalTimelines}; use crate::metrics::FullTimelineInfo; use crate::wal_storage; @@ -75,7 +71,7 @@ impl ReplicaState { } /// Shared state associated with database instance -struct SharedState { +pub struct SharedState { /// Safekeeper object sk: SafeKeeper, /// State of replicas @@ -97,8 +93,8 @@ struct SharedState { } impl SharedState { - /// Initialize timeline state, create a control file on disk. - fn create( + /// Initialize fresh timeline state without persisting anything to disk. + fn create_new( conf: &SafeKeeperConf, zttid: &ZTenantTimelineId, state: SafeKeeperState, @@ -107,6 +103,8 @@ impl SharedState { bail!(TimelineError::UninitializedWalSegSize(*zttid)); } + // We don't want to write anything to disk, because we may have existing timeline there. + // These functions should not change anything on disk. let control_store = control_file::FileStorage::create_new(zttid, conf, state)?; let wal_store = wal_storage::PhysicalStorage::new(zttid, conf, &control_store)?; let sk = SafeKeeper::new(control_store, wal_store, conf.my_id)?; @@ -148,7 +146,6 @@ impl SharedState { }) } - fn is_active(&self) -> bool { self.is_wal_backup_required() // FIXME: add tracking of relevant pageservers and check them here individually, @@ -269,225 +266,189 @@ impl SharedState { } } -enum TimelineState { - /// Timeline on-disk state is unknown. We either haven't tried to restore - /// timeline state from disk or got an error. - Uninitialized(UninitializedState), - /// Timeline exists on disk and loaded in memory. - Loaded(Box), - /// Timeline was deleted and cannot be used anymore. - Deleted, -} - -impl TimelineState { - fn inner_mut(&mut self) -> &mut SharedState { - match self { - TimelineState::Loaded(state) => state, - _ => panic!("timeline state is not initialized"), - } - } -} - -#[derive(Debug)] -struct UninitializedState { - /// Safekeeper config. - conf: Box, - /// Error that occurred on last attempt to restore timeline state from disk. - restore_error: Option, - /// Timestamp of the last restore attempt. - last_restore_attempt: Option, -} - #[derive(Debug, thiserror::Error)] pub enum TimelineError { - #[error("Timeline {0} was deleted and cannot be used anymore")] - Deleted(ZTenantTimelineId), + #[error("Timeline {0} was cancelled and cannot be used anymore")] + Cancelled(ZTenantTimelineId), + #[error("Timeline {0} was not found in global map")] + NotFound(ZTenantTimelineId), + #[error("Timeline {0} exists on disk, but wasn't loaded on startup")] + Invalid(ZTenantTimelineId), + #[error("Timeline {0} is already exists")] + AlreadyExists(ZTenantTimelineId), #[error("Timeline {0} is not initialized, wal_seg_size is zero")] UninitializedWalSegSize(ZTenantTimelineId), - #[error("Timeline {0} was not found on disk")] - NotFound(ZTenantTimelineId), } /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline. /// It also holds SharedState and provides mutually exclusive access to it. pub struct Timeline { pub zttid: ZTenantTimelineId, + /// Sending here asks for wal backup launcher attention (start/stop /// offloading). Sending zttid instead of concrete command allows to do /// sending without timeline lock. wal_backup_launcher_tx: Sender, + + /// Used to broadcast commit_lsn updates to all background jobs. commit_lsn_watch_tx: watch::Sender, - /// For breeding receivers. commit_lsn_watch_rx: watch::Receiver, - mutex: Mutex, + + /// Safekeeper and other state, that should remain consistent and synchronized + /// with the disk. + mutex: Mutex, + + /// Cancellation channel. Delete/cancel will send `true` here as a cancellation signal. + cancellation_tx: watch::Sender, + + /// Timeline should not be used after cancellation. Background tasks should + /// monitor this channel and stop eventually after receiving `true` from this channel. + cancellation_rx: watch::Receiver, + + /// Directory where timeline state is stored. + timeline_dir: PathBuf, } impl Timeline { - /// Create a new uninitialized timeline. Timeline will be tried to restore from disk - /// automatically on most function calls. - pub fn new( + /// Load existing timeline from disk. + pub fn load_timeline( conf: SafeKeeperConf, zttid: ZTenantTimelineId, wal_backup_launcher_tx: Sender, - ) -> Timeline { - let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID); - Timeline { + ) -> Result { + let shared_state = SharedState::restore(&conf, &zttid)?; + let (commit_lsn_watch_tx, commit_lsn_watch_rx) = + watch::channel(shared_state.sk.state.commit_lsn); + let (cancellation_tx, cancellation_rx) = watch::channel(false); + + Ok(Timeline { zttid, wal_backup_launcher_tx, commit_lsn_watch_tx, commit_lsn_watch_rx, - mutex: Mutex::new(TimelineState::Uninitialized(UninitializedState { - conf: Box::new(conf), - restore_error: None, - last_restore_attempt: None, - })), - } + mutex: Mutex::new(shared_state), + cancellation_rx, + cancellation_tx, + timeline_dir: conf.timeline_dir(&zttid), + }) } - /// Try to restore timeline state from disk. - fn load_from_disk( - zttid: &ZTenantTimelineId, - uninit: &mut UninitializedState, - ) -> Result { - info!("Restoring timeline {} from disk", zttid); + /// Create a new timeline, which is not yet persisted to disk. + pub fn create_empty( + conf: SafeKeeperConf, + zttid: ZTenantTimelineId, + wal_backup_launcher_tx: Sender, + server_info: ServerInfo, + ) -> Result { + let (commit_lsn_watch_tx, commit_lsn_watch_rx) = watch::channel(Lsn::INVALID); + let (cancellation_tx, cancellation_rx) = watch::channel(false); + let state = SafeKeeperState::new(&zttid, server_info, vec![]); - let res = SharedState::restore(&uninit.conf, zttid); - if let Err(e) = &res { - uninit.restore_error = Some(format!("{}", e)); - uninit.last_restore_attempt = Some(Instant::now()); - error!("Failed to restore timeline {} from disk: {}", zttid, e); - } - res + Ok(Timeline { + zttid, + wal_backup_launcher_tx, + commit_lsn_watch_tx, + commit_lsn_watch_rx, + mutex: Mutex::new(SharedState::create_new(&conf, &zttid, state)?), + cancellation_rx, + cancellation_tx, + timeline_dir: conf.timeline_dir(&zttid), + }) } - /// Try to create a new timeline on disk. - fn create_on_disk( - zttid: &ZTenantTimelineId, - uninit: &mut UninitializedState, - state: SafeKeeperState, - ) -> Result { - let conf = &uninit.conf; - info!("Creating timeline {} on disk", zttid); - - // TODO: check directory existence - let dir = conf.timeline_dir(zttid); - fs::create_dir_all(dir)?; - - SharedState::create(conf, zttid, state).context("failed to create shared state") - } - - /// Initialize timeline with shared_state. - fn set_shared_state( - &self, - state: SharedState, - state_lock: &mut MutexGuard, - ) -> Result<()> { - assert!(matches!(&**state_lock, TimelineState::Uninitialized(_))); - self.commit_lsn_watch_tx.send(state.sk.inmem.commit_lsn)?; - **state_lock = TimelineState::Loaded(Box::new(state)); - Ok(()) - } - - /// Require timeline state to be loaded. If it's not loaded, try to restore it from disk. - fn require_loaded(&self) -> Result> { - let mut state = self.mutex.lock(); - match &mut *state { - TimelineState::Loaded(_) => {} - TimelineState::Uninitialized(uninit) => { - if let Some(err) = &uninit.restore_error { - // We have an error from last restore attempt, next attempt will not help - // unless something was changed on disk manually. If we will try to restore - // every time, we will have a lot of spam in the logs. - bail!(err.clone()) - } - // TODO: we can allow restoring once a minute, to automatically recover from - // restore failure when something was changed on disk. - - let shared_state = Self::load_from_disk(&self.zttid, uninit)?; - self.set_shared_state(shared_state, &mut state)?; - } - TimelineState::Deleted => bail!(TimelineError::Deleted(self.zttid)), - } - - Ok(MutexGuard::map(state, TimelineState::inner_mut)) - } - - /// Try to load timeline state from disk. If timeline control file is not found, - /// create and initialize a state for the new timeline. - pub fn init_create_or_load(&self, server_info: ServerInfo) -> Result<()> { - let mut state_lock = self.mutex.lock(); - match &mut *state_lock { - TimelineState::Uninitialized(uninit) => { - let shared_state = Self::load_from_disk(&self.zttid, uninit); - match shared_state { - Ok(shared_state) => { - // restored successfully - self.set_shared_state(shared_state, &mut state_lock)?; - Ok(()) - } - Err(e) => { - if let Some(e) = e.downcast_ref::() { - if matches!(e, TimelineError::NotFound(_)) { - // timeline does not exist on disk, create new one - let state = SafeKeeperState::new(&self.zttid, server_info, vec![]); - let shared_state = - Self::create_on_disk(&self.zttid, uninit, state)?; - self.set_shared_state(shared_state, &mut state_lock)?; - return Ok(()); - } - } - Err(e) - } - } - } - TimelineState::Loaded(_) => Ok(()), - TimelineState::Deleted => bail!(TimelineError::Deleted(self.zttid)), - } - } - - /// Deactivates and marks timeline as deleted. Returns whether the timeline was - /// already active. The timeline can no longer be used after this call, almost - /// all functions will return `TimelineError::Deleted`. + /// Initialize fresh timeline on disk and start background tasks. If bootstrap + /// fails, timeline is cancelled and cannot be used anymore. /// - /// We assume all threads will stop by themselves eventually (possibly with errors, - /// but no panics). There should be no compute threads (as we're deleting the timeline), - /// actually. Some WAL may be left unsent, but we're deleting the timeline anyway. - async fn delete(&self) -> bool { - let was_active = { - let mut state_lock = self.mutex.lock(); - let was_active = match &*state_lock { - TimelineState::Loaded(state) => state.active, - TimelineState::Uninitialized(_) => false, + /// Bootstrap is transactional, so if it fails, created files will be deleted, + /// and state on disk should remain unchanged. + pub fn bootstrap(&self, shared_state: &mut MutexGuard) -> Result<()> { + match std::fs::metadata(&self.timeline_dir) { + Ok(_) => { + // Timeline directory exists on disk, we should leave state unchanged + // and return error. + bail!(TimelineError::Invalid(self.zttid)); + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => { + return Err(e.into()); + } + } - // already deleted - TimelineState::Deleted => return false, - }; + // Create timeline directory. + std::fs::create_dir_all(&self.timeline_dir)?; - *state_lock = TimelineState::Deleted; - was_active - }; + // Write timeline to disk and TODO: start background tasks. + match || -> Result<()> { + shared_state.sk.persist()?; + // TODO: add more initialization steps here + Ok(()) + }() { + Ok(_) => Ok(()), + Err(e) => { + // Bootstrap failed, cancel timeline and remove timeline directory. + self.cancel(); - // XXX: we can have a cancellation channel to notify all tasks that they should stop + if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) { + warn!( + "failed to remove timeline {} directory after bootstrap failure: {}", + self.zttid, fs_err + ); + } - let res = self.wal_backup_launcher_tx.send(self.zttid).await; + Err(e) + } + } + } + + /// Delete timeline from disk completely, by removing timeline directory. Background + /// timeline activities will stop eventually. + pub fn delete_from_disk( + &self, + shared_state: &mut MutexGuard, + ) -> Result<(bool, bool)> { + let was_active = shared_state.active; + self.cancel(); + let dir_existed = delete_dir(&self.timeline_dir)?; + Ok((dir_existed, was_active)) + } + + /// Cancel timeline to prevent further usage. Background tasks will stop + /// eventually after receiving cancellation signal. + fn cancel(&self) { + info!("Timeline {} is cancelled", self.zttid); + let _ = self.cancellation_tx.send(true); + let res = self.wal_backup_launcher_tx.blocking_send(self.zttid); if let Err(e) = res { error!("Failed to send stop signal to wal_backup_launcher: {}", e); } + } - was_active + /// Returns if timeline is cancelled. + pub fn is_cancelled(&self) -> bool { + *self.cancellation_rx.borrow() + } + + /// Take a writing mutual exclusive lock on timeline shared_state. + pub fn write_shared_state(&self) -> MutexGuard { + self.mutex.lock() } /// Register compute connection, starting timeline-related activity if it is /// not running yet. pub fn on_compute_connect(&self) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.zttid)); + } + let is_wal_backup_action_pending: bool; { - let mut shared_state = self.require_loaded()?; + let mut shared_state = self.write_shared_state(); shared_state.num_computes += 1; is_wal_backup_action_pending = shared_state.update_status(self.zttid); } // Wake up wal backup launcher, if offloading not started yet. if is_wal_backup_action_pending { + // Can fail only if channel to a static thread got closed, which is not normal at all. self.wal_backup_launcher_tx.blocking_send(self.zttid)?; } Ok(()) @@ -496,14 +457,19 @@ impl Timeline { /// De-register compute connection, shutting down timeline activity if /// pageserver doesn't need catchup. pub fn on_compute_disconnect(&self) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.zttid)); + } + let is_wal_backup_action_pending: bool; { - let mut shared_state = self.require_loaded()?; + let mut shared_state = self.write_shared_state(); shared_state.num_computes -= 1; is_wal_backup_action_pending = shared_state.update_status(self.zttid); } // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { + // Can fail only if channel to a static thread got closed, which is not normal at all. self.wal_backup_launcher_tx.blocking_send(self.zttid)?; } Ok(()) @@ -511,8 +477,12 @@ impl Timeline { /// Returns true if walsender should stop sending WAL to pageserver. /// TODO: check this pageserver is actually interested in this timeline. - pub fn should_walsender_stop(&self, replica_id: usize) -> Result { - let mut shared_state = self.require_loaded()?; + pub fn should_walsender_stop(&self, replica_id: usize) -> bool { + if self.is_cancelled() { + return true; + } + + let mut shared_state = self.write_shared_state(); if shared_state.num_computes == 0 { let replica_state = shared_state.replicas[replica_id].unwrap(); let stop = shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet @@ -520,59 +490,62 @@ impl Timeline { replica_state.remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn); if stop { shared_state.update_status(self.zttid); - return Ok(true); + return true; } } - Ok(false) + false } /// Returns whether s3 offloading is required and sets current status as /// matching it. pub fn wal_backup_attend(&self) -> bool { - let shared_state = self.require_loaded(); - if shared_state.is_err() { + if self.is_cancelled() { return false; } - let mut shared_state = shared_state.unwrap(); - shared_state.wal_backup_attend() + + self.write_shared_state().wal_backup_attend() } /// Can this safekeeper offload to s3? Recently joined safekeepers might not /// have necessary WAL. pub fn can_wal_backup(&self) -> bool { - self.require_loaded() - .map(|state| state.can_wal_backup()) - .unwrap_or(false) + if self.is_cancelled() { + return false; + } + + let shared_state = self.write_shared_state(); + shared_state.can_wal_backup() } /// Returns full timeline info, required for the metrics. If the timeline is /// not active, returns None instead. pub fn info_for_metrics(&self) -> Option { - self.require_loaded() - .map(|state| { - if state.active { - Some(FullTimelineInfo { - zttid: self.zttid, - replicas: state - .replicas - .iter() - .filter_map(|r| r.as_ref()) - .copied() - .collect(), - wal_backup_active: state.wal_backup_active, - timeline_is_active: state.active, - num_computes: state.num_computes, - last_removed_segno: state.last_removed_segno, - epoch_start_lsn: state.sk.epoch_start_lsn, - mem_state: state.sk.inmem.clone(), - persisted_state: state.sk.state.clone(), - flush_lsn: state.sk.wal_store.flush_lsn(), - }) - } else { - None - } + if self.is_cancelled() { + return None; + } + + let state = self.write_shared_state(); + if state.active { + Some(FullTimelineInfo { + zttid: self.zttid, + replicas: state + .replicas + .iter() + .filter_map(|r| r.as_ref()) + .copied() + .collect(), + wal_backup_active: state.wal_backup_active, + timeline_is_active: state.active, + num_computes: state.num_computes, + last_removed_segno: state.last_removed_segno, + epoch_start_lsn: state.sk.epoch_start_lsn, + mem_state: state.sk.inmem.clone(), + persisted_state: state.sk.state.clone(), + flush_lsn: state.sk.wal_store.flush_lsn(), }) - .unwrap_or(None) + } else { + None + } } /// Returns commit_lsn watch channel. Channel should be obtained only after @@ -586,10 +559,14 @@ impl Timeline { &self, msg: &ProposerAcceptorMessage, ) -> Result> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.zttid)); + } + let mut rmsg: Option; let commit_lsn: Lsn; { - let mut shared_state = self.require_loaded()?; + let mut shared_state = self.write_shared_state(); rmsg = shared_state.sk.process_msg(msg)?; // if this is AppendResponse, fill in proper hot standby feedback and disk consistent lsn @@ -607,42 +584,47 @@ impl Timeline { Ok(rmsg) } - /// Returns wal_seg_size or None if timeline is not loaded. - pub fn get_wal_seg_size(&self) -> Result { - self.require_loaded().map(|state| state.get_wal_seg_size()) + /// Returns wal_seg_size. + pub fn get_wal_seg_size(&self) -> usize { + self.write_shared_state().get_wal_seg_size() } /// Returns true only if the timeline is loaded and active. pub fn is_active(&self) -> bool { - self.require_loaded() - .map(|state| state.active) - .unwrap_or(false) + if self.is_cancelled() { + return false; + } + + self.write_shared_state().active } - /// Returns state of the timeline or None if timeline is not loaded. - pub fn get_state(&self) -> Result<(SafekeeperMemState, SafeKeeperState)> { - self.require_loaded() - .map(|state| (state.sk.inmem.clone(), state.sk.state.clone())) + /// Returns state of the timeline. + pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { + let state = self.write_shared_state(); + (state.sk.inmem.clone(), state.sk.state.clone()) } - /// Returns backup_lsn or None if timeline is not loaded. - pub fn get_wal_backup_lsn(&self) -> Result { - self.require_loaded().map(|state| state.sk.inmem.backup_lsn) + /// Returns latest backup_lsn. + pub fn get_wal_backup_lsn(&self) -> Lsn { + self.write_shared_state().sk.inmem.backup_lsn } /// Sets backup_lsn to the given value. pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { - let mut shared_state = self.require_loaded()?; - shared_state.sk.inmem.backup_lsn = backup_lsn; + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.zttid)); + } + + self.write_shared_state().sk.inmem.backup_lsn = backup_lsn; // we should check whether to shut down offloader, but this will be done // soon by peer communication anyway. Ok(()) } /// Return public safekeeper info for broadcasting to broker and other peers. - pub fn get_public_info(&self, conf: &SafeKeeperConf) -> Result { - let shared_state = self.require_loaded()?; - Ok(SkTimelineInfo { + pub fn get_public_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo { + let shared_state = self.write_shared_state(); + SkTimelineInfo { last_log_term: Some(shared_state.sk.get_epoch()), flush_lsn: Some(shared_state.sk.wal_store.flush_lsn()), // note: this value is not flushed to control file yet and can be lost @@ -655,7 +637,7 @@ impl Timeline { peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), safekeeper_connstr: Some(conf.listen_pg_addr.clone()), backup_lsn: Some(shared_state.sk.inmem.backup_lsn), - }) + } } /// Update timeline state with peer safekeeper data. @@ -667,7 +649,7 @@ impl Timeline { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { - let mut shared_state = self.require_loaded()?; + let mut shared_state = self.write_shared_state(); shared_state.sk.record_safekeeper_info(sk_info)?; is_wal_backup_action_pending = shared_state.update_status(self.zttid); commit_lsn = shared_state.sk.inmem.commit_lsn; @@ -681,39 +663,39 @@ impl Timeline { } /// Add send_wal replica to the in-memory vector of replicas. - pub fn add_replica(&self, state: ReplicaState) -> Result { - let mut shared_state = self.require_loaded()?; - Ok(shared_state.add_replica(state)) + pub fn add_replica(&self, state: ReplicaState) -> usize { + self.write_shared_state().add_replica(state) } /// Update replication replica state. - pub fn update_replica_state(&self, id: usize, state: ReplicaState) -> Result<()> { - let mut shared_state = self.require_loaded()?; + pub fn update_replica_state(&self, id: usize, state: ReplicaState) { + let mut shared_state = self.write_shared_state(); shared_state.replicas[id] = Some(state); - Ok(()) } /// Remove send_wal replica from the in-memory vector of replicas. - pub fn remove_replica(&self, id: usize) -> Result<()> { - let mut shared_state = self.require_loaded()?; + pub fn remove_replica(&self, id: usize) { + let mut shared_state = self.write_shared_state(); assert!(shared_state.replicas[id].is_some()); shared_state.replicas[id] = None; - Ok(()) } /// Returns flush_lsn. - pub fn get_flush_lsn(&self) -> Result { - let shared_state = self.require_loaded()?; - Ok(shared_state.sk.wal_store.flush_lsn()) + pub fn get_flush_lsn(&self) -> Lsn { + self.write_shared_state().sk.wal_store.flush_lsn() } /// Delete WAL segments from disk that are no longer needed. This is determined /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn. pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { + if self.is_cancelled() { + bail!(TimelineError::Cancelled(self.zttid)); + } + let horizon_segno: XLogSegNo; let remover: Box Result<(), anyhow::Error>>; { - let shared_state = self.require_loaded()?; + let shared_state = self.write_shared_state(); horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled); remover = shared_state.sk.wal_store.remove_up_to(); if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { @@ -726,100 +708,17 @@ impl Timeline { remover(horizon_segno - 1)?; // update last_removed_segno - let mut shared_state = self.require_loaded()?; + let mut shared_state = self.write_shared_state(); shared_state.last_removed_segno = horizon_segno; Ok(()) } } -#[derive(Clone, Copy, Serialize)] -pub struct TimelineDeleteForceResult { - pub dir_existed: bool, - pub was_active: bool, -} - /// Deletes directory and it's contents. Returns false if directory does not exist. -fn delete_dir(path: PathBuf) -> Result { +fn delete_dir(path: &PathBuf) -> Result { match std::fs::remove_dir_all(path) { Ok(_) => Ok(true), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), Err(e) => Err(e.into()), } } - -/// Deactivates and sets TimelineState to Deleted, see `Timeline::delete()`, then deletes the -/// corresponding data directory. We assume all timeline threads will eventually check that -/// timeline is Deleted and terminate without panics. -/// -/// Timeline cannot be recreated because we keep all deleted timelines in memory. It can be -/// accidentally created again if safekeeper will restart and compute will connect to it, but this -/// is very unlikely. -pub async fn delete_force( - conf: &SafeKeeperConf, - zttid: &ZTenantTimelineId, -) -> Result { - info!("deleting timeline {}", zttid); - let timeline = GlobalTimelines::get(*zttid); - - let was_active = timeline.delete().await; - Ok(TimelineDeleteForceResult { - dir_existed: delete_dir(conf.timeline_dir(zttid))?, - was_active, - }) -} - -/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which -/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are -/// created simultaneously. In that case the function will return error and the caller should -/// try to delete tenant again later. -pub async fn delete_force_all_for_tenant( - conf: &SafeKeeperConf, - tenant_id: &ZTenantId, -) -> Result> { - info!("deleting all timelines for tenant {}", tenant_id); - let to_delete = GlobalTimelines::get_all_for_tenant(*tenant_id); - - let mut err = None; - - let mut deleted = HashMap::new(); - for tli in &to_delete { - let was_active = tli.delete().await; - let res = delete_dir(conf.timeline_dir(&tli.zttid)); - - match res { - Ok(dir_existed) => { - deleted.insert( - tli.zttid, - TimelineDeleteForceResult { - dir_existed, - was_active, - }, - ); - } - Err(e) => { - error!("failed to delete timeline {}: {}", tli.zttid, e); - // Save error to return later. - err = Some(e); - } - } - } - - // There may be inactive timelines, so delete the whole tenant dir as well. - delete_dir(conf.tenant_dir(tenant_id))?; - - let tlis_after_delete = GlobalTimelines::get_all_for_tenant(*tenant_id); - if tlis_after_delete.len() != to_delete.len() { - // Some timelines were created while we were deleting them, returning error - // to the caller, so it can retry later. - bail!( - "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them", - tenant_id - ); - } - - if let Some(e) = err { - Err(e) - } else { - Ok(deleted) - } -} diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 66e0550e18..e594abfd2a 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -1,12 +1,20 @@ //! This module contains global (tenant_id, timeline_id) -> Arc mapping. +//! All timelines should always be present in this map, this is done by loading them +//! all from the disk on startup and keeping them in memory. -use crate::timeline::Timeline; +use crate::safekeeper::ServerInfo; +use crate::timeline::{Timeline, TimelineError}; use crate::SafeKeeperConf; +use anyhow::{anyhow, bail, Context, Result}; use once_cell::sync::Lazy; +use serde::Serialize; use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::{Arc, Mutex, MutexGuard}; use tokio::sync::mpsc::Sender; -use utils::zid::{ZTenantId, ZTenantTimelineId}; +use tracing::*; +use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; struct GlobalTimelinesState { timelines: HashMap>, @@ -14,6 +22,34 @@ struct GlobalTimelinesState { conf: SafeKeeperConf, } +impl GlobalTimelinesState { + /// Get dependencies for a timeline constructor. + fn get_dependencies(&self) -> (SafeKeeperConf, Sender) { + ( + self.conf.clone(), + self.wal_backup_launcher_tx.as_ref().unwrap().clone(), + ) + } + + /// Insert timeline into the map. Returns error if timeline with the same id already exists. + fn try_insert(&mut self, timeline: Arc) -> Result<()> { + let zttid = timeline.zttid; + if self.timelines.contains_key(&zttid) { + bail!(TimelineError::AlreadyExists(zttid)); + } + self.timelines.insert(zttid, timeline); + Ok(()) + } + + /// Get timeline from the map. Returns error if timeline doesn't exist. + fn get(&self, zttid: &ZTenantTimelineId) -> Result> { + self.timelines + .get(zttid) + .cloned() + .ok_or_else(|| anyhow!(TimelineError::NotFound(*zttid))) + } +} + static TIMELINES_STATE: Lazy> = Lazy::new(|| { Mutex::new(GlobalTimelinesState { timelines: HashMap::new(), @@ -26,41 +62,184 @@ static TIMELINES_STATE: Lazy> = Lazy::new(|| { pub struct GlobalTimelines; impl GlobalTimelines { - // Inject dependencies needed for the timeline constructors. - pub fn init(conf: SafeKeeperConf, wal_backup_launcher_tx: Sender) { + /// Inject dependencies needed for the timeline constructors and load all timelines to memory. + pub fn init( + conf: SafeKeeperConf, + wal_backup_launcher_tx: Sender, + ) -> Result<()> { let mut state = TIMELINES_STATE.lock().unwrap(); assert!(state.wal_backup_launcher_tx.is_none()); state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); state.conf = conf; + + // Iterate through all directories and load tenants for all directories + // named as a valid tenant_id. + let mut tenant_count = 0; + let tenants_dir = state.conf.workdir.clone(); + for tenants_dir_entry in std::fs::read_dir(&tenants_dir) + .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))? + { + match &tenants_dir_entry { + Ok(tenants_dir_entry) => { + if let Ok(tenant_id) = + ZTenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or("")) + { + tenant_count += 1; + GlobalTimelines::load_tenant_timelines(&mut state, tenant_id)?; + } + } + Err(e) => error!( + "failed to list tenants dir entry {:?} in directory {}, reason: {:?}", + tenants_dir_entry, + tenants_dir.display(), + e + ), + } + } + + info!( + "found {} tenants directories, successfully loaded {} timelines", + tenant_count, + state.timelines.len() + ); + Ok(()) } - /// Get a timeline from the global map. If it doesn't exist in the map, new uninitialised timeline - /// will be added to the map. - pub fn get(zttid: ZTenantTimelineId) -> Arc { - let mut global_lock = TIMELINES_STATE.lock().unwrap(); + /// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any. + fn load_tenant_timelines( + state: &mut MutexGuard, + tenant_id: ZTenantId, + ) -> Result<()> { + let timelines_dir = state.conf.tenant_dir(&tenant_id); + for timelines_dir_entry in std::fs::read_dir(&timelines_dir) + .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))? + { + match &timelines_dir_entry { + Ok(timeline_dir_entry) => { + if let Ok(timeline_id) = + ZTimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or("")) + { + let zttid = ZTenantTimelineId::new(tenant_id, timeline_id); + match Timeline::load_timeline( + state.conf.clone(), + zttid, + state.wal_backup_launcher_tx.as_ref().unwrap().clone(), + ) { + Ok(timeline) => { + state.timelines.insert(zttid, Arc::new(timeline)); + } + // If we can't load a timeline, it's most likely because of a corrupted + // directory. We will log an error and won't allow to delete/recreate + // this timeline. The only way to fix this timeline is to repair manually + // and restart the safekeeper. + Err(e) => error!( + "failed to load timeline {} for tenant {}, reason: {:?}", + timeline_id, tenant_id, e + ), + } + } + } + Err(e) => error!( + "failed to list timelines dir entry {:?} in directory {}, reason: {:?}", + timelines_dir_entry, + timelines_dir.display(), + e + ), + } + } + + Ok(()) + } + + /// Create a new timeline with the given id. If the timeline already exists, returns + /// an existing timeline. + pub fn create(zttid: ZTenantTimelineId, server_info: ServerInfo) -> Result> { + let (conf, wal_backup_launcher_tx) = { + let state = TIMELINES_STATE.lock().unwrap(); + if let Ok(timeline) = state.get(&zttid) { + // Timeline already exists, return it. + return Ok(timeline); + } + state.get_dependencies() + }; + + info!("creating new timeline {}", zttid); + + let timeline = Arc::new(Timeline::create_empty( + conf, + zttid, + wal_backup_launcher_tx, + server_info, + )?); + + // Take a lock and finish the initialization holding this mutex. No other threads + // can interfere with creation after we will insert timeline into the map. + let mut shared_state = timeline.write_shared_state(); + + // We can get a race condition here in case of concurrent create calls, but only + // in theory. create() will return valid timeline on the next try. + TIMELINES_STATE + .lock() + .unwrap() + .try_insert(timeline.clone())?; + + // Write the new timeline to the disk and start background workers. + // Bootstrap is transactional, so if it fails, the timeline will be deleted, + // and the state on disk should remain unchanged. + match timeline.bootstrap(&mut shared_state) { + Ok(_) => { + // We are done with bootstrap, release the lock, return the timeline. + drop(shared_state); + Ok(timeline) + } + Err(e) => { + // Note: the most likely reason for bootstrap failure is that the timeline + // directory already exists on disk. This happens when timeline is corrupted + // and wasn't loaded from disk on startup because of that. We want to preserve + // the timeline directory in this case, for further inspection. + + // TODO: this is an unusual error, perhaps we should send it to sentry + // TODO: compute will try to create timeline every second, we should add backoff + error!("failed to bootstrap timeline {}: {}", zttid, e); + + // Timeline failed to bootstrap, it cannot be used. Remove it from the map. + TIMELINES_STATE.lock().unwrap().timelines.remove(&zttid); + Err(e) + } + } + } + + /// Get a timeline from the global map. If it's not present, it doesn't exist on disk, + /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid, + /// i.e. loaded in memory and not cancelled. + pub fn get(zttid: ZTenantTimelineId) -> Result> { + let global_lock = TIMELINES_STATE.lock().unwrap(); match global_lock.timelines.get(&zttid) { - Some(result) => Arc::clone(result), - None => { - let tli = Arc::new(Timeline::new( - global_lock.conf.clone(), - zttid, - global_lock.wal_backup_launcher_tx.as_ref().unwrap().clone(), - )); - global_lock.timelines.insert(zttid, tli.clone()); - tli + Some(result) => { + if result.is_cancelled() { + anyhow::bail!(TimelineError::Cancelled(zttid)); + } + Ok(Arc::clone(result)) } + None => anyhow::bail!(TimelineError::NotFound(zttid)), } } /// Returns all timelines. This is used for background timeline proccesses. pub fn get_all() -> Vec> { let global_lock = TIMELINES_STATE.lock().unwrap(); - global_lock.timelines.values().cloned().collect() + global_lock + .timelines + .values() + .cloned() + .filter(|t| !t.is_cancelled()) + .collect() } - /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant. - pub fn get_all_for_tenant(tenant_id: ZTenantId) -> Vec> { + /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant, + /// and that's why it can return cancelled timelines, to retry deleting them. + fn get_all_for_tenant(tenant_id: ZTenantId) -> Vec> { let global_lock = TIMELINES_STATE.lock().unwrap(); global_lock .timelines @@ -69,4 +248,88 @@ impl GlobalTimelines { .cloned() .collect() } + + /// Cancels timeline, then deletes the corresponding data directory. + pub fn delete_force(zttid: &ZTenantTimelineId) -> Result { + let timeline = TIMELINES_STATE.lock().unwrap().get(zttid)?; + + // Take a lock and finish the deletion holding this mutex. + let mut shared_state = timeline.write_shared_state(); + + info!("deleting timeline {}", zttid); + let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?; + + // Remove timeline from the map. + TIMELINES_STATE.lock().unwrap().timelines.remove(zttid); + + Ok(TimelineDeleteForceResult { + dir_existed, + was_active, + }) + } + + /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which + /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are + /// created simultaneously. In that case the function will return error and the caller should + /// retry tenant deletion again later. + pub fn delete_force_all_for_tenant( + tenant_id: &ZTenantId, + ) -> Result> { + info!("deleting all timelines for tenant {}", tenant_id); + let to_delete = Self::get_all_for_tenant(*tenant_id); + + let mut err = None; + + let mut deleted = HashMap::new(); + for tli in &to_delete { + match Self::delete_force(&tli.zttid) { + Ok(result) => { + deleted.insert(tli.zttid, result); + } + Err(e) => { + error!("failed to delete timeline {}: {}", tli.zttid, e); + // Save error to return later. + err = Some(e); + } + } + } + + // If there was an error, return it. + if let Some(e) = err { + return Err(e); + } + + // There may be broken timelines on disk, so delete the whole tenant dir as well. + // Note that we could concurrently create new timelines while we were deleting them, + // so the directory may be not empty. In this case timelines will have bad state + // and timeline background jobs can panic. + delete_dir(TIMELINES_STATE.lock().unwrap().conf.tenant_dir(tenant_id))?; + + let tlis_after_delete = Self::get_all_for_tenant(*tenant_id); + if !tlis_after_delete.is_empty() { + // Some timelines were created while we were deleting them, returning error + // to the caller, so it can retry later. + bail!( + "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them", + tenant_id + ); + } + + Ok(deleted) + } +} + +#[derive(Clone, Copy, Serialize)] +pub struct TimelineDeleteForceResult { + pub dir_existed: bool, + pub was_active: bool, +} + +/// Deletes directory and it's contents. Returns false if directory does not exist. +fn delete_dir(path: PathBuf) -> Result { + match std::fs::remove_dir_all(path) { + Ok(_) => Ok(true), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), + Err(e) => Err(e.into()), + } } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index bb6d0f3f85..04a4675d18 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -54,7 +54,9 @@ pub fn wal_backup_launcher_thread_main( /// Check whether wal backup is required for timeline. If yes, mark that launcher is /// aware of current status and return the timeline. fn is_wal_backup_required(zttid: ZTenantTimelineId) -> Option> { - Some(GlobalTimelines::get(zttid)).filter(|tli| tli.wal_backup_attend()) + GlobalTimelines::get(zttid) + .ok() + .filter(|tli| tli.wal_backup_attend()) } struct WalBackupTaskHandle { @@ -200,15 +202,15 @@ async fn backup_task_main( election: Election, ) { info!("started"); - let tli = GlobalTimelines::get(zttid); - let res = tli.get_wal_seg_size(); + let res = GlobalTimelines::get(zttid); if let Err(e) = res { info!("backup error for timeline {}: {}", zttid, e); return; } + let tli = res.unwrap(); let mut wb = WalBackupTask { - wal_seg_size: res.unwrap(), + wal_seg_size: tli.get_wal_seg_size(), commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), timeline: tli, timeline_dir, @@ -279,12 +281,7 @@ impl WalBackupTask { continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ } // Perhaps peers advanced the position, check shmem value. - let res = self.timeline.get_wal_backup_lsn(); - if let Err(e) = res { - error!("backup error: {}", e); - return; - } - backup_lsn = res.unwrap(); + backup_lsn = self.timeline.get_wal_backup_lsn(); if backup_lsn.segment_number(self.wal_seg_size) >= commit_lsn.segment_number(self.wal_seg_size) { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index cdf9ecde97..8b41dee01e 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -156,16 +156,14 @@ pub struct PhysicalStorage { } impl PhysicalStorage { + /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from + /// the disk. Otherwise, all LSNs are set to zero. pub fn new( zttid: &ZTenantTimelineId, conf: &SafeKeeperConf, state: &SafeKeeperState, ) -> Result { let timeline_dir = conf.timeline_dir(zttid); - - if state.server.wal_seg_size == 0 { - bail!("wal_seg_size must be initialized before creating PhysicalStorage"); - } let wal_seg_size = state.server.wal_seg_size as usize; // Find out where stored WAL ends, starting at commit_lsn which is a @@ -399,7 +397,7 @@ impl Storage for PhysicalStorage { /// end_pos must point to the end of the WAL record. fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { // Streaming must not create a hole, so truncate cannot be called on non-written lsn - if self.write_lsn != Lsn(0) && end_pos >= self.write_lsn { + if self.write_lsn != Lsn(0) && end_pos > self.write_lsn { bail!( "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}", self.write_lsn,