pageserver: wait for lsn lease duration after transition into AttachedSingle (#9024)

Part of #7497, closes https://github.com/neondatabase/neon/issues/8890.

## Problem

Since leases are in-memory objects, we need to take special care of them
after pageserver restarts and while doing a live migration. The approach
we took for pageserver restart is to wait for at least the lease duration
before doing the first GC. We want to do the same for live migration. Since
we do not do any GC when a tenant is in `AttachedStale` or
`AttachedMulti` mode, only the transition from `AttachedMulti` to
`AttachedSingle` requires this treatment.

## Summary of changes

- Added `lsn_lease_deadline` field in `GcBlock::reasons`: the tenant is
temporarily blocked from GC until we reach the deadline. This
information does not persist to S3.
- In `GcBlock::start`, skip the GC iteration if we are blocked by the
lsn lease deadline.
- In `TenantManager::upsert_location`, set the lsn_lease_deadline to
`Instant::now() + lsn_lease_length` so the granted leases have a chance
to be renewed before we run GC for the first time after transitioning
from AttachedMulti to AttachedSingle.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
This commit is contained in:
Yuchen Liang
2024-09-19 12:27:10 -04:00
committed by GitHub
parent 0a1ca7670c
commit 1708743e78
16 changed files with 91 additions and 43 deletions

View File

@@ -1,11 +1,29 @@
use std::collections::HashMap;
use utils::id::TimelineId;
use std::{collections::HashMap, time::Duration};
use super::remote_timeline_client::index::GcBlockingReason;
use tokio::time::Instant;
use utils::id::TimelineId;
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
/// In-memory gc blocking state; this is the value guarded by the `GcBlock::reasons` mutex.
#[derive(Default)]
struct Storage {
/// Per-timeline sets of reasons that currently block gc, keyed by timeline id.
timelines_blocked: TimelinesBlocked,
/// The deadline before which we are blocked from GC so that
/// leases have a chance to be renewed.
///
/// `None` means no deadline is set; a deadline that has already passed does not block.
lsn_lease_deadline: Option<Instant>,
}
impl Storage {
    /// Returns `true` while an lsn lease deadline is set and still lies in the future.
    ///
    /// With no deadline set, or once the deadline has been reached, this returns `false`.
    fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
        match self.lsn_lease_deadline {
            Some(deadline) => Instant::now() < deadline,
            None => false,
        }
    }
}
/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
/// blocking.
#[derive(Default)]
pub(crate) struct GcBlock {
/// The timelines which have current reasons to block gc.
@@ -13,6 +31,12 @@ pub(crate) struct GcBlock {
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
/// to keep this field up to date with RemoteTimelineClient `upload_queue.dirty`.
reasons: std::sync::Mutex<Storage>,
/// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
///
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
}
@@ -42,6 +66,20 @@ impl GcBlock {
}
}
/// Blocks GC until `lsn_lease_length` from now has elapsed, on account of lsn leases.
///
/// The lease mapping is not persisted to disk. Delaying GC by the lease length guarantees
/// that every lease granted earlier has a chance to be renewed before GC runs for the first
/// time after a restart or a transition from AttachedMulti to AttachedSingle.
///
/// Any previously stored deadline is overwritten.
pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
    // Take the timestamp before acquiring the lock, then store it in a single statement
    // so the guard is released immediately afterwards.
    let blocked_until = Instant::now() + lsn_lease_length;
    self.reasons.lock().unwrap().lsn_lease_deadline = Some(blocked_until);
}
/// Describe the current gc blocking reasons.
///
/// TODO: make this json serializable.
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
let g = self.reasons.lock().unwrap();
@@ -64,7 +102,7 @@ impl GcBlock {
) -> anyhow::Result<bool> {
let (added, uploaded) = {
let mut g = self.reasons.lock().unwrap();
let set = g.entry(timeline.timeline_id).or_default();
let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
let added = set.insert(reason);
// LOCK ORDER: intentionally hold the lock, see self.reasons.
@@ -95,7 +133,7 @@ impl GcBlock {
let (remaining_blocks, uploaded) = {
let mut g = self.reasons.lock().unwrap();
match g.entry(timeline.timeline_id) {
match g.timelines_blocked.entry(timeline.timeline_id) {
Entry::Occupied(mut oe) => {
let set = oe.get_mut();
set.remove(reason);
@@ -109,7 +147,7 @@ impl GcBlock {
}
}
let remaining_blocks = g.len();
let remaining_blocks = g.timelines_blocked.len();
// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
let uploaded = timeline
@@ -134,11 +172,11 @@ impl GcBlock {
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
let unblocked = {
let mut g = self.reasons.lock().unwrap();
if g.is_empty() {
if g.timelines_blocked.is_empty() {
return;
}
g.remove(&timeline.timeline_id);
g.timelines_blocked.remove(&timeline.timeline_id);
BlockingReasons::clean_and_summarize(g).is_none()
};
@@ -149,10 +187,11 @@ impl GcBlock {
}
/// Initialize with the non-deleted timelines of this tenant.
pub(crate) fn set_scanned(&self, scanned: Storage) {
pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
let mut g = self.reasons.lock().unwrap();
assert!(g.is_empty());
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
assert!(g.timelines_blocked.is_empty());
g.timelines_blocked
.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
tracing::info!(summary=?reasons, "initialized with gc blocked");
@@ -166,6 +205,7 @@ pub(super) struct Guard<'a> {
/// Summary of what is currently blocking gc.
///
/// NOTE(review): the derived `Debug` output appears to be what the tenant status reports as
/// `gc_blocking`, and tests compare that string verbatim, so field names and order matter.
#[derive(Debug)]
pub(crate) struct BlockingReasons {
/// Whether the lsn lease deadline was still in the future when the summary was taken.
tenant_blocked_by_lsn_lease_deadline: bool,
/// Number of timelines with an entry in the blocking map when the summary was taken.
timelines: usize,
/// Union of the blocking reasons over all timelines.
reasons: enumset::EnumSet<GcBlockingReason>,
}
@@ -174,8 +214,8 @@ impl std::fmt::Display for BlockingReasons {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} timelines block for {:?}",
self.timelines, self.reasons
"tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
)
}
}
@@ -183,13 +223,15 @@ impl std::fmt::Display for BlockingReasons {
impl BlockingReasons {
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
let mut reasons = enumset::EnumSet::empty();
g.retain(|_key, value| {
g.timelines_blocked.retain(|_key, value| {
reasons = reasons.union(*value);
!value.is_empty()
});
if !g.is_empty() {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
Some(BlockingReasons {
timelines: g.len(),
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
reasons,
})
} else {
@@ -198,14 +240,17 @@ impl BlockingReasons {
}
fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
if g.is_empty() {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
None
} else {
let reasons = g
.timelines_blocked
.values()
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
Some(BlockingReasons {
timelines: g.len(),
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
reasons,
})
}

View File

@@ -949,6 +949,12 @@ impl TenantManager {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
match attach_conf.generation.cmp(&tenant.generation) {
Ordering::Equal => {
if attach_conf.attach_mode == AttachmentMode::Single {
tenant
.gc_block
.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
}
// A transition from Attached to Attached in the same generation, we may
// take our fast path and just provide the updated configuration
// to the tenant.

View File

@@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
loop {
tokio::select! {
_ = cancel.cancelled() => {
@@ -363,7 +364,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
first = false;
let delays = async {
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
@@ -538,28 +538,12 @@ pub(crate) async fn random_init_delay(
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
/// Delays GC by the default lease length at restart.
///
/// The lease mapping is not persisted to disk. By delaying GC by the default lease
/// length, we guarantee that all the leases granted before the restart will have expired
/// when GC runs for the first time after the restart.
///
/// Returns `Err(Cancelled)` if `cancel` fires before `length` has elapsed, and `Ok(())`
/// once the full delay has passed.
pub(crate) async fn delay_by_lease_length(
    length: Duration,
    cancel: &CancellationToken,
) -> Result<(), Cancelled> {
    // `timeout` yields `Ok` only when the cancellation future finished first.
    let cancelled_first = tokio::time::timeout(length, cancel.cancelled())
        .await
        .is_ok();
    if cancelled_first {
        Err(Cancelled)
    } else {
        Ok(())
    }
}
struct Iteration {
started_at: Instant,
period: Duration,

View File

@@ -142,6 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
"image_creation_threshold": "1",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
"lsn_lease_length": "0s",
}
)

View File

@@ -11,7 +11,9 @@ from fixtures.utils import print_gc_result, query_scalar
#
def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
env = neon_env_builder.init_start(
initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"}
)
error_regexes = [
".*invalid branch start lsn.*",

View File

@@ -419,7 +419,7 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
client = env.pageserver.http_client()

View File

@@ -240,6 +240,7 @@ def test_uploads_and_deletions(
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

View File

@@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep):
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
agressive_vacuum_conf = [
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",

View File

@@ -173,6 +173,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
"lsn_lease_length": "0s",
}
def tenant_update_config(changes):

View File

@@ -53,6 +53,7 @@ TENANT_CONF = {
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}

View File

@@ -244,6 +244,7 @@ def test_remote_storage_upload_queue_retries(
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}
)
@@ -391,6 +392,7 @@ def test_remote_timeline_client_calls_started_metric(
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
"lsn_lease_length": "0s",
}
)

View File

@@ -200,6 +200,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
"image_layer_creation_check_threshold": 0,
"lsn_lease_length": "0s",
}
neon_env_builder.storage_controller_config = {

View File

@@ -485,7 +485,7 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
# Initial notification from tenant creation
assert len(notifications) == 1

View File

@@ -204,6 +204,7 @@ def test_scrubber_physical_gc_ancestors(
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
"lsn_lease_length": "0s",
},
)

View File

@@ -266,13 +266,13 @@ def test_tenant_reattach_while_busy(
def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
pageserver_http = env.pageserver.http_client()
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
# create new tenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()

View File

@@ -45,7 +45,10 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
tenant_after = http.tenant_status(env.initial_tenant)
assert tenant_before != tenant_after
gc_blocking = tenant_after["gc_blocking"]
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
assert (
gc_blocking
== "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
)
wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)