Compare commits

...

2 Commits

Author SHA1 Message Date
Arseny Sher
c5412b37a0 safekeeper: implement timeline pause.
So far it immediately stops only writes to the timeline, which might already be useful.
2024-05-13 15:10:28 +03:00
Arseny Sher
5e650c8c84 safekeeper: move is_cancelled to write_shared_state.
Each time we access shared state we either want to error out if it is cancelled
or ignore that; make that explicit.
2024-05-08 17:31:56 +03:00
9 changed files with 253 additions and 54 deletions

View File

@@ -20,7 +20,7 @@ use utils::{bin_ser::LeSer, id::TenantTimelineId};
use crate::SafeKeeperConf;
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 8;
pub const SK_FORMAT_VERSION: u32 = 9;
// contains persistent metadata for safekeeper
const CONTROL_FILE_NAME: &str = "safekeeper.control";

View File

@@ -183,6 +183,53 @@ pub struct SafeKeeperStateV7 {
pub peers: PersistedPeers,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SafeKeeperStateV8 {
#[serde(with = "hex")]
pub tenant_id: TenantId,
#[serde(with = "hex")]
pub timeline_id: TimelineId,
/// persistent acceptor state
pub acceptor_state: AcceptorState,
/// information about server
pub server: ServerInfo,
/// Unique id of the last *elected* proposer we dealt with. Not needed
/// for correctness, exists for monitoring purposes.
#[serde(with = "hex")]
pub proposer_uuid: PgUuid,
/// Since which LSN this timeline generally starts. Safekeeper might have
/// joined later.
pub timeline_start_lsn: Lsn,
/// Since which LSN safekeeper has (had) WAL for this timeline.
/// All WAL segments next to one containing local_start_lsn are
/// filled with data from the beginning.
pub local_start_lsn: Lsn,
/// Part of WAL acknowledged by quorum *and available locally*. Always points
/// to record boundary.
pub commit_lsn: Lsn,
/// LSN that points to the end of the last backed up segment. Useful to
/// persist to avoid finding out offloading progress on boot.
pub backup_lsn: Lsn,
/// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn
/// of last record streamed to everyone). Persisting it helps skipping
/// recovery in walproposer, generally we compute it from peers. In
walproposer proto called 'truncate_lsn'. Updates are currently driven
/// only by walproposer.
pub peer_horizon_lsn: Lsn,
/// LSN of the oldest known checkpoint made by pageserver and successfully
/// pushed to s3. We don't remove WAL beyond it. Persisted only for
/// informational purposes, we receive it from pageserver (or broker).
pub remote_consistent_lsn: Lsn,
/// Peers and their state as we remember it. Knowing peers themselves is
/// fundamental; but state is saved here only for informational purposes and
/// obviously can be stale. (Currently not saved at all, but let's provision
/// place to have less file version upgrades).
pub peers: PersistedPeers,
/// Holds names of partial segments uploaded to remote storage. Used to
/// clean up old objects without leaving garbage in remote storage.
pub partial_backup: wal_backup_partial::State,
}
pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersistentState> {
// migrate to storing full term history
if version == 1 {
@@ -213,6 +260,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
paused: false,
});
// migrate to hexing some ids
} else if version == 2 {
@@ -237,6 +285,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
paused: false,
});
// migrate to moving tenant_id/timeline_id to the top and adding some lsns
} else if version == 3 {
@@ -261,6 +310,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
paused: false,
});
// migrate to having timeline_start_lsn
} else if version == 4 {
@@ -285,6 +335,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
remote_consistent_lsn: Lsn(0),
peers: PersistedPeers(vec![]),
partial_backup: wal_backup_partial::State::default(),
paused: false,
});
} else if version == 5 {
info!("reading safekeeper control file version {}", version);
@@ -329,6 +380,27 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
remote_consistent_lsn: oldstate.remote_consistent_lsn,
peers: oldstate.peers,
partial_backup: wal_backup_partial::State::default(),
paused: false,
});
} else if version == 8 {
info!("reading safekeeper control file version {}", version);
let oldstate = SafeKeeperStateV8::des(&buf[..buf.len()])?;
return Ok(TimelinePersistentState {
tenant_id: oldstate.tenant_id,
timeline_id: oldstate.timeline_id,
acceptor_state: oldstate.acceptor_state,
server: oldstate.server,
proposer_uuid: oldstate.proposer_uuid,
timeline_start_lsn: oldstate.timeline_start_lsn,
local_start_lsn: oldstate.local_start_lsn,
commit_lsn: oldstate.commit_lsn,
backup_lsn: oldstate.backup_lsn,
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: oldstate.remote_consistent_lsn,
peers: oldstate.peers,
partial_backup: oldstate.partial_backup,
paused: false,
});
}
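The migration above follows the file's existing convention: bump SK_FORMAT_VERSION, keep the previous layout around as SafeKeeperStateV8, and default-fill the new `paused` field when reading an older file. A minimal sketch of the same pattern, using JSON instead of the crate's actual binary (LeSer) encoding, purely to illustrate the idea:

# Illustrative only: "deserialize old version, default-fill new fields".
# The real control file is binary; names here are hypothetical.
import json

CURRENT_VERSION = 9

def upgrade_control_file(raw: bytes, version: int) -> dict:
    state = json.loads(raw)
    if version == 8:
        # Version 9 only adds `paused`, so older files get the default.
        state["paused"] = False
        version = CURRENT_VERSION
    if version != CURRENT_VERSION:
        raise ValueError(f"unsupported control file version {version}")
    return state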

View File

@@ -194,6 +194,28 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
json_response(StatusCode::OK, ())
}
/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
struct PauseRequest {
pause: bool,
}
async fn timeline_pause_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let data: PauseRequest = json_request(&mut request).await?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid)?;
tli.set_pause(data.pause)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Pull timeline from peer safekeeper instances.
async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
@@ -536,6 +558,9 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
.delete("/v1/tenant/:tenant_id", |r| {
request_span(r, tenant_delete_handler)
})
.post("/v1/tenant/:tenant_id/timeline/:timeline_id/pause", |r| {
request_span(r, timeline_pause_handler)
})
.post("/v1/pull_timeline", |r| {
request_span(r, timeline_pull_handler)
})
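With the route registered, the endpoint can be exercised directly over HTTP. A hedged sketch using Python's requests library; the port and the assumption that auth is disabled are mine, while the path and JSON body come from the handler above:

# Sketch: toggle pause on a timeline via the new endpoint.
import requests

def set_pause(tenant_id: str, timeline_id: str, pause: bool,
              base_url: str = "http://localhost:7676") -> None:
    url = f"{base_url}/v1/tenant/{tenant_id}/timeline/{timeline_id}/pause"
    resp = requests.post(url, json={"pause": pause})
    resp.raise_for_status()

# set_pause(tenant_hex, timeline_hex, True)   # forbid compute writes
# set_pause(tenant_hex, timeline_hex, False)  # resume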

View File

@@ -1238,6 +1238,7 @@ mod tests {
},
)]),
partial_backup: crate::wal_backup_partial::State::default(),
paused: false,
};
let ser = state.ser().unwrap();
@@ -1285,6 +1286,8 @@ mod tests {
0xb0, 0x01, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00,
// partial_backup
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// paused
0x00
];
assert_eq!(Hex(&ser), Hex(&expected));

View File

@@ -63,6 +63,9 @@ pub struct TimelinePersistentState {
/// Holds names of partial segments uploaded to remote storage. Used to
/// clean up old objects without leaving garbage in remote storage.
pub partial_backup: wal_backup_partial::State,
/// Paused timeline forbids writes to it (it might be useful to pause any
/// other activity as well, but that's a todo).
pub paused: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@@ -98,6 +101,7 @@ impl TimelinePersistentState {
.collect(),
),
partial_backup: wal_backup_partial::State::default(),
paused: false,
}
}

View File

@@ -313,6 +313,8 @@ impl SharedState {
#[derive(Debug, thiserror::Error)]
pub enum TimelineError {
#[error("Timeline {0} is paused")]
Paused(TenantTimelineId),
#[error("Timeline {0} was cancelled and cannot be used anymore")]
Cancelled(TenantTimelineId),
#[error("Timeline {0} was not found in global map")]
@@ -508,6 +510,27 @@ impl Timeline {
}
}
/// Pause timeline, forbidding writes to it, or resume after it has been paused.
pub async fn set_pause(&self, paused: bool) -> Result<()> {
{
let mut ss = self.write_shared_state(true).await?;
if paused && !ss.sk.state.paused {
let mut persistent_state = ss.sk.state.start_change();
persistent_state.paused = true;
ss.sk.state.finish_change(&persistent_state).await?;
info!("paused timeline {}", self.ttid);
}
if !paused && ss.sk.state.paused {
let mut persistent_state = ss.sk.state.start_change();
persistent_state.paused = false;
ss.sk.state.finish_change(&persistent_state).await?;
info!("resumed timeline {}", self.ttid);
}
}
self.update_status_notify().await?;
Ok(())
}
/// Delete timeline from disk completely, by removing timeline directory.
/// Background timeline activities will stop eventually.
///
@@ -567,8 +590,34 @@ impl Timeline {
}
/// Take a writing mutual exclusive lock on timeline shared_state.
pub async fn write_shared_state(&self) -> MutexGuard<SharedState> {
self.mutex.lock().await
///
/// Unless allow_cancelled is true, errors out if the timeline is cancelled (deleted).
/// Unless allow_paused is true, errors out if the timeline is paused: so far pause
/// immediately forbids only compute writes, but we might want to pause other
/// activities in the future.
async fn write_shared_state_internal(
&self,
allow_cancelled: bool,
allow_paused: bool,
) -> Result<MutexGuard<SharedState>> {
let state = self.mutex.lock().await;
if !allow_cancelled && self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
if !allow_paused && state.sk.state.paused {
bail!(TimelineError::Paused(self.ttid));
}
Ok(state)
}
pub async fn write_shared_state(&self, allow_paused: bool) -> Result<MutexGuard<SharedState>> {
self.write_shared_state_internal(false, allow_paused).await
}
pub async fn write_shared_state_allow_cancelled(&self) -> MutexGuard<SharedState> {
self.write_shared_state_internal(true, true)
.await
.expect("timeline is cancelled and allow_cancelled is false")
}
async fn update_status(&self, shared_state: &mut SharedState) -> bool {
@@ -579,11 +628,8 @@ impl Timeline {
/// Update timeline status and kick wal backup launcher to stop/start offloading if needed.
pub async fn update_status_notify(&self) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let is_wal_backup_action_pending: bool = {
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state_allow_cancelled().await;
self.update_status(&mut shared_state).await
};
if is_wal_backup_action_pending {
@@ -599,10 +645,10 @@ impl Timeline {
/// remote_consistent_lsn update through replication feedback, and we want
/// to stop pushing to the broker if pageserver is fully caughtup.
pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool {
if self.is_cancelled() {
return true;
}
let shared_state = self.write_shared_state().await;
let shared_state = match self.write_shared_state(false).await {
Ok(state) => state,
Err(_) => return true,
};
if self.walreceivers.get_num() == 0 {
return shared_state.sk.state.inmem.commit_lsn == Lsn(0) || // no data at all yet
reported_remote_consistent_lsn >= shared_state.sk.state.inmem.commit_lsn;
@@ -610,9 +656,9 @@ impl Timeline {
false
}
/// Ensure taht current term is t, erroring otherwise, and lock the state.
/// Ensure that current term is t, erroring otherwise, and lock the state.
pub async fn acquire_term(&self, t: Term) -> Result<MutexGuard<SharedState>> {
let ss = self.write_shared_state().await;
let ss = self.write_shared_state(false).await?;
if ss.sk.state.acceptor_state.term != t {
bail!(
"failed to acquire term {}, current term {}",
@@ -626,13 +672,12 @@ impl Timeline {
/// Returns whether s3 offloading is required and sets current status as
/// matching it.
pub async fn wal_backup_attend(&self) -> bool {
if self.is_cancelled() {
let mut ss = self.write_shared_state_allow_cancelled().await;
if self.is_cancelled() || ss.sk.state.paused {
ss.wal_backup_active = false;
return false;
}
self.write_shared_state()
.await
.wal_backup_attend(self.walreceivers.get_num())
ss.wal_backup_attend(self.walreceivers.get_num())
}
/// Returns commit_lsn watch channel.
@@ -650,15 +695,11 @@ impl Timeline {
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let mut rmsg: Option<AcceptorProposerMessage>;
let commit_lsn: Lsn;
let term_flush_lsn: TermLsn;
{
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state(false).await?;
rmsg = shared_state.sk.process_msg(msg).await?;
// if this is AppendResponse, fill in proper hot standby feedback.
@@ -677,36 +718,38 @@ impl Timeline {
/// Returns wal_seg_size.
pub async fn get_wal_seg_size(&self) -> usize {
self.write_shared_state().await.get_wal_seg_size()
self.write_shared_state_allow_cancelled()
.await
.get_wal_seg_size()
}
/// Returns true only if the timeline is loaded and active.
pub async fn is_active(&self) -> bool {
if self.is_cancelled() {
return false;
match self.write_shared_state(false).await {
Ok(state) => state.active,
Err(_) => false,
}
self.write_shared_state().await.active
}
/// Returns state of the timeline.
pub async fn get_state(&self) -> (TimelineMemState, TimelinePersistentState) {
let state = self.write_shared_state().await;
let state = self.write_shared_state_allow_cancelled().await;
(state.sk.state.inmem.clone(), state.sk.state.clone())
}
/// Returns latest backup_lsn.
pub async fn get_wal_backup_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.state.inmem.backup_lsn
self.write_shared_state_allow_cancelled()
.await
.sk
.state
.inmem
.backup_lsn
}
/// Sets backup_lsn to the given value.
pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
let mut state = self.write_shared_state().await;
let mut state = self.write_shared_state(true).await?;
state.sk.state.inmem.backup_lsn = max(state.sk.state.inmem.backup_lsn, backup_lsn);
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
@@ -715,7 +758,7 @@ impl Timeline {
/// Get safekeeper info for broadcasting to broker and other peers.
pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state().await;
let shared_state = self.write_shared_state_allow_cancelled().await;
shared_state.get_safekeeper_info(&self.ttid, conf)
}
@@ -724,7 +767,7 @@ impl Timeline {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state_allow_cancelled().await;
shared_state.sk.record_safekeeper_info(&sk_info).await?;
let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
@@ -741,13 +784,13 @@ impl Timeline {
/// Update in memory remote consistent lsn.
pub async fn update_remote_consistent_lsn(&self, candidate: Lsn) {
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state_allow_cancelled().await;
shared_state.sk.state.inmem.remote_consistent_lsn =
max(shared_state.sk.state.inmem.remote_consistent_lsn, candidate);
}
pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec<PeerInfo> {
let shared_state = self.write_shared_state().await;
let shared_state = self.write_shared_state_allow_cancelled().await;
shared_state.get_peers(conf.heartbeat_timeout)
}
@@ -769,7 +812,7 @@ impl Timeline {
/// depending on assembled quorum (e.g. classic picture 8 from Raft paper).
/// Thus we don't try to predict it here.
pub async fn recovery_needed(&self, heartbeat_timeout: Duration) -> RecoveryNeededInfo {
let ss = self.write_shared_state().await;
let ss = self.write_shared_state_allow_cancelled().await;
let term = ss.sk.state.acceptor_state.term;
let last_log_term = ss.sk.get_epoch();
let flush_lsn = ss.sk.flush_lsn();
@@ -840,16 +883,16 @@ impl Timeline {
/// Returns flush_lsn.
pub async fn get_flush_lsn(&self) -> Lsn {
self.write_shared_state().await.sk.wal_store.flush_lsn()
self.write_shared_state_allow_cancelled()
.await
.sk
.wal_store
.flush_lsn()
}
/// Delete WAL segments from disk that are no longer needed. This is determined
/// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn.
pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> {
if self.is_cancelled() {
bail!(TimelineError::Cancelled(self.ttid));
}
// If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
// This allows to get better read speed for pageservers that are lagging behind,
// at the cost of keeping more WAL on disk.
@@ -861,7 +904,7 @@ impl Timeline {
let horizon_segno: XLogSegNo;
let remover = {
let shared_state = self.write_shared_state().await;
let shared_state = self.write_shared_state(true).await?;
horizon_segno =
shared_state.get_horizon_segno(wal_backup_enabled, replication_horizon_lsn);
if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno {
@@ -876,7 +919,7 @@ impl Timeline {
remover.await?;
// update last_removed_segno
let mut shared_state = self.write_shared_state().await;
let mut shared_state = self.write_shared_state_allow_cancelled().await;
shared_state.last_removed_segno = horizon_segno;
Ok(())
}
@@ -886,8 +929,8 @@ impl Timeline {
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(&self) -> Result<()> {
self.write_shared_state()
.await
self.write_shared_state(true)
.await?
.sk
.maybe_persist_inmem_control_file()
.await
@@ -901,7 +944,7 @@ impl Timeline {
}
let (ps_feedback_count, last_ps_feedback) = self.walsenders.get_ps_feedback_stats();
let state = self.write_shared_state().await;
let state = self.write_shared_state_allow_cancelled().await;
if state.active {
Some(FullTimelineInfo {
ttid: self.ttid,
@@ -924,7 +967,7 @@ impl Timeline {
/// Returns in-memory timeline state to build a full debug dump.
pub async fn memory_dump(&self) -> debug_dump::Memory {
let state = self.write_shared_state().await;
let state = self.write_shared_state_allow_cancelled().await;
let (write_lsn, write_record_lsn, flush_lsn, file_open) =
state.sk.wal_store.internal_state();
@@ -951,7 +994,7 @@ impl Timeline {
&self,
f: impl FnOnce(&mut TimelinePersistentState) -> Result<T>,
) -> Result<T> {
let mut state = self.write_shared_state().await;
let mut state = self.write_shared_state_allow_cancelled().await;
let mut persistent_state = state.sk.state.start_change();
// If f returns error, we abort the change and don't persist anything.
let res = f(&mut persistent_state)?;

View File

@@ -252,7 +252,7 @@ impl GlobalTimelines {
// Take a lock and finish the initialization holding this mutex. No other threads
// can interfere with creation after we will insert timeline into the map.
{
let mut shared_state = timeline.write_shared_state().await;
let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
// We can get a race condition here in case of concurrent create calls, but only
// in theory. create() will return valid timeline on the next try.
@@ -336,7 +336,7 @@ impl GlobalTimelines {
match tli_res {
Ok(timeline) => {
// Take a lock and finish the deletion holding this mutex.
let mut shared_state = timeline.write_shared_state().await;
let mut shared_state = timeline.write_shared_state_allow_cancelled().await;
info!("deleting timeline {}, only_local={}", ttid, only_local);
let (dir_existed, was_active) =

View File

@@ -77,6 +77,21 @@ class SafekeeperHttpClient(requests.Session):
assert res_json is None
return res_json
def set_pause(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
pause: bool,
):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/pause",
json={
"pause": pause,
},
)
res.raise_for_status()
return res.json()
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
params = params or {}
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)

View File

@@ -531,6 +531,43 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_recovery_uncommitted(env))
async def run_pause(env: NeonEnv):
(sk1, sk2, _) = env.safekeepers
(sk1_http, sk2_http, _) = [sk.http_client() for sk in env.safekeepers]
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_pause")
ep = env.endpoints.create_start("test_pause")
ep.safe_psql("create table t(key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
# Halt two safekeepers, query should hang
sk1_http.set_pause(tenant_id, timeline_id, True)
sk2_http.set_pause(tenant_id, timeline_id, True)
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 200), 'payload'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must not be finished yet
assert not bg_query.done()
# resume timeline on one of paused safekeepers, should be enough to commit
sk2_http.set_pause(tenant_id, timeline_id, False)
ep.safe_psql("insert into t select generate_series(1, 2000), 'payload'")
# Test timeline pause
def test_pause(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_pause(env))
async def run_segment_init_failure(env: NeonEnv):
env.neon_cli.create_branch("test_segment_init_failure")
ep = env.endpoints.create_start("test_segment_init_failure")