mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 09:00:37 +00:00
fix: find gc cutoff points without holding Tenant::gc_cs (#7585)
Previously, the GC cutoff LSNs of each timeline were computed while holding `Tenant::gc_cs`. In recent incidents, long branch creation times were caused by `Tenant::gc_cs` being held across extremely long `Timeline::find_lsn_by_timestamp` calls. The fix is to find the GC cutoff values before taking the `Tenant::gc_cs` lock. This change is safe because the GC cutoff values and the branch points do not depend on each other. With this change, even if `Timeline::find_gc_cutoffs` takes a long time, `Tenant::gc_cs` should no longer interfere with branch creation. Additionally, `Tenant::refresh_gc_info` is now tolerant of timeline deletions (or any other failure to find the pitr_cutoff): such a failure is logged and ignored for the affected timeline. This allows the synthetic size calculation to complete consistently instead of failing whenever a timeline deletion happens to coincide with it. Fixes: #7560 Fixes: #7587
This commit is contained in:
@@ -62,9 +62,9 @@ use self::timeline::uninit::TimelineCreateGuard;
|
||||
use self::timeline::uninit::TimelineExclusionError;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
use self::timeline::GcInfo;
|
||||
use self::timeline::TimelineResources;
|
||||
use self::timeline::WaitLsnError;
|
||||
use self::timeline::{GcCutoffs, GcInfo};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
@@ -2812,7 +2812,48 @@ impl Tenant {
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<Arc<Timeline>>> {
|
||||
// grab mutex to prevent new timelines from being created here.
|
||||
// before taking the gc_cs lock, do the heavier weight finding of gc_cutoff points for
|
||||
// currently visible timelines.
|
||||
let timelines = self
|
||||
.timelines
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.filter(|tl| match target_timeline_id.as_ref() {
|
||||
Some(target) => &tl.timeline_id == target,
|
||||
None => true,
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let mut gc_cutoffs: HashMap<TimelineId, GcCutoffs> =
|
||||
HashMap::with_capacity(timelines.len());
|
||||
|
||||
for timeline in timelines.iter() {
|
||||
let cutoff = timeline
|
||||
.get_last_record_lsn()
|
||||
.checked_sub(horizon)
|
||||
.unwrap_or(Lsn(0));
|
||||
|
||||
let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;
|
||||
|
||||
match res {
|
||||
Ok(cutoffs) => {
|
||||
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
|
||||
assert!(old.is_none());
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.is_active() {
|
||||
anyhow::bail!("shutting down");
|
||||
}
|
||||
|
||||
// grab mutex to prevent new timelines from being created here; avoid doing long operations
|
||||
// because that will stall branch creation.
|
||||
let gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// Scan all timelines. For each timeline, remember the timeline ID and
|
||||
@@ -2874,11 +2915,6 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let cutoff = timeline
|
||||
.get_last_record_lsn()
|
||||
.checked_sub(horizon)
|
||||
.unwrap_or(Lsn(0));
|
||||
|
||||
let branchpoints: Vec<Lsn> = all_branchpoints
|
||||
.range((
|
||||
Included((timeline_id, Lsn(0))),
|
||||
@@ -2886,12 +2922,27 @@ impl Tenant {
|
||||
))
|
||||
.map(|&x| x.1)
|
||||
.collect();
|
||||
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
|
||||
|
||||
*timeline.gc_info.write().unwrap() = GcInfo {
|
||||
retain_lsns: branchpoints,
|
||||
cutoffs,
|
||||
};
|
||||
{
|
||||
let mut target = timeline.gc_info.write().unwrap();
|
||||
|
||||
match gc_cutoffs.remove(&timeline_id) {
|
||||
Some(cutoffs) => {
|
||||
*target = GcInfo {
|
||||
retain_lsns: branchpoints,
|
||||
cutoffs,
|
||||
};
|
||||
}
|
||||
None => {
|
||||
// reasons for this being unavailable:
|
||||
// - this timeline was created while we were finding cutoffs
|
||||
// - lsn for timestamp search fails for this timeline repeatedly
|
||||
//
|
||||
// in both cases, refreshing the branchpoints is correct.
|
||||
target.retain_lsns = branchpoints;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
gc_timelines.push(timeline);
|
||||
}
|
||||
|
||||
@@ -118,9 +118,6 @@ pub(super) async fn gather_inputs(
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ModelInputs> {
|
||||
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
|
||||
//
|
||||
// FIXME: if a single timeline is deleted while refresh gc info is ongoing, we will fail the
|
||||
// whole computation. It does not make sense from the billing perspective.
|
||||
tenant
|
||||
.refresh_gc_info(cancel, ctx)
|
||||
.await
|
||||
@@ -221,6 +218,8 @@ pub(super) async fn gather_inputs(
|
||||
.map(|lsn| (lsn, LsnKind::BranchPoint))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
drop(gc_info);
|
||||
|
||||
// Add branch points we collected earlier, just in case there were any that were
|
||||
// not present in retain_lsns. We will remove any duplicates below later.
|
||||
if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
|
||||
|
||||
@@ -4434,6 +4434,8 @@ impl Timeline {
|
||||
.start_timer()
|
||||
.record_on_drop();
|
||||
|
||||
pausable_failpoint!("Timeline::find_gc_cutoffs-pausable");
|
||||
|
||||
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
|
||||
//
|
||||
// Some unit tests depend on garbage-collection working even when
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
@@ -405,6 +406,29 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
|
||||
assert len(ps_http.timeline_list(tenant_id=env.initial_tenant)) == 1
|
||||
|
||||
|
||||
def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
failpoint = "Timeline::find_gc_cutoffs-pausable"
|
||||
|
||||
client.configure_failpoints((failpoint, "pause"))
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as exec:
|
||||
completion = exec.submit(client.timeline_gc, env.initial_tenant, env.initial_timeline, None)
|
||||
|
||||
wait_until_paused(env, failpoint)
|
||||
|
||||
env.neon_cli.create_branch(
|
||||
tenant_id=env.initial_tenant, ancestor_branch_name="main", new_branch_name="branch"
|
||||
)
|
||||
|
||||
client.configure_failpoints((failpoint, "off"))
|
||||
|
||||
completion.result()
|
||||
|
||||
|
||||
def wait_until_paused(env: NeonEnv, failpoint: str):
|
||||
found = False
|
||||
msg = f"at failpoint {failpoint}"
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
|
||||
@@ -11,13 +12,15 @@ from fixtures.neon_fixtures import (
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_wal_insert_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
tenant_delete_wait_completed,
|
||||
timeline_delete_wait_completed,
|
||||
wait_until_tenant_active,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def test_empty_tenant_size(neon_env_builder: NeonEnvBuilder):
|
||||
@@ -616,6 +619,68 @@ def test_get_tenant_size_with_multiple_branches(
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
|
||||
def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Makes sure synthetic size can still be calculated even if one of the
|
||||
timelines is deleted or the tenant is deleted.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
failpoint = "Timeline::find_gc_cutoffs-pausable"
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
orig_size = client.tenant_size(env.initial_tenant)
|
||||
|
||||
branch_id = env.neon_cli.create_branch(
|
||||
tenant_id=env.initial_tenant, ancestor_branch_name="main", new_branch_name="branch"
|
||||
)
|
||||
client.configure_failpoints((failpoint, "pause"))
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as exec:
|
||||
completion = exec.submit(client.tenant_size, env.initial_tenant)
|
||||
_, last_offset = wait_until(
|
||||
10, 1.0, lambda: env.pageserver.assert_log_contains(f"at failpoint {failpoint}")
|
||||
)
|
||||
|
||||
timeline_delete_wait_completed(client, env.initial_tenant, branch_id)
|
||||
|
||||
client.configure_failpoints((failpoint, "off"))
|
||||
size = completion.result()
|
||||
|
||||
assert_size_approx_equal(orig_size, size)
|
||||
|
||||
branch_id = env.neon_cli.create_branch(
|
||||
tenant_id=env.initial_tenant, ancestor_branch_name="main", new_branch_name="branch2"
|
||||
)
|
||||
client.configure_failpoints((failpoint, "pause"))
|
||||
|
||||
with ThreadPoolExecutor(max_workers=1) as exec:
|
||||
completion = exec.submit(client.tenant_size, env.initial_tenant)
|
||||
wait_until(
|
||||
10,
|
||||
1.0,
|
||||
lambda: env.pageserver.assert_log_contains(
|
||||
f"at failpoint {failpoint}", offset=last_offset
|
||||
),
|
||||
)
|
||||
|
||||
tenant_delete_wait_completed(client, env.initial_tenant, 10)
|
||||
|
||||
client.configure_failpoints((failpoint, "off"))
|
||||
|
||||
with pytest.raises(
|
||||
PageserverApiException, match="Failed to refresh gc_info before gathering inputs"
|
||||
):
|
||||
completion.result()
|
||||
|
||||
# this happens on both cases
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*ignoring failure to find gc cutoffs: timeline shutting down.*"
|
||||
)
|
||||
# this happens only in the case of deletion (http response logging)
|
||||
env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*")
|
||||
|
||||
|
||||
# Helper for tests that compare timeline_inputs
|
||||
# We don't want to compare the exact values, because they can be unstable
|
||||
# and cause flaky tests. So replace the values with useful invariants.
|
||||
|
||||
Reference in New Issue
Block a user