mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-09 05:30:37 +00:00
Compare commits
1 Commits
conrad/ref
...
sk-move-ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5e650c8c84 |
@@ -567,8 +567,27 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/// Take a writing mutual exclusive lock on timeline shared_state.
|
||||
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
|
||||
self.mutex.lock().await
|
||||
///
|
||||
/// Unless allowed_cancelled is true errors out if timeline is cancelled (deleted).
|
||||
async fn write_shared_state_internal(
|
||||
&self,
|
||||
allow_cancelled: bool,
|
||||
) -> Result<MutexGuard<SharedState>> {
|
||||
let state = self.mutex.lock().await;
|
||||
if !allow_cancelled && self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
pub async fn write_shared_state(&self) -> Result<MutexGuard<SharedState>> {
|
||||
self.write_shared_state_internal(false).await
|
||||
}
|
||||
|
||||
pub async fn write_shared_state_allow_cancelled(&self) -> MutexGuard<SharedState> {
|
||||
self.write_shared_state_internal(true)
|
||||
.await
|
||||
.expect("timeline is cancelled and allow_cancelled is false")
|
||||
}
|
||||
|
||||
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
|
||||
@@ -579,11 +598,8 @@ impl Timeline {
|
||||
|
||||
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
|
||||
pub async fn update_status_notify(&self) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
let is_wal_backup_action_pending: bool = {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
let mut shared_state = self.write_shared_state().await?;
|
||||
self.update_status(&mut shared_state).await
|
||||
};
|
||||
if is_wal_backup_action_pending {
|
||||
@@ -599,10 +615,10 @@ impl Timeline {
|
||||
/// remote_consistent_lsn update through replication feedback, and we want
|
||||
/// to stop pushing to the broker if pageserver is fully caughtup.
|
||||
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return true;
|
||||
}
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = match self.write_shared_state().await {
|
||||
Ok(state) => state,
|
||||
Err(_) => return true,
|
||||
};
|
||||
if self.walreceivers.get_num() == 0 {
|
||||
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
|
||||
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
|
||||
@@ -610,9 +626,9 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
|
||||
/// Ensure taht current term is t, erroring otherwise, and lock the state.
|
||||
/// Ensure thht current term is t, erroring otherwise, and lock the state.
|
||||
pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
|
||||
let ss = self.write_shared_state().await;
|
||||
let ss = self.write_shared_state().await?;
|
||||
if ss.sk.state.acceptor_state.term != t {
|
||||
bail!(
|
||||
"failed to acquire term {}, current term {}",
|
||||
@@ -626,13 +642,10 @@ impl Timeline {
|
||||
/// Returns whether s3 offloading is required and sets current status as
|
||||
/// matching it.
|
||||
pub async fn wal_backup_attend(&self) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return false;
|
||||
match self.write_shared_state().await {
|
||||
Ok(mut state) => state.wal_backup_attend(self.walreceivers.get_num()),
|
||||
Err(_) => false,
|
||||
}
|
||||
|
||||
self.write_shared_state()
|
||||
.await
|
||||
.wal_backup_attend(self.walreceivers.get_num())
|
||||
}
|
||||
|
||||
/// Returns commit_lsn watch channel.
|
||||
@@ -650,15 +663,11 @@ impl Timeline {
|
||||
&self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
|
||||
let mut rmsg: Option<AcceptorProposerMessage>;
|
||||
let commit_lsn: Lsn;
|
||||
let term_flush_lsn: TermLsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
let mut shared_state = self.write_shared_state().await?;
|
||||
rmsg = shared_state.sk.process_msg(msg).await?;
|
||||
|
||||
// if this is AppendResponse, fill in proper hot standby feedback.
|
||||
@@ -677,36 +686,38 @@ impl Timeline {
|
||||
|
||||
/// Returns wal_seg_size.
|
||||
pub async fn get_wal_seg_size(&self) -> usize {
|
||||
self.write_shared_state().await.get_wal_seg_size()
|
||||
self.write_shared_state_allow_cancelled()
|
||||
.await
|
||||
.get_wal_seg_size()
|
||||
}
|
||||
|
||||
/// Returns true only if the timeline is loaded and active.
|
||||
pub async fn is_active(&self) -> bool {
|
||||
if self.is_cancelled() {
|
||||
return false;
|
||||
match self.write_shared_state().await {
|
||||
Ok(state) => state.active,
|
||||
Err(_) => false,
|
||||
}
|
||||
|
||||
self.write_shared_state().await.active
|
||||
}
|
||||
|
||||
/// Returns state of the timeline.
|
||||
pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
|
||||
let state = self.write_shared_state().await;
|
||||
let state = self.write_shared_state_allow_cancelled().await;
|
||||
(state.sk.state.inmem.clone(), state.sk.state.clone())
|
||||
}
|
||||
|
||||
/// Returns latest backup_lsn.
|
||||
pub async fn get_wal_backup_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().await.sk.state.inmem.backup_lsn
|
||||
self.write_shared_state_allow_cancelled()
|
||||
.await
|
||||
.sk
|
||||
.state
|
||||
.inmem
|
||||
.backup_lsn
|
||||
}
|
||||
|
||||
/// Sets backup_lsn to the given value.
|
||||
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
|
||||
let mut state = self.write_shared_state().await;
|
||||
let mut state = self.write_shared_state().await?;
|
||||
state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
|
||||
// we should check whether to shut down offloader, but this will be done
|
||||
// soon by peer communication anyway.
|
||||
@@ -715,7 +726,7 @@ impl Timeline {
|
||||
|
||||
/// Get safekeeper info for broadcasting to broker and other peers.
|
||||
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = self.write_shared_state_allow_cancelled().await;
|
||||
shared_state.get_safekeeper_info(&self.ttid, conf)
|
||||
}
|
||||
|
||||
@@ -724,7 +735,7 @@ impl Timeline {
|
||||
let is_wal_backup_action_pending: bool;
|
||||
let commit_lsn: Lsn;
|
||||
{
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
let mut shared_state = self.write_shared_state_allow_cancelled().await;
|
||||
shared_state.sk.record_safekeeper_info(&sk_info).await?;
|
||||
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
|
||||
shared_state.peers_info.upsert(&peer_info);
|
||||
@@ -741,13 +752,13 @@ impl Timeline {
|
||||
|
||||
/// Update in memory remote consistent lsn.
|
||||
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
let mut shared_state = self.write_shared_state_allow_cancelled().await;
|
||||
shared_state.sk.state.inmem.remote_consistent_lsn =
|
||||
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
|
||||
}
|
||||
|
||||
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = self.write_shared_state_allow_cancelled().await;
|
||||
shared_state.get_peers(conf.heartbeat_timeout)
|
||||
}
|
||||
|
||||
@@ -769,7 +780,7 @@ impl Timeline {
|
||||
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
|
||||
/// Thus we don't try to predict it here.
|
||||
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
|
||||
let ss = self.write_shared_state().await;
|
||||
let ss = self.write_shared_state_allow_cancelled().await;
|
||||
let term = ss.sk.state.acceptor_state.term;
|
||||
let last_log_term = ss.sk.get_epoch();
|
||||
let flush_lsn = ss.sk.flush_lsn();
|
||||
@@ -840,16 +851,16 @@ impl Timeline {
|
||||
|
||||
/// Returns flush_lsn.
|
||||
pub async fn get_flush_lsn(&self) -> Lsn {
|
||||
self.write_shared_state().await.sk.wal_store.flush_lsn()
|
||||
self.write_shared_state_allow_cancelled()
|
||||
.await
|
||||
.sk
|
||||
.wal_store
|
||||
.flush_lsn()
|
||||
}
|
||||
|
||||
/// Delete WAL segments from disk that are no longer needed. This is determined
|
||||
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
|
||||
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
|
||||
if self.is_cancelled() {
|
||||
bail!(TimelineError::Cancelled(self.ttid));
|
||||
}
|
||||
|
||||
// If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
|
||||
// This allows to get better read speed for pageservers that are lagging behind,
|
||||
// at the cost of keeping more WAL on disk.
|
||||
@@ -861,7 +872,7 @@ impl Timeline {
|
||||
|
||||
let horizon_segno: XLogSegNo;
|
||||
let remover = {
|
||||
let shared_state = self.write_shared_state().await;
|
||||
let shared_state = self.write_shared_state().await?;
|
||||
horizon_segno =
|
||||
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
|
||||
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
|
||||
@@ -876,7 +887,7 @@ impl Timeline {
|
||||
remover.await?;
|
||||
|
||||
// update last_removed_segno
|
||||
let mut shared_state = self.write_shared_state().await;
|
||||
let mut shared_state = self.write_shared_state_allow_cancelled().await;
|
||||
shared_state.last_removed_segno = horizon_segno;
|
||||
Ok(())
|
||||
}
|
||||
@@ -887,7 +898,7 @@ impl Timeline {
|
||||
/// safekeeper reconnections.
|
||||
pub async fn maybe_persist_control_file(&self) -> Result<()> {
|
||||
self.write_shared_state()
|
||||
.await
|
||||
.await?
|
||||
.sk
|
||||
.maybe_persist_inmem_control_file()
|
||||
.await
|
||||
@@ -901,7 +912,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
|
||||
let state = self.write_shared_state().await;
|
||||
let state = self.write_shared_state_allow_cancelled().await;
|
||||
if state.active {
|
||||
Some(FullTimelineInfo {
|
||||
ttid: self.ttid,
|
||||
@@ -924,7 +935,7 @@ impl Timeline {
|
||||
|
||||
/// Returns in-memory timeline state to build a full debug dump.
|
||||
pub async fn memory_dump(&self) -> debug_dump::Memory {
|
||||
let state = self.write_shared_state().await;
|
||||
let state = self.write_shared_state_allow_cancelled().await;
|
||||
|
||||
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
|
||||
state.sk.wal_store.internal_state();
|
||||
@@ -951,7 +962,7 @@ impl Timeline {
|
||||
&self,
|
||||
f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
|
||||
) -> Result<T> {
|
||||
let mut state = self.write_shared_state().await;
|
||||
let mut state = self.write_shared_state_allow_cancelled().await;
|
||||
let mut persistent_state = state.sk.state.start_change();
|
||||
// If f returns error, we abort the change and don't persist anything.
|
||||
let res = f(&mut persistent_state)?;
|
||||
|
||||
@@ -252,7 +252,7 @@ impl GlobalTimelines {
|
||||
// Take a lock and finish the initialization holding this mutex. No other threads
|
||||
// can interfere with creation after we will insert timeline into the map.
|
||||
{
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
|
||||
|
||||
// We can get a race condition here in case of concurrent create calls, but only
|
||||
// in theory. create() will return valid timeline on the next try.
|
||||
@@ -336,7 +336,7 @@ impl GlobalTimelines {
|
||||
match tli_res {
|
||||
Ok(timeline) => {
|
||||
// Take a lock and finish the deletion holding this mutex.
|
||||
let mut shared_state = timeline.write_shared_state().await;
|
||||
let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
|
||||
|
||||
info!("deleting timeline {}, only_local={}", ttid, only_local);
|
||||
let (dir_existed, was_active) =
|
||||
|
||||
Reference in New Issue
Block a user