fix(pageserver): LSN lease edge cases around restarts/migrations (#9055)
Part of #7497, closes #8817.

## Problem

See #8817.

## Summary of changes

**compute_ctl**

- Renew the LSN lease as soon as `/configure` updates `pageserver_connstr`, using the `state_changed` Condvar for synchronization.

**pageserver**

As mentioned in https://github.com/neondatabase/neon/issues/8817#issuecomment-2315768076, we still want a permanent error reported if a lease cannot be granted. By considering the attachment mode and the added `lsn_lease_deadline` when processing lease requests, we can also bound the window for bad requests to a short period after a migration/restart.

- Refactor https://github.com/neondatabase/neon/pull/9024 and move `lsn_lease_deadline` to `AttachedTenantConf` so the timeline can easily access it.
- Add separate HTTP `init_lsn_lease` and libpq `renew_lsn_lease` APIs.
- Always do LSN verification for the initial HTTP lease request.
- LSN verification for renewals is **still done** once the tenant is in `AttachedSingle` and we have passed the `lsn_lease_deadline`, which gives the compute plenty of time to renew the lease.

**neon_local**

- Add and call the `timeline_init_lsn_lease` mgmt_api at static endpoint start. The initial LSN lease HTTP request is sent when we run `cargo neon endpoint start <static endpoint>`.

## Testing

- Extend `test_readonly_node_gc` to cover pageserver restarts and migration.

## Future Work

- The control plane should make the initial lease request through HTTP when creating a static endpoint. This is currently only done in `neon_local`.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
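To react to `/configure` promptly, the lease-renewal thread now sleeps on the `state_changed` Condvar rather than in a plain `thread::sleep`, so an updated `pageserver_connstr` wakes it immediately. Below is a minimal standalone sketch of that wake-on-change pattern, with the state simplified to a bare string (the real compute_ctl state carries a full parsed spec):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

struct Shared {
    state: Mutex<String>, // stand-in for the current pageserver connstr
    state_changed: Condvar,
}

fn main() {
    let shared = Arc::new(Shared {
        state: Mutex::new("ps-1:6400".to_string()),
        state_changed: Condvar::new(),
    });

    // Renewal thread: waits up to `period`, but returns early if the
    // connstr changes, so it can renew against the new pageserver at once.
    let worker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let period = Duration::from_secs(5);
            let old = shared.state.lock().unwrap().clone();
            let guard = shared.state.lock().unwrap();
            let (guard, _timeout) = shared
                .state_changed
                .wait_timeout_while(guard, period, |s| *s == old)
                .unwrap();
            if *guard != old {
                println!("pageserver connstr changed, renewing lease now");
            }
        })
    };

    // `/configure`-style update: change the connstr and notify waiters.
    thread::sleep(Duration::from_millis(100));
    *shared.state.lock().unwrap() = "ps-2:6400".to_string();
    shared.state_changed.notify_all();

    worker.join().unwrap();
}
```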
Cargo.lock (generated, 1 change)
@@ -1321,6 +1321,7 @@ dependencies = [
  "clap",
  "comfy-table",
  "compute_api",
+ "futures",
  "humantime",
  "humantime-serde",
  "hyper 0.14.30",
@@ -10,6 +10,7 @@ use std::sync::atomic::AtomicU32;
 use std::sync::atomic::Ordering;
 use std::sync::{Condvar, Mutex, RwLock};
 use std::thread;
+use std::time::Duration;
 use std::time::Instant;
 
 use anyhow::{Context, Result};
@@ -1398,6 +1399,36 @@ LIMIT 100",
         }
         Ok(remote_ext_metrics)
     }
+
+    /// Waits until the current thread receives a state-changed notification and
+    /// the pageserver connection string has changed.
+    ///
+    /// The operation will time out after the specified duration.
+    pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) {
+        let state = self.state.lock().unwrap();
+        let old_pageserver_connstr = state
+            .pspec
+            .as_ref()
+            .expect("spec must be set")
+            .pageserver_connstr
+            .clone();
+        let mut unchanged = true;
+        let _ = self
+            .state_changed
+            .wait_timeout_while(state, duration, |s| {
+                let pageserver_connstr = &s
+                    .pspec
+                    .as_ref()
+                    .expect("spec must be set")
+                    .pageserver_connstr;
+                unchanged = pageserver_connstr == &old_pageserver_connstr;
+                unchanged
+            })
+            .unwrap();
+        if !unchanged {
+            info!("Pageserver config changed");
+        }
+    }
 }
 
 pub fn forward_termination_signal() {
@@ -57,10 +57,10 @@ fn lsn_lease_bg_task(
             .max(valid_duration / 2);
 
         info!(
-            "Succeeded, sleeping for {} seconds",
+            "Request succeeded, sleeping for {} seconds",
             sleep_duration.as_secs()
         );
-        thread::sleep(sleep_duration);
+        compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration);
     }
 }
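The retry cadence in this task is plain multiplicative backoff with a cap; a standalone sketch of the schedule (the constants are assumed values for illustration, not the real compute_ctl ones):

```rust
fn main() {
    // Assumed values for illustration only.
    const MAX_RETRY_PERIOD_MS: f64 = 60_000.0;
    let mut retry_period_ms: f64 = 500.0;

    for attempt in 1..=6 {
        println!("attempt {attempt}: wait up to {retry_period_ms} ms before retrying");
        retry_period_ms *= 1.5;
        retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
    }
}
```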
@@ -89,10 +89,7 @@ fn acquire_lsn_lease_with_retry(
         .map(|connstr| {
             let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr");
             if let Some(storage_auth_token) = &spec.storage_auth_token {
-                info!("Got storage auth token from spec file");
                 config.password(storage_auth_token.clone());
-            } else {
-                info!("Storage auth token not set");
             }
             config
         })
@@ -108,9 +105,11 @@ fn acquire_lsn_lease_with_retry(
                 bail!("Permanent error: lease could not be obtained, LSN is behind the GC cutoff");
             }
             Err(e) => {
-                warn!("Failed to acquire lsn lease: {e} (attempt {attempts}");
+                warn!("Failed to acquire lsn lease: {e} (attempt {attempts})");
 
-                thread::sleep(Duration::from_millis(retry_period_ms as u64));
+                compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis(
+                    retry_period_ms as u64,
+                ));
                 retry_period_ms *= 1.5;
                 retry_period_ms = retry_period_ms.min(MAX_RETRY_PERIOD_MS);
             }
@@ -9,6 +9,7 @@ anyhow.workspace = true
 camino.workspace = true
 clap.workspace = true
 comfy-table.workspace = true
+futures.workspace = true
 humantime.workspace = true
 nix.workspace = true
 once_cell.workspace = true
@@ -894,17 +894,27 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
             // to pass these on to postgres.
             let storage_controller = StorageController::from_env(env);
             let locate_result = storage_controller.tenant_locate(endpoint.tenant_id).await?;
-            let pageservers = locate_result
-                .shards
-                .into_iter()
-                .map(|shard| {
-                    (
+            let pageservers = futures::future::try_join_all(
+                locate_result.shards.into_iter().map(|shard| async move {
+                    if let ComputeMode::Static(lsn) = endpoint.mode {
+                        // Initialize LSN leases for static computes.
+                        let conf = env.get_pageserver_conf(shard.node_id).unwrap();
+                        let pageserver = PageServerNode::from_env(env, conf);
+
+                        pageserver
+                            .http_client
+                            .timeline_init_lsn_lease(shard.shard_id, endpoint.timeline_id, lsn)
+                            .await?;
+                    }
+
+                    anyhow::Ok((
                         Host::parse(&shard.listen_pg_addr)
                             .expect("Storage controller reported bad hostname"),
                         shard.listen_pg_port,
-                    )
-                })
-                .collect::<Vec<_>>();
+                    ))
+                }),
+            )
+            .await?;
             let stripe_size = locate_result.shard_params.stripe_size;
 
             (pageservers, stripe_size)
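The shard loop becomes a concurrent fan-out: `futures::future::try_join_all` runs one future per shard and fails fast if any lease request errors. A minimal sketch of the same shape, with shard and address types reduced to placeholders:

```rust
use anyhow::Result;

// Stand-in for the per-shard lease request plus address lookup.
async fn init_lease_for_shard(shard: u32) -> Result<(String, u16)> {
    Ok((format!("pageserver-{shard}.local"), 6400))
}

#[tokio::main]
async fn main() -> Result<()> {
    let shards = vec![0u32, 1, 2];
    let pageservers = futures::future::try_join_all(
        shards
            .into_iter()
            .map(|shard| async move { init_lease_for_shard(shard).await }),
    )
    .await?;
    println!("{pageservers:?}");
    Ok(())
}
```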
@@ -736,4 +736,22 @@ impl Client {
             .await
             .map_err(Error::ReceiveBody)
     }
+
+    pub async fn timeline_init_lsn_lease(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+        lsn: Lsn,
+    ) -> Result<LsnLease> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease",
+            self.mgmt_api_endpoint,
+        );
+
+        self.request(Method::POST, &uri, LsnLeaseRequest { lsn })
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
 }
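For illustration, the same endpoint can be exercised without the mgmt_api client. A hypothetical `reqwest` sketch against a local pageserver; the endpoint address, both ids, and the textual LSN form are placeholders, and the JSON body shape is assumed from `LsnLeaseRequest { lsn }` above:

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Placeholders: substitute a real pageserver mgmt endpoint and ids.
    let mgmt_api_endpoint = "http://127.0.0.1:9898";
    let tenant_shard_id = "3fa85f64b2ca4887a15bc1c7e0f8a2d4-0001"; // hypothetical
    let timeline_id = "de2009250d64d3a713ad6d5c9dd511b5"; // hypothetical

    let uri = format!(
        "{mgmt_api_endpoint}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/lsn_lease"
    );
    let resp = reqwest::Client::new()
        .post(&uri)
        // Assumes the LSN serializes in its usual textual form.
        .json(&json!({ "lsn": "0/15E2C48" }))
        .send()
        .await?;
    println!("lease response: {}", resp.text().await?);
    Ok(())
}
```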
@@ -824,7 +824,7 @@ async fn get_lsn_by_timestamp_handler(
 
     let lease = if with_lease {
         timeline
-            .make_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
+            .init_lsn_lease(lsn, timeline.get_lsn_lease_length_for_ts(), &ctx)
             .inspect_err(|_| {
                 warn!("fail to grant a lease to {}", lsn);
             })
@@ -1692,9 +1692,18 @@ async fn lsn_lease_handler(
     let timeline =
         active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
             .await?;
-    let result = timeline
-        .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
-        .map_err(|e| ApiError::InternalServerError(e.context("lsn lease http handler")))?;
+
+    let result = async {
+        timeline
+            .init_lsn_lease(lsn, timeline.get_lsn_lease_length(), &ctx)
+            .map_err(|e| {
+                ApiError::InternalServerError(
+                    e.context(format!("invalid lsn lease request at {lsn}")),
+                )
+            })
+    }
+    .instrument(info_span!("init_lsn_lease", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
+    .await?;
 
     json_response(StatusCode::OK, result)
 }
@@ -833,7 +833,7 @@ impl PageServerHandler {
         set_tracing_field_shard_id(&timeline);
 
         let lease = timeline
-            .make_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
+            .renew_lsn_lease(lsn, timeline.get_lsn_lease_length(), ctx)
             .inspect_err(|e| {
                 warn!("{e}");
             })
@@ -21,6 +21,7 @@ use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use pageserver_api::models;
 use pageserver_api::models::AuxFilePolicy;
+use pageserver_api::models::LsnLease;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::models::TimelineState;
 use pageserver_api::models::TopTenantShardItem;
@@ -182,27 +183,54 @@ pub struct TenantSharedResources {
 pub(super) struct AttachedTenantConf {
     tenant_conf: TenantConfOpt,
     location: AttachedLocationConfig,
+    /// The deadline before which we are blocked from GC so that
+    /// leases have a chance to be renewed.
+    lsn_lease_deadline: Option<tokio::time::Instant>,
 }
 
 impl AttachedTenantConf {
+    fn new(tenant_conf: TenantConfOpt, location: AttachedLocationConfig) -> Self {
+        // Sets a deadline before which we cannot proceed to GC due to lsn leases.
+        //
+        // We do this because the lease mapping is not persisted to disk. By delaying GC by the
+        // lease length, we guarantee that all the leases we granted before will have a chance to
+        // be renewed when we run GC for the first time after a restart / transition from
+        // AttachedMulti to AttachedSingle.
+        let lsn_lease_deadline = if location.attach_mode == AttachmentMode::Single {
+            Some(
+                tokio::time::Instant::now()
+                    + tenant_conf
+                        .lsn_lease_length
+                        .unwrap_or(LsnLease::DEFAULT_LENGTH),
+            )
+        } else {
+            // We don't use `lsn_lease_deadline` to delay GC in AttachedMulti and AttachedStale
+            // because we don't do GC in these modes.
+            None
+        };
+
+        Self {
+            tenant_conf,
+            location,
+            lsn_lease_deadline,
+        }
+    }
+
     fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
         match &location_conf.mode {
-            LocationMode::Attached(attach_conf) => Ok(Self {
-                tenant_conf: location_conf.tenant_conf,
-                location: *attach_conf,
-            }),
+            LocationMode::Attached(attach_conf) => {
+                Ok(Self::new(location_conf.tenant_conf, *attach_conf))
+            }
             LocationMode::Secondary(_) => {
                 anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
             }
         }
     }
+
+    fn is_gc_blocked_by_lsn_lease_deadline(&self) -> bool {
+        self.lsn_lease_deadline
+            .map(|d| tokio::time::Instant::now() < d)
+            .unwrap_or(false)
+    }
 }
 
 struct TimelinePreload {
     timeline_id: TimelineId,
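Stripped of the surrounding config plumbing, the deadline gate reduces to two small functions. A sketch using std `Instant` (the code above uses `tokio::time::Instant` so the paused-clock tests can advance past the deadline):

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Copy, PartialEq)]
enum AttachmentMode {
    Single,
    Multi,
    Stale,
}

// Compute the GC grace deadline at attach time: only AttachedSingle runs GC,
// so only it needs the renewal grace period.
fn lsn_lease_deadline(mode: AttachmentMode, lease_length: Duration) -> Option<Instant> {
    (mode == AttachmentMode::Single).then(|| Instant::now() + lease_length)
}

fn is_gc_blocked(deadline: Option<Instant>) -> bool {
    deadline.map(|d| Instant::now() < d).unwrap_or(false)
}

fn main() {
    let deadline = lsn_lease_deadline(AttachmentMode::Single, Duration::from_secs(600));
    assert!(is_gc_blocked(deadline)); // freshly attached: GC must wait
    assert!(!is_gc_blocked(None)); // AttachedMulti / AttachedStale: no GC anyway
}
```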
@@ -1822,6 +1850,11 @@ impl Tenant {
                 info!("Skipping GC in location state {:?}", conf.location);
                 return Ok(GcResult::default());
             }
+
+            if conf.is_gc_blocked_by_lsn_lease_deadline() {
+                info!("Skipping GC because lsn lease deadline is not reached");
+                return Ok(GcResult::default());
+            }
         }
 
         let _guard = match self.gc_block.start().await {
@@ -2630,6 +2663,8 @@ impl Tenant {
             Arc::new(AttachedTenantConf {
                 tenant_conf: new_tenant_conf.clone(),
                 location: inner.location,
+                // Attached location is not changed, no need to update lsn lease deadline.
+                lsn_lease_deadline: inner.lsn_lease_deadline,
             })
         });
@@ -4461,13 +4496,17 @@ mod tests {
         tline.freeze_and_flush().await.map_err(|e| e.into())
     }
 
-    #[tokio::test]
+    #[tokio::test(start_paused = true)]
     async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
         let (tenant, ctx) =
             TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
                 .await?
                 .load()
                 .await;
+        // Advance to the lsn lease deadline so that GC is not blocked by
+        // the initial transition into AttachedSingle.
+        tokio::time::advance(tenant.get_lsn_lease_length()).await;
+        tokio::time::resume();
         let tline = tenant
             .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
             .await?;
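The switch to `#[tokio::test(start_paused = true)]` is what lets these tests jump over the lease deadline without real sleeps. A small sketch of the paused-clock pattern (requires tokio's `test-util` feature):

```rust
// Freeze tokio's clock at startup, then move it forward instantly.
#[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    use std::time::Duration;

    let t0 = tokio::time::Instant::now();
    tokio::time::advance(Duration::from_secs(600)).await;
    tokio::time::resume();
    assert!(t0.elapsed() >= Duration::from_secs(600));
    println!("stepped over 600s of tokio time instantly");
}
```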
@@ -7244,9 +7283,17 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
+    #[tokio::test(start_paused = true)]
     async fn test_lsn_lease() -> anyhow::Result<()> {
-        let (tenant, ctx) = TenantHarness::create("test_lsn_lease").await?.load().await;
+        let (tenant, ctx) = TenantHarness::create("test_lsn_lease")
+            .await
+            .unwrap()
+            .load()
+            .await;
+        // Advance to the lsn lease deadline so that GC is not blocked by
+        // the initial transition into AttachedSingle.
+        tokio::time::advance(tenant.get_lsn_lease_length()).await;
+        tokio::time::resume();
         let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
 
         let end_lsn = Lsn(0x100);
@@ -7274,24 +7321,33 @@ mod tests {
 
         let leased_lsns = [0x30, 0x50, 0x70];
         let mut leases = Vec::new();
-        let _: anyhow::Result<_> = leased_lsns.iter().try_for_each(|n| {
-            leases.push(timeline.make_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)?);
-            Ok(())
+        leased_lsns.iter().for_each(|n| {
+            leases.push(
+                timeline
+                    .init_lsn_lease(Lsn(*n), timeline.get_lsn_lease_length(), &ctx)
+                    .expect("lease request should succeed"),
+            );
         });
 
-        // Renewing with shorter lease should not change the lease.
-        let updated_lease_0 =
-            timeline.make_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)?;
-        assert_eq!(updated_lease_0.valid_until, leases[0].valid_until);
+        let updated_lease_0 = timeline
+            .renew_lsn_lease(Lsn(leased_lsns[0]), Duration::from_secs(0), &ctx)
+            .expect("lease renewal should succeed");
+        assert_eq!(
+            updated_lease_0.valid_until, leases[0].valid_until,
+            "Renewing with a shorter lease should not change the lease."
+        );
 
-        // Renewing with a long lease should renew lease with later expiration time.
-        let updated_lease_1 = timeline.make_lsn_lease(
-            Lsn(leased_lsns[1]),
-            timeline.get_lsn_lease_length() * 2,
-            &ctx,
-        )?;
-
-        assert!(updated_lease_1.valid_until > leases[1].valid_until);
+        let updated_lease_1 = timeline
+            .renew_lsn_lease(
+                Lsn(leased_lsns[1]),
+                timeline.get_lsn_lease_length() * 2,
+                &ctx,
+            )
+            .expect("lease renewal should succeed");
+        assert!(
+            updated_lease_1.valid_until > leases[1].valid_until,
+            "Renewing with a longer lease should extend the expiration time."
+        );
 
         // Force set disk consistent lsn so we can get the cutoff at `end_lsn`.
         info!(
@@ -7308,7 +7364,8 @@ mod tests {
             &CancellationToken::new(),
             &ctx,
         )
-        .await?;
+        .await
+        .unwrap();
 
         // Keeping everything <= Lsn(0x80) b/c leases:
         // 0/10: initdb layer
@@ -7322,13 +7379,16 @@ mod tests {
         // Make a lease on an already GC-ed LSN.
         // 0/80 does not have a valid lease + is below latest_gc_cutoff
         assert!(Lsn(0x80) < *timeline.get_latest_gc_cutoff_lsn());
-        let res = timeline.make_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx);
-        assert!(res.is_err());
+        timeline
+            .init_lsn_lease(Lsn(0x80), timeline.get_lsn_lease_length(), &ctx)
+            .expect_err("lease request on a GC-ed LSN should fail");
 
         // Should still be able to renew a currently valid lease.
-        // Assumption: original lease to is still valid for 0/50.
-        let _ =
-            timeline.make_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)?;
+        // Assumption: the original lease is still valid for 0/50.
+        // (use `Timeline::init_lsn_lease` for testing so it always does validation)
+        timeline
+            .init_lsn_lease(Lsn(leased_lsns[1]), timeline.get_lsn_lease_length(), &ctx)
+            .expect("lease renewal with validation should succeed");
 
         Ok(())
     }
@@ -1,29 +1,12 @@
-use std::{collections::HashMap, time::Duration};
+use std::collections::HashMap;
 
-use super::remote_timeline_client::index::GcBlockingReason;
-use tokio::time::Instant;
 use utils::id::TimelineId;
 
-type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
+use super::remote_timeline_client::index::GcBlockingReason;
 
-#[derive(Default)]
-struct Storage {
-    timelines_blocked: TimelinesBlocked,
-    /// The deadline before which we are blocked from GC so that
-    /// leases have a chance to be renewed.
-    lsn_lease_deadline: Option<Instant>,
-}
+type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
 
-impl Storage {
-    fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
-        self.lsn_lease_deadline
-            .map(|d| Instant::now() < d)
-            .unwrap_or(false)
-    }
-}
-
-/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
-/// blocking.
+/// GcBlock provides persistent (per-timeline) gc blocking.
 #[derive(Default)]
 pub(crate) struct GcBlock {
     /// The timelines which have current reasons to block gc.
@@ -66,17 +49,6 @@ impl GcBlock {
         }
     }
 
-    /// Sets a deadline before which we cannot proceed to GC due to lsn lease.
-    ///
-    /// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
-    /// length, we guarantee that all the leases we granted before will have a chance to renew
-    /// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
-    pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
-        let deadline = Instant::now() + lsn_lease_length;
-        let mut g = self.reasons.lock().unwrap();
-        g.lsn_lease_deadline = Some(deadline);
-    }
-
     /// Describe the current gc blocking reasons.
     ///
     /// TODO: make this json serializable.
@@ -102,7 +74,7 @@ impl GcBlock {
     ) -> anyhow::Result<bool> {
         let (added, uploaded) = {
             let mut g = self.reasons.lock().unwrap();
-            let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
+            let set = g.entry(timeline.timeline_id).or_default();
             let added = set.insert(reason);
 
             // LOCK ORDER: intentionally hold the lock, see self.reasons.
@@ -133,7 +105,7 @@ impl GcBlock {
 
         let (remaining_blocks, uploaded) = {
             let mut g = self.reasons.lock().unwrap();
-            match g.timelines_blocked.entry(timeline.timeline_id) {
+            match g.entry(timeline.timeline_id) {
                 Entry::Occupied(mut oe) => {
                     let set = oe.get_mut();
                     set.remove(reason);
@@ -147,7 +119,7 @@ impl GcBlock {
             }
         }
 
-        let remaining_blocks = g.timelines_blocked.len();
+        let remaining_blocks = g.len();
 
         // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
         let uploaded = timeline
@@ -172,11 +144,11 @@ impl GcBlock {
     pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
         let unblocked = {
             let mut g = self.reasons.lock().unwrap();
-            if g.timelines_blocked.is_empty() {
+            if g.is_empty() {
                 return;
             }
 
-            g.timelines_blocked.remove(&timeline.timeline_id);
+            g.remove(&timeline.timeline_id);
 
             BlockingReasons::clean_and_summarize(g).is_none()
         };
@@ -187,11 +159,10 @@ impl GcBlock {
     }
 
     /// Initialize with the non-deleted timelines of this tenant.
-    pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
+    pub(crate) fn set_scanned(&self, scanned: Storage) {
         let mut g = self.reasons.lock().unwrap();
-        assert!(g.timelines_blocked.is_empty());
-        g.timelines_blocked
-            .extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
+        assert!(g.is_empty());
+        g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
 
         if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
             tracing::info!(summary=?reasons, "initialized with gc blocked");
@@ -205,7 +176,6 @@ pub(super) struct Guard<'a> {
 
 #[derive(Debug)]
 pub(crate) struct BlockingReasons {
-    tenant_blocked_by_lsn_lease_deadline: bool,
     timelines: usize,
     reasons: enumset::EnumSet<GcBlockingReason>,
 }
@@ -214,8 +184,8 @@ impl std::fmt::Display for BlockingReasons {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
-            self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
+            "{} timelines block for {:?}",
+            self.timelines, self.reasons
         )
     }
 }
@@ -223,15 +193,13 @@ impl std::fmt::Display for BlockingReasons {
 impl BlockingReasons {
     fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
         let mut reasons = enumset::EnumSet::empty();
-        g.timelines_blocked.retain(|_key, value| {
+        g.retain(|_key, value| {
             reasons = reasons.union(*value);
             !value.is_empty()
         });
-        let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
-        if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
+        if !g.is_empty() {
             Some(BlockingReasons {
-                tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
-                timelines: g.timelines_blocked.len(),
+                timelines: g.len(),
                 reasons,
             })
         } else {
@@ -240,17 +208,14 @@ impl BlockingReasons {
     }
 
     fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
-        let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
-        if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
+        if g.is_empty() {
             None
         } else {
             let reasons = g
-                .timelines_blocked
                 .values()
                 .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
             Some(BlockingReasons {
-                tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
-                timelines: g.timelines_blocked.len(),
+                timelines: g.len(),
                 reasons,
             })
         }
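With the deadline gone from `GcBlock`, `Storage` is just a map of per-timeline reason sets, and summarizing is a fold over its values. A standalone sketch of that step (the `Reason` variants are illustrative; requires the `enumset` crate):

```rust
use enumset::{EnumSet, EnumSetType};
use std::collections::HashMap;

#[derive(EnumSetType, Debug)]
enum Reason {
    Manual,
    DetachAncestor,
}

fn main() {
    // Keyed by a stand-in timeline id.
    let mut storage: HashMap<u64, EnumSet<Reason>> = HashMap::new();
    storage.insert(1, Reason::Manual.into());
    storage.insert(2, Reason::Manual | Reason::DetachAncestor);

    // Fold every timeline's blocking reasons into one summary set.
    let summary = storage
        .values()
        .fold(EnumSet::empty(), |acc, next| acc.union(*next));
    println!("{} timelines block for {:?}", storage.len(), summary);
}
```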
@@ -949,12 +949,6 @@ impl TenantManager {
             (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
                 match attach_conf.generation.cmp(&tenant.generation) {
                     Ordering::Equal => {
-                        if attach_conf.attach_mode == AttachmentMode::Single {
-                            tenant
-                                .gc_block
-                                .set_lsn_lease_deadline(tenant.get_lsn_lease_length());
-                        }
-
                         // A transition from Attached to Attached in the same generation, we may
                         // take our fast path and just provide the updated configuration
                         // to the tenant.
@@ -330,7 +330,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
         RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
 
     let mut first = true;
-    tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
     loop {
         tokio::select! {
             _ = cancel.cancelled() => {
@@ -66,6 +66,7 @@ use std::{
 use crate::{
     aux_file::AuxFileSizeEstimator,
     tenant::{
+        config::AttachmentMode,
         layer_map::{LayerMap, SearchResult},
         metadata::TimelineMetadata,
         storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
@@ -1324,16 +1325,38 @@ impl Timeline {
         Ok(())
     }
 
-    /// Obtains a temporary lease blocking garbage collection for the given LSN.
-    ///
-    /// This function will error if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is also
-    /// no existing lease to renew. If there is an existing lease in the map, the lease will be renewed only if
-    /// the request extends the lease. The returned lease is therefore the maximum between the existing lease and
-    /// the requesting lease.
-    pub(crate) fn make_lsn_lease(
+    /// Initializes an LSN lease. The function will return an error if the requested LSN is less than the `latest_gc_cutoff_lsn`.
+    pub(crate) fn init_lsn_lease(
+        &self,
+        lsn: Lsn,
+        length: Duration,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<LsnLease> {
+        self.make_lsn_lease(lsn, length, true, ctx)
+    }
+
+    /// Renews a lease at a particular LSN. The requested LSN is not validated against the `latest_gc_cutoff_lsn` when we are in the grace period.
+    pub(crate) fn renew_lsn_lease(
+        &self,
+        lsn: Lsn,
+        length: Duration,
+        ctx: &RequestContext,
+    ) -> anyhow::Result<LsnLease> {
+        self.make_lsn_lease(lsn, length, false, ctx)
+    }
+
+    /// Obtains a temporary lease blocking garbage collection for the given LSN.
+    ///
+    /// If we are in `AttachedSingle` mode and not blocked by the lsn lease deadline, this function will error
+    /// if the requesting LSN is less than the `latest_gc_cutoff_lsn` and there is no existing lease present.
+    ///
+    /// If there is an existing lease in the map, the lease will be renewed only if the request extends the lease.
+    /// The returned lease is therefore the maximum between the existing lease and the requested lease.
+    fn make_lsn_lease(
         &self,
         lsn: Lsn,
         length: Duration,
+        init: bool,
         _ctx: &RequestContext,
     ) -> anyhow::Result<LsnLease> {
         let lease = {
@@ -1347,8 +1370,8 @@ impl Timeline {
 
             let entry = gc_info.leases.entry(lsn);
 
             let lease = {
-                if let Entry::Occupied(mut occupied) = entry {
+                match entry {
+                    Entry::Occupied(mut occupied) => {
                     let existing_lease = occupied.get_mut();
                     if valid_until > existing_lease.valid_until {
                         existing_lease.valid_until = valid_until;
@@ -1360,20 +1383,28 @@ impl Timeline {
                     }
 
                     existing_lease.clone()
-                } else {
-                    // Reject already GC-ed LSN (lsn < latest_gc_cutoff)
-                    let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
-                    if lsn < *latest_gc_cutoff_lsn {
-                        bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
-                    }
+                }
+                Entry::Vacant(vacant) => {
+                    // Reject already GC-ed LSN (lsn < latest_gc_cutoff) if we are in AttachedSingle and
+                    // not blocked by the lsn lease deadline.
+                    let validate = {
+                        let conf = self.tenant_conf.load();
+                        conf.location.attach_mode == AttachmentMode::Single
+                            && !conf.is_gc_blocked_by_lsn_lease_deadline()
+                    };
+
+                    if init || validate {
+                        let latest_gc_cutoff_lsn = self.get_latest_gc_cutoff_lsn();
+                        if lsn < *latest_gc_cutoff_lsn {
+                            bail!("tried to request a page version that was garbage collected. requested at {} gc cutoff {}", lsn, *latest_gc_cutoff_lsn);
+                        }
+                    }
 
                     let dt: DateTime<Utc> = valid_until.into();
                     info!("lease created, valid until {}", dt);
-                    entry.or_insert(LsnLease { valid_until }).clone()
+                    vacant.insert(LsnLease { valid_until }).clone()
+                }
             };
 
             lease
         }
     };
 
     Ok(lease)
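The whole edge-case policy compresses into one predicate over `init`, the attachment mode, and the grace period. A sketch of the decision table (simplified from the hunk above):

```rust
#[derive(PartialEq)]
enum Mode {
    Single,
    Multi,
    Stale,
}

/// Whether a lease request at an LSN below the GC cutoff must be rejected.
/// Initial (HTTP) requests always validate; renewals (libpq) only validate
/// once the tenant is AttachedSingle and past the post-attach grace period.
fn must_validate(init: bool, mode: Mode, in_grace_period: bool) -> bool {
    init || (mode == Mode::Single && !in_grace_period)
}

fn main() {
    assert!(must_validate(true, Mode::Multi, true)); // init: always strict
    assert!(!must_validate(false, Mode::Single, true)); // renewal during grace: lenient
    assert!(must_validate(false, Mode::Single, false)); // renewal after grace: strict
    assert!(!must_validate(false, Mode::Stale, false)); // no GC in Stale: lenient
}
```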
@@ -1950,8 +1981,6 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
     }
 
-    // TODO(yuchen): remove unused flag after implementing https://github.com/neondatabase/neon/issues/8072
-    #[allow(unused)]
     pub(crate) fn get_lsn_lease_length_for_ts(&self) -> Duration {
         let tenant_conf = self.tenant_conf.load();
         tenant_conf
@@ -27,7 +27,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
     env.pageserver.allowed_errors.extend(
         [
             ".*basebackup .* failed: invalid basebackup lsn.*",
-            ".*page_service.*handle_make_lsn_lease.*.*tried to request a page version that was garbage collected",
+            ".*/lsn_lease.*invalid lsn lease request.*",
         ]
     )
@@ -108,7 +108,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
             assert cur.fetchone() == (1,)
 
     # Create node at pre-initdb lsn
-    with pytest.raises(Exception, match="invalid basebackup lsn"):
+    with pytest.raises(Exception, match="invalid lsn lease request"):
         # compute node startup with invalid LSN should fail
         env.endpoints.create_start(
             branch_name="main",
@@ -167,6 +167,23 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
         )
         return last_flush_lsn
 
+    def trigger_gc_and_select(env: NeonEnv, ep_static: Endpoint):
+        """
+        Trigger GC manually on all pageservers, then run a `SELECT` query.
+        """
+        for shard, ps in tenant_get_shards(env, env.initial_tenant):
+            client = ps.http_client()
+            gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
+            log.info(f"{gc_result=}")
+
+            assert (
+                gc_result["layers_removed"] == 0
+            ), "No layers should be removed, old layers are guarded by leases."
+
+        with ep_static.cursor() as cur:
+            cur.execute("SELECT count(*) FROM t0")
+            assert cur.fetchone() == (ROW_COUNT,)
+
     # Insert some records on main branch
     with env.endpoints.create_start("main") as ep_main:
         with ep_main.cursor() as cur:
@@ -193,25 +210,31 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
 
         generate_updates_on_main(env, ep_main, i, end=100)
 
-        # Trigger GC
-        for shard, ps in tenant_get_shards(env, env.initial_tenant):
-            client = ps.http_client()
-            gc_result = client.timeline_gc(shard, env.initial_timeline, 0)
-            log.info(f"{gc_result=}")
+        trigger_gc_and_select(env, ep_static)
 
-        assert (
-            gc_result["layers_removed"] == 0
-        ), "No layers should be removed, old layers are guarded by leases."
+        # Trigger pageserver restarts
+        for ps in env.pageservers:
+            ps.stop()
+            # Static compute should have at least one lease request failure due to connection.
+            time.sleep(LSN_LEASE_LENGTH / 2)
+            ps.start()
 
-        with ep_static.cursor() as cur:
-            cur.execute("SELECT count(*) FROM t0")
-            assert cur.fetchone() == (ROW_COUNT,)
+        trigger_gc_and_select(env, ep_static)
+
+        # Reconfigure pageservers
+        env.pageservers[0].stop()
+        env.storage_controller.node_configure(
+            env.pageservers[0].id, {"availability": "Offline"}
+        )
+        env.storage_controller.reconcile_until_idle()
+
+        trigger_gc_and_select(env, ep_static)
 
         # Do some update so we can increment latest_gc_cutoff
         generate_updates_on_main(env, ep_main, i, end=100)
 
         # Wait for the existing lease to expire.
-        time.sleep(LSN_LEASE_LENGTH)
+        time.sleep(LSN_LEASE_LENGTH + 1)
         # Now trigger GC again, layers should be removed.
         for shard, ps in tenant_get_shards(env, env.initial_tenant):
             client = ps.http_client()
@@ -45,10 +45,7 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
     tenant_after = http.tenant_status(env.initial_tenant)
     assert tenant_before != tenant_after
     gc_blocking = tenant_after["gc_blocking"]
-    assert (
-        gc_blocking
-        == "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
-    )
+    assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
 
     wait_for_another_gc_round()
     pss.assert_log_contains(gc_skipped_line)