fix: make compaction more sensitive to cancellation (#8706)

A few of the benchmarks have started failing after #8655 because they end up
waiting on the compactor task. Reads done by image layer creation should
already be cancellation-sensitive, since vectored get checks for cancellation
on each call, but this change sprinkles in additional cancellation points
(sketched below):

- at each partition
- after each vectored read batch
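
The pattern is the same at every site: poll the timeline's cancellation token
at a natural loop boundary and bail out with the existing shutdown/cancelled
error instead of finishing the pass. Below is a minimal standalone sketch of
that pattern; it uses tokio-util's CancellationToken, but the function, error
type, and data shapes are invented for illustration and are not the real
pageserver types.

// Sketch only: illustrates the cancellation-point pattern, not pageserver code.
// Requires the tokio-util crate; everything except CancellationToken is made up.
use tokio_util::sync::CancellationToken;

#[derive(Debug, PartialEq)]
enum CompactionError {
    ShuttingDown,
}

/// Walk partitions and their read batches, checking for cancellation at the
/// same two points the commit adds: once per partition and after each batch.
fn compact(
    cancel: &CancellationToken,
    partitions: &[Vec<Vec<u8>>],
) -> Result<usize, CompactionError> {
    let mut bytes = 0;
    for partition in partitions {
        // cancellation point: once per partition
        if cancel.is_cancelled() {
            return Err(CompactionError::ShuttingDown);
        }
        for batch in partition {
            bytes += batch.len();
            // cancellation point: after each (vectored) read batch
            if cancel.is_cancelled() {
                return Err(CompactionError::ShuttingDown);
            }
        }
    }
    Ok(bytes)
}

fn main() {
    let cancel = CancellationToken::new();
    // two partitions of four 8-byte batches each
    let partitions = vec![vec![vec![0u8; 8]; 4]; 2];
    assert_eq!(compact(&cancel, &partitions), Ok(64));

    // once the token is cancelled, the next check aborts the pass
    cancel.cancel();
    assert_eq!(
        compact(&cancel, &partitions),
        Err(CompactionError::ShuttingDown)
    );
}
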
Author:    Joonas Koivunen
Date:      2024-08-13 20:00:54 +03:00
Committer: GitHub
Parent:    e0946e334a
Commit:    8f170c5105
4 changed files with 31 additions and 0 deletions


@@ -3917,6 +3917,10 @@ impl Timeline {
        .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
        .await?;
    if self.cancel.is_cancelled() {
        return Err(CreateImageLayersError::Cancelled);
    }
    for (img_key, img) in results {
        let img = match img {
            Ok(img) => img,
@@ -4024,6 +4028,9 @@ impl Timeline {
            next_start_key: img_range.end,
        });
    }
    if self.cancel.is_cancelled() {
        return Err(CreateImageLayersError::Cancelled);
    }
    let mut wrote_any_image = false;
    for (k, v) in data {
        if v.is_empty() {
@@ -4138,6 +4145,10 @@ impl Timeline {
    let check_for_image_layers = self.should_check_if_image_layers_required(lsn);
    for partition in partitioning.parts.iter() {
        if self.cancel.is_cancelled() {
            return Err(CreateImageLayersError::Cancelled);
        }
        let img_range = start..partition.ranges.last().unwrap().end;
        let compact_metadata = partition.overlaps(&Key::metadata_key_range());
        if compact_metadata {


@@ -748,6 +748,9 @@ impl Timeline {
    let all_keys = {
        let mut all_keys = Vec::new();
        for l in deltas_to_compact.iter() {
            if self.cancel.is_cancelled() {
                return Err(CompactionError::ShuttingDown);
            }
            all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
        }
        // The current stdlib sorting implementation is designed in a way where it is
@@ -830,6 +833,11 @@ impl Timeline {
    };
    stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    drop_rlock(guard);
    if self.cancel.is_cancelled() {
        return Err(CompactionError::ShuttingDown);
    }
    stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    // This iterator walks through all key-value pairs from all the layers


@@ -1251,6 +1251,8 @@ class NeonEnv:
    def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
        """
        After this method returns, there should be no child processes running.
        Unless, of course, some stopping failed; in that case, all remaining child processes are leaked.
        """
        self.endpoints.stop_all(fail_on_endpoint_errors)


@@ -159,6 +159,8 @@ def test_pageserver_chaos(
    if build_type == "debug":
        pytest.skip("times out in debug builds")
    # same rationale as with the immediate stop; we might leave orphan layers behind.
    neon_env_builder.disable_scrub_on_exit()
    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    if shard_count is not None:
        neon_env_builder.num_pageservers = shard_count
@@ -220,3 +222,11 @@ def test_pageserver_chaos(
    # Check that all the updates are visible
    num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]
    assert num_updates == i * 100000
    # Currently the pageserver cannot tolerate "s3" going away, and if we
    # succeeded in a compaction before shutdown, there might be a lot of
    # uploads pending, certainly more than what we can ingest with MOCK_S3.
    #
    # So instead, do a fast shutdown for this one test.
    # See https://github.com/neondatabase/neon/issues/8709
    env.stop(immediate=True)