mirror of https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00

Compare commits

2 Commits: c5412b37a0, 5e650c8c84
@@ -20,7 +20,7 @@ use utils::{bin_ser::LeSer, id::TenantTimelineId};
 use crate::SafeKeeperConf;
 
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
-pub const SK_FORMAT_VERSION: u32 = 8;
+pub const SK_FORMAT_VERSION: u32 = 9;
 
 // contains persistent metadata for safekeeper
 const CONTROL_FILE_NAME: &str = "safekeeper.control";
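Bumping SK_FORMAT_VERSION invalidates control files written by older safekeepers, which routes them through the upgrade path the next time they are loaded. A minimal sketch of that gate, assuming a loader that has already validated SK_MAGIC (load_state here is illustrative, not part of this diff):

    // Hypothetical loader: current-version files deserialize directly,
    // older ones go through the per-version upgrade chain below.
    fn load_state(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
        if version == SK_FORMAT_VERSION {
            return TimelinePersistentState::des(buf);
        }
        upgrade_control_file(buf, version)
    }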
@@ -183,6 +183,53 @@ pub struct SafeKeeperStateV7 {
     pub peers: PersistedPeers,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct SafeKeeperStateV8 {
+    #[serde(with = "hex")]
+    pub tenant_id: TenantId,
+    #[serde(with = "hex")]
+    pub timeline_id: TimelineId,
+    /// persistent acceptor state
+    pub acceptor_state: AcceptorState,
+    /// information about server
+    pub server: ServerInfo,
+    /// Unique id of the last *elected* proposer we dealt with. Not needed
+    /// for correctness, exists for monitoring purposes.
+    #[serde(with = "hex")]
+    pub proposer_uuid: PgUuid,
+    /// LSN since which this timeline generally starts. This safekeeper
+    /// itself might have joined later.
+    pub timeline_start_lsn: Lsn,
+    /// LSN since which this safekeeper has (had) WAL for this timeline.
+    /// All WAL segments after the one containing local_start_lsn are
+    /// filled with data from the beginning.
+    pub local_start_lsn: Lsn,
+    /// Part of WAL acknowledged by quorum *and available locally*. Always points
+    /// to a record boundary.
+    pub commit_lsn: Lsn,
+    /// LSN that points to the end of the last backed up segment. Useful to
+    /// persist to avoid finding out offloading progress on boot.
+    pub backup_lsn: Lsn,
+    /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
+    /// of the last record streamed to everyone). Persisting it helps skip
+    /// recovery in walproposer; generally we compute it from peers. In the
+    /// walproposer protocol it is called 'truncate_lsn'. Updates are currently
+    /// driven only by walproposer.
+    pub peer_horizon_lsn: Lsn,
+    /// LSN of the oldest known checkpoint made by pageserver and successfully
+    /// pushed to s3. We don't remove WAL beyond it. Persisted only for
+    /// informational purposes; we receive it from pageserver (or broker).
+    pub remote_consistent_lsn: Lsn,
+    /// Peers and their state as we remember it. Knowing peers themselves is
+    /// fundamental, but state is saved here only for informational purposes and
+    /// obviously can be stale. (Currently not saved at all, but let's provision
+    /// a place so we need fewer file version upgrades.)
+    pub peers: PersistedPeers,
+    /// Holds names of partial segments uploaded to remote storage. Used to
+    /// clean up old objects without leaving garbage in remote storage.
+    pub partial_backup: wal_backup_partial::State,
+}
+
 pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
     // migrate to storing full term history
     if version == 1 {
@@ -213,6 +260,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
             partial_backup: wal_backup_partial::State::default(),
+            paused: false,
         });
     // migrate to hexing some ids
     } else if version == 2 {
@@ -237,6 +285,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
             partial_backup: wal_backup_partial::State::default(),
+            paused: false,
         });
     // migrate to moving tenant_id/timeline_id to the top and adding some lsns
     } else if version == 3 {
@@ -261,6 +310,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
             partial_backup: wal_backup_partial::State::default(),
+            paused: false,
         });
     // migrate to having timeline_start_lsn
     } else if version == 4 {
@@ -285,6 +335,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             remote_consistent_lsn: Lsn(0),
             peers: PersistedPeers(vec![]),
             partial_backup: wal_backup_partial::State::default(),
+            paused: false,
         });
     } else if version == 5 {
         info!("reading safekeeper control file version {}", version);
@@ -329,6 +380,27 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
             remote_consistent_lsn: oldstate.remote_consistent_lsn,
             peers: oldstate.peers,
             partial_backup: wal_backup_partial::State::default(),
+            paused: false,
         });
+    } else if version == 8 {
+        info!("reading safekeeper control file version {}", version);
+        let oldstate = SafeKeeperStateV8::des(&buf[..buf.len()])?;
+
+        return Ok(TimelinePersistentState {
+            tenant_id: oldstate.tenant_id,
+            timeline_id: oldstate.timeline_id,
+            acceptor_state: oldstate.acceptor_state,
+            server: oldstate.server,
+            proposer_uuid: oldstate.proposer_uuid,
+            timeline_start_lsn: oldstate.timeline_start_lsn,
+            local_start_lsn: oldstate.local_start_lsn,
+            commit_lsn: oldstate.commit_lsn,
+            backup_lsn: oldstate.backup_lsn,
+            peer_horizon_lsn: oldstate.peer_horizon_lsn,
+            remote_consistent_lsn: oldstate.remote_consistent_lsn,
+            peers: oldstate.peers,
+            partial_backup: oldstate.partial_backup,
+            paused: false,
+        });
     }
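Every branch above follows the same recipe: deserialize the struct frozen for that on-disk version, copy the surviving fields, and default-initialize anything newer, so the new paused flag comes out false for any pre-9 file. A hedged sketch of a round-trip check (make_v8_state is a hypothetical fixture, not part of this diff):

    // Sketch of an upgrade round-trip; make_v8_state() is hypothetical.
    let old_state: SafeKeeperStateV8 = make_v8_state();
    let buf = old_state.ser()?;                  // bin_ser::LeSer encoding
    let upgraded = upgrade_control_file(&buf, 8)?;
    assert!(!upgraded.paused);                   // the new field gets its default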
@@ -194,6 +194,28 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
     json_response(StatusCode::OK, ())
 }
 
+/// Body of a timeline pause/resume request.
+#[derive(Debug, Serialize, Deserialize)]
+struct PauseRequest {
+    pause: bool,
+}
+
+async fn timeline_pause_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
+    check_permission(&request, None)?;
+
+    let data: PauseRequest = json_request(&mut request).await?;
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+
+    let tli = GlobalTimelines::get(ttid)?;
+    tli.set_pause(data.pause)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+    json_response(StatusCode::OK, ())
+}
+
 /// Pull timeline from peer safekeeper instances.
 async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     check_permission(&request, None)?;
@@ -536,6 +558,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
         .delete("/v1/tenant/:tenant_id", |r| {
             request_span(r, tenant_delete_handler)
         })
+        .post("/v1/tenant/:tenant_id/timeline/:timeline_id/pause", |r| {
+            request_span(r, timeline_pause_handler)
+        })
         .post("/v1/pull_timeline", |r| {
             request_span(r, timeline_pull_handler)
         })
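The new route takes a JSON body matching PauseRequest and returns 200 with an empty body on success. A sketch of a client call, assuming a reqwest-based client outside this diff (any auth headers expected by check_permission are omitted):

    // Hypothetical client; substitute real host, port, and ids from scope.
    let url = format!(
        "http://{host}:{port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/pause"
    );
    let resp = reqwest::Client::new()
        .post(url)
        .json(&serde_json::json!({ "pause": true }))
        .send()
        .await?;
    resp.error_for_status()?;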
@@ -1238,6 +1238,7 @@ mod tests {
                 },
             )]),
             partial_backup: crate::wal_backup_partial::State::default(),
+            paused: false,
         };
 
         let ser = state.ser().unwrap();
@@ -1285,6 +1286,8 @@ mod tests {
             0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
             // partial_backup
             0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+            // paused
+            0x00
         ];
 
         assert_eq!(Hex(&ser), Hex(&expected));
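Because bin_ser::LeSer writes fields in declaration order, the new bool appends a single byte to the serialized state; the fixture therefore grows by exactly one 0x00 (false) after the partial_backup bytes.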
@@ -63,6 +63,9 @@ pub struct TimelinePersistentState {
     /// Holds names of partial segments uploaded to remote storage. Used to
     /// clean up old objects without leaving garbage in remote storage.
     pub partial_backup: wal_backup_partial::State,
+    /// A paused timeline forbids writes to it (it might be useful to pause
+    /// other activity as well, but that's a todo).
+    pub paused: bool,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -98,6 +101,7 @@ impl TimelinePersistentState {
                     .collect(),
             ),
             partial_backup: wal_backup_partial::State::default(),
+            paused: false,
         }
     }
 
@@ -313,6 +313,8 @@ impl SharedState {
 
 #[derive(Debug, thiserror::Error)]
 pub enum TimelineError {
+    #[error("Timeline {0} is paused")]
+    Paused(TenantTimelineId),
     #[error("Timeline {0} was cancelled and cannot be used anymore")]
     Cancelled(TenantTimelineId),
     #[error("Timeline {0} was not found in global map")]
@@ -508,6 +510,27 @@ impl Timeline {
         }
     }
 
+    /// Pause the timeline, forbidding writes to it, or resume it after a pause.
+    pub async fn set_pause(&self, paused: bool) -> Result<()> {
+        {
+            let mut ss = self.write_shared_state(true).await?;
+            if paused && !ss.sk.state.paused {
+                let mut persistent_state = ss.sk.state.start_change();
+                persistent_state.paused = true;
+                ss.sk.state.finish_change(&persistent_state).await?;
+                info!("paused timeline {}", self.ttid);
+            }
+            if !paused && ss.sk.state.paused {
+                let mut persistent_state = ss.sk.state.start_change();
+                persistent_state.paused = false;
+                ss.sk.state.finish_change(&persistent_state).await?;
+                info!("resumed timeline {}", self.ttid);
+            }
+        }
+        self.update_status_notify().await?;
+        Ok(())
+    }
+
     /// Delete timeline from disk completely, by removing timeline directory.
     /// Background timeline activities will stop eventually.
     ///
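Note that set_pause takes the lock with allow_paused = true so an already-paused timeline can still be resumed, and it records the flag through start_change/finish_change, so a pause survives a safekeeper restart. A usage sketch under those assumptions:

    // Sketch: both calls are no-ops when the state already matches.
    let tli = GlobalTimelines::get(ttid)?;
    tli.set_pause(true).await?;  // compute writes now fail with TimelineError::Paused
    tli.set_pause(false).await?; // resume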
@@ -567,8 +590,34 @@ impl Timeline {
     }
 
     /// Take a writing mutual exclusive lock on timeline shared_state.
-    pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
-        self.mutex.lock().await
+    ///
+    /// Unless allow_cancelled is true, errors out if the timeline is cancelled (deleted).
+    /// Unless allow_paused is true, errors out if the timeline is paused: so far pause
+    /// immediately forbids only compute writes, but we might want to pause other
+    /// activities in the future.
+    async fn write_shared_state_internal(
+        &self,
+        allow_cancelled: bool,
+        allow_paused: bool,
+    ) -> Result<MutexGuard<SharedState>> {
+        let state = self.mutex.lock().await;
+        if !allow_cancelled && self.is_cancelled() {
+            bail!(TimelineError::Cancelled(self.ttid));
+        }
+        if !allow_paused && state.sk.state.paused {
+            bail!(TimelineError::Paused(self.ttid));
+        }
+        Ok(state)
+    }
+
+    pub async fn write_shared_state(&self, allow_paused: bool) -> Result<MutexGuard<SharedState>> {
+        self.write_shared_state_internal(false, allow_paused).await
+    }
+
+    pub async fn write_shared_state_allow_cancelled(&self) -> MutexGuard<SharedState> {
+        self.write_shared_state_internal(true, true)
+            .await
+            .expect("can't fail: both allow_cancelled and allow_paused are true")
     }
 
     async fn update_status(&self, shared_state: &mut SharedState) -> bool {
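After this rework there are three acquisition modes: write_shared_state(false) rejects both cancelled and paused timelines (write paths such as process_msg), write_shared_state(true) tolerates pause but still rejects cancellation (maintenance such as set_wal_backup_lsn and remove_old_wal), and write_shared_state_allow_cancelled() never fails (read-mostly paths such as get_state and memory_dump).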
@@ -579,11 +628,8 @@ impl Timeline {
 
     /// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
     pub async fn update_status_notify(&self) -> Result<()> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
         let is_wal_backup_action_pending: bool = {
-            let mut shared_state = self.write_shared_state().await;
+            let mut shared_state = self.write_shared_state_allow_cancelled().await;
             self.update_status(&mut shared_state).await
         };
         if is_wal_backup_action_pending {
@@ -599,10 +645,10 @@ impl Timeline {
     /// remote_consistent_lsn update through replication feedback, and we want
     /// to stop pushing to the broker if the pageserver is fully caught up.
     pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
-        if self.is_cancelled() {
-            return true;
-        }
-        let shared_state = self.write_shared_state().await;
+        let shared_state = match self.write_shared_state(false).await {
+            Ok(state) => state,
+            Err(_) => return true,
+        };
         if self.walreceivers.get_num() == 0 {
             return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
                 reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
@@ -610,9 +656,9 @@ impl Timeline {
         false
     }
 
-    /// Ensure taht current term is t, erroring otherwise, and lock the state.
+    /// Ensure that current term is t, erroring otherwise, and lock the state.
     pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
-        let ss = self.write_shared_state().await;
+        let ss = self.write_shared_state(false).await?;
         if ss.sk.state.acceptor_state.term != t {
             bail!(
                 "failed to acquire term {}, current term {}",
@@ -626,13 +672,12 @@ impl Timeline {
     /// Returns whether s3 offloading is required and sets current status as
     /// matching it.
    pub async fn wal_backup_attend(&self) -> bool {
-        if self.is_cancelled() {
+        let mut ss = self.write_shared_state_allow_cancelled().await;
+        if self.is_cancelled() || ss.sk.state.paused {
+            ss.wal_backup_active = false;
             return false;
         }
-
-        self.write_shared_state()
-            .await
-            .wal_backup_attend(self.walreceivers.get_num())
+        ss.wal_backup_attend(self.walreceivers.get_num())
     }
 
     /// Returns commit_lsn watch channel.
@@ -650,15 +695,11 @@ impl Timeline {
         &self,
         msg: &ProposerAcceptorMessage,
     ) -> Result<Option<AcceptorProposerMessage>> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
-
         let mut rmsg: Option<AcceptorProposerMessage>;
         let commit_lsn: Lsn;
         let term_flush_lsn: TermLsn;
         {
-            let mut shared_state = self.write_shared_state().await;
+            let mut shared_state = self.write_shared_state(false).await?;
             rmsg = shared_state.sk.process_msg(msg).await?;
 
             // if this is AppendResponse, fill in proper hot standby feedback.
@@ -677,36 +718,38 @@ impl Timeline {
 
     /// Returns wal_seg_size.
     pub async fn get_wal_seg_size(&self) -> usize {
-        self.write_shared_state().await.get_wal_seg_size()
+        self.write_shared_state_allow_cancelled()
+            .await
+            .get_wal_seg_size()
     }
 
     /// Returns true only if the timeline is loaded and active.
     pub async fn is_active(&self) -> bool {
-        if self.is_cancelled() {
-            return false;
+        match self.write_shared_state(false).await {
+            Ok(state) => state.active,
+            Err(_) => false,
         }
-
-        self.write_shared_state().await.active
     }
 
     /// Returns state of the timeline.
     pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
-        let state = self.write_shared_state().await;
+        let state = self.write_shared_state_allow_cancelled().await;
         (state.sk.state.inmem.clone(), state.sk.state.clone())
     }
 
     /// Returns latest backup_lsn.
     pub async fn get_wal_backup_lsn(&self) -> Lsn {
-        self.write_shared_state().await.sk.state.inmem.backup_lsn
+        self.write_shared_state_allow_cancelled()
+            .await
+            .sk
+            .state
+            .inmem
+            .backup_lsn
     }
 
     /// Sets backup_lsn to the given value.
     pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
-
-        let mut state = self.write_shared_state().await;
+        let mut state = self.write_shared_state(true).await?;
         state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
         // we should check whether to shut down offloader, but this will be done
         // soon by peer communication anyway.
@@ -715,7 +758,7 @@ impl Timeline {
 
     /// Get safekeeper info for broadcasting to broker and other peers.
     pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
-        let shared_state = self.write_shared_state().await;
+        let shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.get_safekeeper_info(&self.ttid, conf)
     }
@@ -724,7 +767,7 @@ impl Timeline {
         let is_wal_backup_action_pending: bool;
         let commit_lsn: Lsn;
         {
-            let mut shared_state = self.write_shared_state().await;
+            let mut shared_state = self.write_shared_state_allow_cancelled().await;
             shared_state.sk.record_safekeeper_info(&sk_info).await?;
             let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
             shared_state.peers_info.upsert(&peer_info);
@@ -741,13 +784,13 @@ impl Timeline {
 
     /// Update in memory remote consistent lsn.
     pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
-        let mut shared_state = self.write_shared_state().await;
+        let mut shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.sk.state.inmem.remote_consistent_lsn =
             max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
     }
 
     pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
-        let shared_state = self.write_shared_state().await;
+        let shared_state = self.write_shared_state_allow_cancelled().await;
         shared_state.get_peers(conf.heartbeat_timeout)
     }
 
@@ -769,7 +812,7 @@ impl Timeline {
     /// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
     /// Thus we don't try to predict it here.
     pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
-        let ss = self.write_shared_state().await;
+        let ss = self.write_shared_state_allow_cancelled().await;
         let term = ss.sk.state.acceptor_state.term;
         let last_log_term = ss.sk.get_epoch();
         let flush_lsn = ss.sk.flush_lsn();
@@ -840,16 +883,16 @@ impl Timeline {
 
     /// Returns flush_lsn.
     pub async fn get_flush_lsn(&self) -> Lsn {
-        self.write_shared_state().await.sk.wal_store.flush_lsn()
+        self.write_shared_state_allow_cancelled()
+            .await
+            .sk
+            .wal_store
+            .flush_lsn()
     }
 
     /// Delete WAL segments from disk that are no longer needed. This is determined
     /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
     pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
-        if self.is_cancelled() {
-            bail!(TimelineError::Cancelled(self.ttid));
-        }
-
         // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
         // This gives better read speed for pageservers that are lagging behind,
         // at the cost of keeping more WAL on disk.
@@ -861,7 +904,7 @@ impl Timeline {
 
         let horizon_segno: XLogSegNo;
         let remover = {
-            let shared_state = self.write_shared_state().await;
+            let shared_state = self.write_shared_state(true).await?;
             horizon_segno =
                 shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
             if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
@@ -876,7 +919,7 @@ impl Timeline {
         remover.await?;
 
         // update last_removed_segno
-        let mut shared_state = self.write_shared_state().await;
+        let mut shared_state = self.write_shared_state_allow_cancelled().await;
        shared_state.last_removed_segno = horizon_segno;
        Ok(())
     }
@@ -886,8 +929,8 @@ impl Timeline {
     /// to date so that storage nodes restart doesn't cause many pageserver ->
     /// safekeeper reconnections.
     pub async fn maybe_persist_control_file(&self) -> Result<()> {
-        self.write_shared_state()
-            .await
+        self.write_shared_state(true)
+            .await?
             .sk
             .maybe_persist_inmem_control_file()
             .await
@@ -901,7 +944,7 @@ impl Timeline {
         }
 
         let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
-        let state = self.write_shared_state().await;
+        let state = self.write_shared_state_allow_cancelled().await;
         if state.active {
             Some(FullTimelineInfo {
                 ttid: self.ttid,
@@ -924,7 +967,7 @@ impl Timeline {
 
     /// Returns in-memory timeline state to build a full debug dump.
     pub async fn memory_dump(&self) -> debug_dump::Memory {
-        let state = self.write_shared_state().await;
+        let state = self.write_shared_state_allow_cancelled().await;
 
         let (write_lsn, write_record_lsn, flush_lsn, file_open) =
             state.sk.wal_store.internal_state();
@@ -951,7 +994,7 @@ impl Timeline {
         &self,
         f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
     ) -> Result<T> {
-        let mut state = self.write_shared_state().await;
+        let mut state = self.write_shared_state_allow_cancelled().await;
         let mut persistent_state = state.sk.state.start_change();
         // If f returns an error, we abort the change and don't persist anything.
         let res = f(&mut persistent_state)?;
 
@@ -252,7 +252,7 @@ impl GlobalTimelines {
         // Take a lock and finish the initialization holding this mutex. No other threads
         // can interfere with creation after we insert the timeline into the map.
         {
-            let mut shared_state = timeline.write_shared_state().await;
+            let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
 
             // We can get a race condition here in case of concurrent create calls, but only
             // in theory. create() will return a valid timeline on the next try.
@@ -336,7 +336,7 @@ impl GlobalTimelines {
         match tli_res {
             Ok(timeline) => {
                 // Take a lock and finish the deletion holding this mutex.
-                let mut shared_state = timeline.write_shared_state().await;
+                let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
 
                 info!("deleting timeline {}, only_local={}", ttid, only_local);
                 let (dir_existed, was_active) =
 
@@ -77,6 +77,21 @@ class SafekeeperHttpClient(requests.Session):
         assert res_json is None
         return res_json
 
+    def set_pause(
+        self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        pause: bool,
+    ):
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/pause",
+            json={
+                "pause": pause,
+            },
+        )
+        res.raise_for_status()
+        return res.json()
+
     def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
         params = params or {}
         res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
 
@@ -531,6 +531,43 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
     asyncio.run(run_recovery_uncommitted(env))
 
 
+async def run_pause(env: NeonEnv):
+    (sk1, sk2, _) = env.safekeepers
+    (sk1_http, sk2_http, _) = [sk.http_client() for sk in env.safekeepers]
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_pause")
+
+    ep = env.endpoints.create_start("test_pause")
+    ep.safe_psql("create table t(key int, value text)")
+    ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
+
+    # Pause two safekeepers; the next write should hang.
+    sk1_http.set_pause(tenant_id, timeline_id, True)
+    sk2_http.set_pause(tenant_id, timeline_id, True)
+    conn = await ep.connect_async()
+    # The query should hang, so execute it in a separate task.
+    bg_query = asyncio.create_task(
+        conn.execute("insert into t select generate_series(1, 200), 'payload'")
+    )
+    sleep_sec = 2
+    await asyncio.sleep(sleep_sec)
+    # It must not have finished yet.
+    assert not bg_query.done()
+
+    # Resume the timeline on one of the paused safekeepers; that is enough to commit.
+    sk2_http.set_pause(tenant_id, timeline_id, False)
+    ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")
+
+
+# Test timeline pause
+def test_pause(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    env = neon_env_builder.init_start()
+
+    asyncio.run(run_pause(env))
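With num_safekeepers = 3 the commit quorum is two, so pausing two safekeepers stalls the insert; resuming just one of them restores a quorum (together with the never-paused third safekeeper), which is why the final safe_psql succeeds without touching sk1.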
+
+
 async def run_segment_init_failure(env: NeonEnv):
     env.neon_cli.create_branch("test_segment_init_failure")
     ep = env.endpoints.create_start("test_segment_init_failure")