Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-08 22:12:56 +00:00
test: duplicate L1 layer (#5412)
We overwrite L1 layers if compaction gets interrupted. We did not have a test showing that we in fact do this. The test might be a bit flaky because it relies on filesystem timestamps, but it is split out here to keep the diff small, as part of #5172. Also removes an unrelated 200s pgbench run from the test suite.
@@ -3722,6 +3722,11 @@ impl Timeline {
            });

            writer.as_mut().unwrap().put_value(key, lsn, value).await?;

            if !new_layers.is_empty() {
                fail_point!("after-timeline-compacted-first-L1");
            }

            prev_key = Some(key);
        }
        if let Some(writer) = writer {

@@ -3876,6 +3881,7 @@ impl Timeline {
            );
            let l = l as Arc<dyn PersistentLayer>;
            if guard.contains(&l) {
                tracing::error!(layer=%l, "duplicated L1 layer");
                duplicated_layers.insert(l.layer_desc().key());
            } else {
                if LayerMap::is_l0(l.layer_desc()) {
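The failpoint added above fires after compaction has written the new L1 layers but before the input L0s are deleted, so killing the pageserver at that point leaves both on disk, and a restart will recompact the same L0s into an identically named L1. A minimal sketch of how a test drives this through the pageserver HTTP client, using the same fixture calls as the new test further down (the env, tenant_id and timeline_id names are assumed to come from those fixtures):

    import pytest
    from requests.exceptions import ConnectionError

    pageserver_http = env.pageserver.http_client()

    # Arm the failpoint with the "exit" action: the pageserver process exits as soon
    # as compaction reaches "after-timeline-compacted-first-L1".
    pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))

    # The compaction request dies together with the process, so the HTTP call fails.
    with pytest.raises(ConnectionError, match="Remote end closed connection without response"):
        pageserver_http.timeline_compact(tenant_id, timeline_id)
    # The process has already exited because of the failpoint; stop() only settles fixture state.
    env.pageserver.stop()

    # After a restart, compaction reprocesses the surviving L0s and overwrites the same L1.
    env.pageserver.start()
    pageserver_http.timeline_compact(tenant_id, timeline_id)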
@@ -1,19 +1,21 @@
import time

import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError


# Test duplicate layer detection
#
# This test sets fail point at the end of first compaction phase:
# after flushing new L1 layers but before deletion of L0 layers
# it should cause generation of duplicate L1 layer by compaction after restart.
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()

    # use a failpoint to return all L0s as L1s
    message = ".*duplicated L1 layer layer=.*"
    env.pageserver.allowed_errors.append(message)

    # Use aggressive compaction and checkpoint settings
    tenant_id, _ = env.neon_cli.create_tenant(
        conf={
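The hunk above is cut off at `conf={`, so the actual settings passed to create_tenant in this test are not part of the excerpt. For orientation only, the second test added below uses an aggressive configuration along these lines, so this one is presumably in the same spirit (values copied from test_actually_duplicated_l1 further down, not from the truncated hunk):

    # Illustration only: values taken from test_actually_duplicated_l1 below.
    tenant_id, _ = env.neon_cli.create_tenant(
        conf={
            "checkpoint_distance": f"{1024 ** 2}",     # flush the in-memory layer after ~1 MiB of WAL
            "compaction_target_size": f"{1024 ** 2}",  # aim for ~1 MiB output layers
            "compaction_period": "0 s",                # no periodic background compaction
            "compaction_threshold": "3",               # compact once 3 L0 layers accumulate
        }
    )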
@@ -33,4 +35,100 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    time.sleep(10)  # let compaction to be performed
    assert env.pageserver.log_contains("compact-level0-phase1-return-same")

    pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T200", "-Mprepared", connstr])


def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    This test sets fail point at the end of first compaction phase:
    after flushing new L1 layers but before deletion of L0 layers
    it should cause generation of duplicate L1 layer by compaction after restart.
    """
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    env = neon_env_builder.init_start(
        initial_tenant_conf={
            "checkpoint_distance": f"{1024 ** 2}",
            "compaction_target_size": f"{1024 ** 2}",
            "compaction_period": "0 s",
            "compaction_threshold": "3",
        }
    )
    pageserver_http = env.pageserver.http_client()

    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline

    pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))

    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
    connstr = endpoint.connstr(options="-csynchronous_commit=off")
    pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])

    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)

    # make sure we receive no new wal after this, so that we'll write over the same L1 file.
    endpoint.stop()
    for sk in env.safekeepers:
        sk.stop()

    # hit the exit failpoint
    with pytest.raises(ConnectionError, match="Remote end closed connection without response"):
        pageserver_http.timeline_compact(tenant_id, timeline_id)
    env.pageserver.stop()

    # now the duplicate L1 has been created, but is not yet uploaded
    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

    # path = env.remote_storage.timeline_path(tenant_id, timeline_id)
    l1_found = None
    for path in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir():
        if path.name == "metadata" or path.name.startswith("ephemeral-"):
            continue

        if len(path.suffixes) > 0:
            # temp files
            continue

        [key_range, lsn_range] = path.name.split("__", maxsplit=1)

        if "-" not in lsn_range:
            # image layer
            continue

        [key_start, key_end] = key_range.split("-", maxsplit=1)

        if key_start == "0" * 36 and key_end == "F" * 36:
            # L0
            continue

        if l1_found is not None:
            raise RuntimeError(f"found multiple L1: {l1_found.name} and {path.name}")
        l1_found = path

    assert l1_found is not None, "failed to find L1 locally"
    original_created_at = l1_found.stat()[8]

    uploaded = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / l1_found.name
    assert not uploaded.exists(), "to-be-overwritten should not yet be uploaded"

    # give room for fs timestamps
    time.sleep(1)

    env.pageserver.start()
    message = f".*duplicated L1 layer layer={l1_found.name}"
    env.pageserver.allowed_errors.append(message)

    pageserver_http.timeline_compact(tenant_id, timeline_id)
    # give time for log flush
    time.sleep(1)

    found_msg = env.pageserver.log_contains(message)
    assert found_msg is not None, "no layer was duplicated, has this been fixed already?"

    log.info(f"found log line: {found_msg}")

    overwritten_at = l1_found.stat()[8]
    assert original_created_at < overwritten_at, "expected the L1 to be overwritten"

    wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)

    uploaded_at = uploaded.stat()[8]
    assert overwritten_at <= uploaded_at, "expected the L1 to finally be uploaded"
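The search loop in the test above finds the single L1 by elimination: it skips the metadata and ephemeral files, temp files (any name with a suffix), image layers (no "-" in the LSN part of the name), and L0 deltas (whose key range spans 00...0 to FF...F); it also relies on stat()[8], which is st_mtime, to compare write times. The same name-based classification as a small standalone sketch, handy when eyeballing a timeline directory by hand (the helper name and the usage comment are illustrative, not part of the fixtures):

    from pathlib import Path


    def classify_layer_file(path: Path) -> str:
        """Best-effort classification of a pageserver layer file by its name alone."""
        if path.name == "metadata" or path.name.startswith("ephemeral-"):
            return "not a layer"
        if len(path.suffixes) > 0:
            return "temp file"
        if "__" not in path.name:
            return "unknown"

        key_range, lsn_range = path.name.split("__", maxsplit=1)
        if "-" not in lsn_range:
            # a single LSN means an image layer
            return "image layer"

        key_start, key_end = key_range.split("-", maxsplit=1)
        if key_start == "0" * 36 and key_end == "F" * 36:
            # full key range means an L0 delta
            return "L0 delta"
        return "L1 delta"


    # e.g. print the classification of every file in a timeline directory:
    # for p in sorted(timeline_dir.iterdir()):
    #     print(f"{classify_layer_file(p):>12}  {p.name}")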
@@ -120,6 +120,10 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()

    # these can happen, if we shutdown at a good time. to be fixed as part of #5172.
    message = ".*duplicated L1 layer layer=.*"
    env.pageserver.allowed_errors.append(message)

    # Use a tiny checkpoint distance, to create a lot of layers quickly.
    # That allows us to stress the compaction and layer flushing logic more.
    tenant, _ = env.neon_cli.create_tenant(
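Both the new test and this chaos-test change lean on the same pair of hooks: allowed_errors whitelists a log pattern so the end-of-test log scan does not fail the run, and log_contains greps the pageserver log for it. A minimal sketch of that pairing, assuming the fixture behaviour shown in the tests above:

    # `env` is the NeonEnv fixture from the tests above; the pattern is a regex.
    message = ".*duplicated L1 layer layer=.*"

    # Expected error: keep the post-test log check from flagging it as a failure.
    env.pageserver.allowed_errors.append(message)

    # ... run the workload that is expected to produce the error ...

    # Then assert the pageserver actually logged it.
    assert env.pageserver.log_contains(message) is not None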