mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
fix: logical size calculation gating (#6915)
Noticed that we are failing to handle `Result::Err` when entering a gate for logical size calculation. Audited rest of the gate enters, which seem fine, unified two instances. Noticed that the gate guard allows to remove a failpoint, then noticed that adjacent failpoint was blocking the executor thread instead of using `pausable_failpoint!`, fix both. eviction_task.rs now maintains a gate guard as well. Cc: #4733
This commit is contained in:
@@ -3461,9 +3461,8 @@ impl Tenant {
|
||||
// Run each timeline's flush in a task holding the timeline's gate: this
|
||||
// means that if this function's future is cancelled, the Timeline shutdown
|
||||
// will still wait for any I/O in here to complete.
|
||||
let gate = match timeline.gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => continue,
|
||||
let Ok(gate) = timeline.gate.enter() else {
|
||||
continue;
|
||||
};
|
||||
let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
|
||||
results.push(jh);
|
||||
|
||||
@@ -373,12 +373,9 @@ async fn upload_tenant_heatmap(
|
||||
// Ensure that Tenant::shutdown waits for any upload in flight: this is needed because otherwise
|
||||
// when we delete a tenant, we might race with an upload in flight and end up leaving a heatmap behind
|
||||
// in remote storage.
|
||||
let _guard = match tenant.gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(_) => {
|
||||
tracing::info!("Skipping heatmap upload for tenant which is shutting down");
|
||||
return Err(UploadHeatmapError::Cancelled);
|
||||
}
|
||||
let Ok(_guard) = tenant.gate.enter() else {
|
||||
tracing::info!("Skipping heatmap upload for tenant which is shutting down");
|
||||
return Err(UploadHeatmapError::Cancelled);
|
||||
};
|
||||
|
||||
for (timeline_id, timeline) in timelines {
|
||||
|
||||
@@ -33,7 +33,10 @@ use tokio::{
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::{bin_ser::BeSer, sync::gate::Gate};
|
||||
use utils::{
|
||||
bin_ser::BeSer,
|
||||
sync::gate::{Gate, GateGuard},
|
||||
};
|
||||
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
@@ -2288,14 +2291,17 @@ impl Timeline {
|
||||
// accurate relation sizes, and they do not emit consumption metrics.
|
||||
debug_assert!(self.tenant_shard_id.is_zero());
|
||||
|
||||
let _guard = self.gate.enter();
|
||||
let guard = self
|
||||
.gate
|
||||
.enter()
|
||||
.map_err(|_| CalculateLogicalSizeError::Cancelled)?;
|
||||
|
||||
let self_calculation = Arc::clone(self);
|
||||
|
||||
let mut calculation = pin!(async {
|
||||
let ctx = ctx.attached_child();
|
||||
self_calculation
|
||||
.calculate_logical_size(lsn, cause, &ctx)
|
||||
.calculate_logical_size(lsn, cause, &guard, &ctx)
|
||||
.await
|
||||
});
|
||||
|
||||
@@ -2324,33 +2330,16 @@ impl Timeline {
|
||||
&self,
|
||||
up_to_lsn: Lsn,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
_guard: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<u64, CalculateLogicalSizeError> {
|
||||
info!(
|
||||
"Calculating logical size for timeline {} at {}",
|
||||
self.timeline_id, up_to_lsn
|
||||
);
|
||||
// These failpoints are used by python tests to ensure that we don't delete
|
||||
// the timeline while the logical size computation is ongoing.
|
||||
// The first failpoint is used to make this function pause.
|
||||
// Then the python test initiates timeline delete operation in a thread.
|
||||
// It waits for a few seconds, then arms the second failpoint and disables
|
||||
// the first failpoint. The second failpoint prints an error if the timeline
|
||||
// delete code has deleted the on-disk state while we're still running here.
|
||||
// It shouldn't do that. If it does it anyway, the error will be caught
|
||||
// by the test suite, highlighting the problem.
|
||||
fail::fail_point!("timeline-calculate-logical-size-pause");
|
||||
fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
|
||||
if !self
|
||||
.conf
|
||||
.timeline_path(&self.tenant_shard_id, &self.timeline_id)
|
||||
.exists()
|
||||
{
|
||||
error!("timeline-calculate-logical-size-pre metadata file does not exist")
|
||||
}
|
||||
// need to return something
|
||||
Ok(0)
|
||||
});
|
||||
|
||||
pausable_failpoint!("timeline-calculate-logical-size-pause");
|
||||
|
||||
// See if we've already done the work for initial size calculation.
|
||||
// This is a short-cut for timelines that are mostly unused.
|
||||
if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::{
|
||||
},
|
||||
};
|
||||
|
||||
use utils::completion;
|
||||
use utils::{completion, sync::gate::GateGuard};
|
||||
|
||||
use super::Timeline;
|
||||
|
||||
@@ -81,6 +81,12 @@ impl Timeline {
|
||||
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
|
||||
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
|
||||
use crate::tenant::tasks::random_init_delay;
|
||||
|
||||
// acquire the gate guard only once within a useful span
|
||||
let Ok(guard) = self.gate.enter() else {
|
||||
return;
|
||||
};
|
||||
|
||||
{
|
||||
let policy = self.get_eviction_policy();
|
||||
let period = match policy {
|
||||
@@ -96,7 +102,9 @@ impl Timeline {
|
||||
let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn);
|
||||
loop {
|
||||
let policy = self.get_eviction_policy();
|
||||
let cf = self.eviction_iteration(&policy, &cancel, &ctx).await;
|
||||
let cf = self
|
||||
.eviction_iteration(&policy, &cancel, &guard, &ctx)
|
||||
.await;
|
||||
|
||||
match cf {
|
||||
ControlFlow::Break(()) => break,
|
||||
@@ -117,6 +125,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
policy: &EvictionPolicy,
|
||||
cancel: &CancellationToken,
|
||||
gate: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<(), Instant> {
|
||||
debug!("eviction iteration: {policy:?}");
|
||||
@@ -127,14 +136,17 @@ impl Timeline {
|
||||
return ControlFlow::Continue(Instant::now() + Duration::from_secs(10));
|
||||
}
|
||||
EvictionPolicy::LayerAccessThreshold(p) => {
|
||||
match self.eviction_iteration_threshold(p, cancel, ctx).await {
|
||||
match self
|
||||
.eviction_iteration_threshold(p, cancel, gate, ctx)
|
||||
.await
|
||||
{
|
||||
ControlFlow::Break(()) => return ControlFlow::Break(()),
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
(p.period, p.threshold)
|
||||
}
|
||||
EvictionPolicy::OnlyImitiate(p) => {
|
||||
if self.imitiate_only(p, cancel, ctx).await.is_break() {
|
||||
if self.imitiate_only(p, cancel, gate, ctx).await.is_break() {
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
(p.period, p.threshold)
|
||||
@@ -165,6 +177,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
gate: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
let now = SystemTime::now();
|
||||
@@ -180,7 +193,7 @@ impl Timeline {
|
||||
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
|
||||
};
|
||||
|
||||
match self.imitate_layer_accesses(p, cancel, ctx).await {
|
||||
match self.imitate_layer_accesses(p, cancel, gate, ctx).await {
|
||||
ControlFlow::Break(()) => return ControlFlow::Break(()),
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
@@ -302,6 +315,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
gate: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
@@ -315,7 +329,7 @@ impl Timeline {
|
||||
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
|
||||
};
|
||||
|
||||
self.imitate_layer_accesses(p, cancel, ctx).await
|
||||
self.imitate_layer_accesses(p, cancel, gate, ctx).await
|
||||
}
|
||||
|
||||
/// If we evict layers but keep cached values derived from those layers, then
|
||||
@@ -347,6 +361,7 @@ impl Timeline {
|
||||
&self,
|
||||
p: &EvictionPolicyLayerAccessThreshold,
|
||||
cancel: &CancellationToken,
|
||||
gate: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) -> ControlFlow<()> {
|
||||
if !self.tenant_shard_id.is_zero() {
|
||||
@@ -365,7 +380,7 @@ impl Timeline {
|
||||
match state.last_layer_access_imitation {
|
||||
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
|
||||
_ => {
|
||||
self.imitate_timeline_cached_layer_accesses(ctx).await;
|
||||
self.imitate_timeline_cached_layer_accesses(gate, ctx).await;
|
||||
state.last_layer_access_imitation = Some(tokio::time::Instant::now())
|
||||
}
|
||||
}
|
||||
@@ -405,12 +420,21 @@ impl Timeline {
|
||||
|
||||
/// Recompute the values which would cause on-demand downloads during restart.
|
||||
#[instrument(skip_all)]
|
||||
async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
|
||||
async fn imitate_timeline_cached_layer_accesses(
|
||||
&self,
|
||||
guard: &GateGuard,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
let lsn = self.get_last_record_lsn();
|
||||
|
||||
// imitiate on-restart initial logical size
|
||||
let size = self
|
||||
.calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
|
||||
.calculate_logical_size(
|
||||
lsn,
|
||||
LogicalSizeCalculationCause::EvictionTaskImitation,
|
||||
guard,
|
||||
ctx,
|
||||
)
|
||||
.instrument(info_span!("calculate_logical_size"))
|
||||
.await;
|
||||
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import concurrent.futures
|
||||
import math
|
||||
import queue
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
@@ -20,7 +18,6 @@ from fixtures.neon_fixtures import (
|
||||
VanillaPostgres,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
timeline_delete_wait_completed,
|
||||
@@ -331,41 +328,18 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
assert_size_calculation_not_done()
|
||||
|
||||
log.info(
|
||||
f"try to delete the timeline using {deletion_method}, this should cancel size computation tasks and wait for them to finish"
|
||||
f"delete the timeline using {deletion_method}, this should cancel size computation tasks and wait for them to finish"
|
||||
)
|
||||
delete_timeline_success: queue.Queue[bool] = queue.Queue(maxsize=1)
|
||||
|
||||
def delete_timeline_thread_fn():
|
||||
try:
|
||||
if deletion_method == "tenant_detach":
|
||||
client.tenant_detach(tenant_id)
|
||||
elif deletion_method == "timeline_delete":
|
||||
timeline_delete_wait_completed(client, tenant_id, timeline_id)
|
||||
delete_timeline_success.put(True)
|
||||
except PageserverApiException:
|
||||
delete_timeline_success.put(False)
|
||||
raise
|
||||
if deletion_method == "tenant_detach":
|
||||
client.tenant_detach(tenant_id)
|
||||
elif deletion_method == "timeline_delete":
|
||||
timeline_delete_wait_completed(client, tenant_id, timeline_id)
|
||||
else:
|
||||
raise RuntimeError(deletion_method)
|
||||
|
||||
delete_timeline_thread = threading.Thread(target=delete_timeline_thread_fn)
|
||||
delete_timeline_thread.start()
|
||||
# give it some time to settle in the state where it waits for size computation task
|
||||
time.sleep(5)
|
||||
if not delete_timeline_success.empty():
|
||||
raise AssertionError(
|
||||
f"test is broken, the {deletion_method} should be stuck waiting for size computation task, got result {delete_timeline_success.get()}"
|
||||
)
|
||||
|
||||
log.info(
|
||||
"resume the size calculation. The failpoint checks that the timeline directory still exists."
|
||||
)
|
||||
client.configure_failpoints(("timeline-calculate-logical-size-check-dir-exists", "return"))
|
||||
client.configure_failpoints(("timeline-calculate-logical-size-pause", "off"))
|
||||
|
||||
log.info("wait for delete timeline thread to finish and assert that it succeeded")
|
||||
assert delete_timeline_success.get()
|
||||
|
||||
# if the implementation is incorrect, the teardown would complain about an error log
|
||||
# message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists"
|
||||
# timeline-calculate-logical-size-pause is still paused, but it doesn't
|
||||
# matter because it's a pausable_failpoint, which can be cancelled by drop.
|
||||
|
||||
|
||||
def test_timeline_physical_size_init(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
Reference in New Issue
Block a user