Compare commits

1 commit

Arseny Sher · 5e650c8c84 · 2024-05-08 17:31:56 +03:00

safekeeper: move is_cancelled to write_shared_state.

Each time we access shared state we either want to error out if it is
cancelled or ignore that; make that explicit.
2 changed files with 64 additions and 53 deletions
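In short: write_shared_state() now returns a Result and bails with TimelineError::Cancelled on a cancelled (deleted) timeline, while the new write_shared_state_allow_cancelled() opts out of that check. A minimal caller-side sketch of the intended usage; the example function itself is illustrative and not part of the commit:

    // Sketch only: `tli` is a safekeeper Timeline as modified below.
    async fn example(tli: &Timeline) -> anyhow::Result<()> {
        // Default accessor: ordinary call sites propagate cancellation with `?`.
        {
            let shared_state = tli.write_shared_state().await?;
            let _wal_seg_size = shared_state.get_wal_seg_size();
        }
        // Opt-in accessor: infallible, for paths (deletion, debug dumps,
        // peer info) that must keep working on a cancelled timeline.
        let shared_state = tli.write_shared_state_allow_cancelled().await;
        let _wal_seg_size = shared_state.get_wal_seg_size();
        Ok(())
    }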

safekeeper/src/timeline.rs

@@ -567,8 +567,27 @@ impl Timeline {
     }
 
     /// Take a writing mutual exclusive lock on timeline shared_state.
-    pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
-        self.mutex.lock().await
-    }
+    ///
+    /// Unless allow_cancelled is true, errors out if the timeline is cancelled (deleted).
+    async fn write_shared_state_internal(
+        &self,
+        allow_cancelled: bool,
+    ) -> Result<MutexGuard<SharedState>> {
+        let state = self.mutex.lock().await;
+        if !allow_cancelled && self.is_cancelled() {
+            bail!(TimelineError::Cancelled(self.ttid));
+        }
+        Ok(state)
+    }
+
+    pub async fn write_shared_state(&self) -> Result<MutexGuard<SharedState>> {
+        self.write_shared_state_internal(false).await
+    }
+
+    pub async fn write_shared_state_allow_cancelled(&self) -> MutexGuard<SharedState> {
+        self.write_shared_state_internal(true)
+            .await
+            .expect("timeline is cancelled and allow_cancelled is false")
+    }
 
     async fn update_status(&self, shared_state: &mut SharedState) -> bool {
@@ -579,11 +598,8 @@ impl Timeline {
 
     /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
     pub async fn update_status_notify(&self) -> Result<()> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
         let is_wal_backup_action_pending: bool = {
-            let mut shared_state = self.write_shared_state().await;
+            let mut shared_state = self.write_shared_state().await?;
             self.update_status(&mut shared_state).await
         };
         if is_wal_backup_action_pending {
@@ -599,10 +615,10 @@ impl Timeline {
     /// remote_consistent_lsn update through replication feedback, and we want
     /// to stop pushing to the broker if pageserver is fully caughtup.
     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
-        if self.is_cancelled() {
-            return true;
-        }
-        let shared_state = self.write_shared_state().await;
+        let shared_state = match self.write_shared_state().await {
+            Ok(state) => state,
+            Err(_) => return true,
+        };
         if self.walreceivers.get_num() == 0 {
             return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
                 reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
@@ -610,9 +626,9 @@ impl Timeline {
         false
     }
 
-    /// Ensure taht current term is t, erroring otherwise, and lock the state.
+    /// Ensure that current term is t, erroring otherwise, and lock the state.
     pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
-        let ss = self.write_shared_state().await;
+        let ss = self.write_shared_state().await?;
         if ss.sk.state.acceptor_state.term != t {
             bail!(
                 "failed to acquire term {}, current term {}",
@@ -626,13 +642,10 @@ impl Timeline {
     /// Returns whether s3 offloading is required and sets current status as
     /// matching it.
    pub async fn wal_backup_attend(&self) -> bool {
-        if self.is_cancelled() {
-            return false;
-        }
-
-        self.write_shared_state()
-            .await
-            .wal_backup_attend(self.walreceivers.get_num())
+        match self.write_shared_state().await {
+            Ok(mut state) => state.wal_backup_attend(self.walreceivers.get_num()),
+            Err(_) => false,
+        }
     }
 
     /// Returns commit_lsn watch channel.
@@ -650,15 +663,11 @@ impl Timeline {
         &self,
         msg: &ProposerAcceptorMessage,
     ) -> Result<Option<AcceptorProposerMessage>> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
-
         let mut rmsg: Option<AcceptorProposerMessage>;
         let commit_lsn: Lsn;
         let term_flush_lsn: TermLsn;
         {
-            let mut shared_state = self.write_shared_state().await;
+            let mut shared_state = self.write_shared_state().await?;
             rmsg = shared_state.sk.process_msg(msg).await?;
 
             // if this is AppendResponse, fill in proper hot standby feedback.
@@ -677,36 +686,38 @@ impl Timeline {
 
     /// Returns wal_seg_size.
     pub async fn get_wal_seg_size(&self) -> usize {
-        self.write_shared_state().await.get_wal_seg_size()
+        self.write_shared_state_allow_cancelled()
+            .await
+            .get_wal_seg_size()
     }
 
     /// Returns true only if the timeline is loaded and active.
     pub async fn is_active(&self) -> bool {
-        if self.is_cancelled() {
-            return false;
-        }
-
-        self.write_shared_state().await.active
+        match self.write_shared_state().await {
+            Ok(state) => state.active,
+            Err(_) => false,
+        }
     }
 
     /// Returns state of the timeline.
     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
-        let state = self.write_shared_state().await;
+        let state = self.write_shared_state_allow_cancelled().await;
         (state.sk.state.inmem.clone(), state.sk.state.clone())
     }
 
     /// Returns latest backup_lsn.
     pub async fn get_wal_backup_lsn(&self) -> Lsn {
-        self.write_shared_state().await.sk.state.inmem.backup_lsn
+        self.write_shared_state_allow_cancelled()
+            .await
+            .sk
+            .state
+            .inmem
+            .backup_lsn
     }
 
     /// Sets backup_lsn to the given value.
     pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
-        let mut state = self.write_shared_state().await;
+        let mut state = self.write_shared_state().await?;
         state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
         // we should check whether to shut down offloader, but this will be done
         // soon by peer communication anyway.
@@ -715,7 +726,7 @@ impl Timeline {
 
     /// Get safekeeper info for broadcasting to broker and other peers.
     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
-        let shared_state = self.write_shared_state().await;
+        let shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.get_safekeeper_info(&self.ttid, conf)
     }
@@ -724,7 +735,7 @@ impl Timeline {
         let is_wal_backup_action_pending: bool;
         let commit_lsn: Lsn;
         {
-            let mut shared_state = self.write_shared_state().await;
+            let mut shared_state = self.write_shared_state_allow_cancelled().await;
             shared_state.sk.record_safekeeper_info(&sk_info).await?;
             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
             shared_state.peers_info.upsert(&peer_info);
@@ -741,13 +752,13 @@ impl Timeline {
 
     /// Update in memory remote consistent lsn.
     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
-        let mut shared_state = self.write_shared_state().await;
+        let mut shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.sk.state.inmem.remote_consistent_lsn =
             max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
     }
 
     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
-        let shared_state = self.write_shared_state().await;
+        let shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.get_peers(conf.heartbeat_timeout)
     }
@@ -769,7 +780,7 @@ impl Timeline {
     /// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
     /// Thus we don't try to predict it here.
     pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
-        let ss = self.write_shared_state().await;
+        let ss = self.write_shared_state_allow_cancelled().await;
         let term = ss.sk.state.acceptor_state.term;
         let last_log_term = ss.sk.get_epoch();
         let flush_lsn = ss.sk.flush_lsn();
@@ -840,16 +851,16 @@ impl Timeline {
 
     /// Returns flush_lsn.
     pub async fn get_flush_lsn(&self) -> Lsn {
-        self.write_shared_state().await.sk.wal_store.flush_lsn()
+        self.write_shared_state_allow_cancelled()
+            .await
+            .sk
+            .wal_store
+            .flush_lsn()
     }
 
     /// Delete WAL segments from disk that are no longer needed. This is determined
     /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
     pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
-
         // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
         // This allows to get better read speed for pageservers that are lagging behind,
         // at the cost of keeping more WAL on disk.
@@ -861,7 +872,7 @@ impl Timeline {
 
         let horizon_segno: XLogSegNo;
         let remover = {
-            let shared_state = self.write_shared_state().await;
+            let shared_state = self.write_shared_state().await?;
             horizon_segno =
                 shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
@@ -876,7 +887,7 @@ impl Timeline {
         remover.await?;
 
         // update last_removed_segno
-        let mut shared_state = self.write_shared_state().await;
+        let mut shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.last_removed_segno = horizon_segno;
         Ok(())
     }
@@ -887,7 +898,7 @@ impl Timeline {
     /// safekeeper reconnections.
     pub async fn maybe_persist_control_file(&self) -> Result<()> {
         self.write_shared_state()
-            .await
+            .await?
             .sk
             .maybe_persist_inmem_control_file()
             .await
@@ -901,7 +912,7 @@ impl Timeline {
         }
 
         let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
-        let state = self.write_shared_state().await;
+        let state = self.write_shared_state_allow_cancelled().await;
         if state.active {
             Some(FullTimelineInfo {
                 ttid: self.ttid,
@@ -924,7 +935,7 @@ impl Timeline {
 
     /// Returns in-memory timeline state to build a full debug dump.
     pub async fn memory_dump(&self) -> debug_dump::Memory {
-        let state = self.write_shared_state().await;
+        let state = self.write_shared_state_allow_cancelled().await;
 
         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
             state.sk.wal_store.internal_state();
@@ -951,7 +962,7 @@ impl Timeline {
         &self,
         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     ) -> Result<T> {
-        let mut state = self.write_shared_state().await;
+        let mut state = self.write_shared_state_allow_cancelled().await;
         let mut persistent_state = state.sk.state.start_change();
         // If f returns error, we abort the change and don't persist anything.
        let res = f(&mut persistent_state)?;
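The pattern above, distilled into a self-contained sketch. Type and field names here are assumptions for illustration; the real Timeline and SharedState carry far more. The key point is that the cancellation flag is checked after the mutex is acquired, so a successful lock on the default path guarantees the timeline was not cancelled at acquisition time:

    use std::sync::atomic::{AtomicBool, Ordering};

    use anyhow::{bail, Result};
    use tokio::sync::{Mutex, MutexGuard};

    #[derive(Default)]
    struct SharedState {
        active: bool,
    }

    #[derive(Default)]
    struct Timeline {
        cancelled: AtomicBool,
        mutex: Mutex<SharedState>,
    }

    impl Timeline {
        fn is_cancelled(&self) -> bool {
            self.cancelled.load(Ordering::Acquire)
        }

        // Single place deciding what "access shared state" means.
        async fn write_shared_state_internal(
            &self,
            allow_cancelled: bool,
        ) -> Result<MutexGuard<'_, SharedState>> {
            let state = self.mutex.lock().await;
            if !allow_cancelled && self.is_cancelled() {
                bail!("timeline is cancelled");
            }
            Ok(state)
        }

        // Default accessor: callers must handle cancellation.
        async fn write_shared_state(&self) -> Result<MutexGuard<'_, SharedState>> {
            self.write_shared_state_internal(false).await
        }

        // Opt-in accessor: infallible by construction, since the only
        // error source (the cancellation check) is disabled.
        async fn write_shared_state_allow_cancelled(&self) -> MutexGuard<'_, SharedState> {
            self.write_shared_state_internal(true)
                .await
                .expect("cannot fail when allow_cancelled is true")
        }
    }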

safekeeper/src/timelines_global_map.rs

@@ -252,7 +252,7 @@ impl GlobalTimelines {
         // Take a lock and finish the initialization holding this mutex. No other threads
         // can interfere with creation after we will insert timeline into the map.
         {
-            let mut shared_state = timeline.write_shared_state().await;
+            let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
 
             // We can get a race condition here in case of concurrent create calls, but only
             // in theory. create() will return valid timeline on the next try.
@@ -336,7 +336,7 @@ impl GlobalTimelines {
             match tli_res {
                 Ok(timeline) => {
                     // Take a lock and finish the deletion holding this mutex.
-                    let mut shared_state = timeline.write_shared_state().await;
+                    let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
 
                     info!("deleting timeline {}, only_local={}", ttid, only_local);
                     let (dir_existed, was_active) =
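These two call sites show why the infallible variant exists at all: deletion (and, symmetrically, initialization) must finish its work under the shared-state lock regardless of the cancellation flag, so it cannot use the accessor that errors out. A hedged sketch of that ordering, where cancel() is a hypothetical stand-in for however the timeline actually gets marked cancelled:

    // Hypothetical: cancel() makes is_cancelled() return true.
    async fn delete(timeline: &Timeline) {
        timeline.cancel(); // from here on, write_shared_state() would bail
        let _shared_state = timeline.write_shared_state_allow_cancelled().await;
        // ... finish the deletion while holding the lock ...
    }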