mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
Add test_uploads_and_deletions test (#7758)
Adds a test that is a reproducer for many tiered compaction bugs, both ones that have since been fixed as well as still unfixed ones: * (now fixed) #7296 * #7707 * #7759 * Likely also #7244 but I haven't tried that. The key ordering bug can be reproduced by switching to `merge_delta_keys` instead of `merge_delta_keys_buffered`, so reverting a big part of #7661, although it only sometimes reproduces (30-50% of cases). part of https://github.com/neondatabase/neon/issues/7554
This commit is contained in:
@@ -59,6 +59,7 @@ from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
@@ -79,6 +80,7 @@ from fixtures.utils import (
|
||||
allure_attach_from_dir,
|
||||
assert_no_errors,
|
||||
get_self_dir,
|
||||
print_gc_result,
|
||||
subprocess_capture,
|
||||
wait_until,
|
||||
)
|
||||
@@ -4419,3 +4421,79 @@ def parse_project_git_version_output(s: str) -> str:
|
||||
return commit
|
||||
|
||||
raise ValueError(f"unable to parse --version output: '{s}'")
|
||||
|
||||
|
||||
def generate_uploads_and_deletions(
    env: NeonEnv,
    *,
    init: bool = True,
    tenant_id: Optional[TenantId] = None,
    timeline_id: Optional[TimelineId] = None,
    data: Optional[str] = None,
    pageserver: NeonPageserver,
):
    """
    Run a small write workload against `pageserver` that results in some
    uploads and some deletions to remote storage.

    `tenant_id` and `timeline_id` fall back to the environment's initial
    tenant and timeline when omitted.  When `init` is set, the `foo` table is
    created first.  `data` is the value written into every row; when it is
    omitted, each churn round writes its own round number instead.
    """
    tenant_id = env.initial_tenant if tenant_id is None else tenant_id
    assert tenant_id is not None

    timeline_id = env.initial_timeline if timeline_id is None else timeline_id
    assert timeline_id is not None

    http = pageserver.http_client()
    ps_id = pageserver.id

    with env.endpoints.create_start(
        "main", tenant_id=tenant_id, pageserver_id=ps_id
    ) as endpoint:
        if init:
            endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id, pageserver_id=ps_id)

        # Two rounds of churn: compaction should generate some GC-eligible layers
        for round_no in range(2):
            payload = f"{round_no if data is None else data}"
            upsert_sql = f"""
                INSERT INTO foo (id, val)
                SELECT g, '{payload}'
                FROM generate_series(1, 200) g
                ON CONFLICT (id) DO UPDATE
                SET val = EXCLUDED.val
                """
            # The VACUUM ensures that GC can actually remove some layers
            endpoint.safe_psql_many([upsert_sql, "VACUUM foo"])

            # Wait for uploads as well as the local flush, so that we do not leave the
            # system in a state where there are "future layers" in remote storage that
            # will generate deletions after a restart.
            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id, pageserver_id=ps_id)

        gc_result = http.timeline_gc(tenant_id, timeline_id, 0)
        print_gc_result(gc_result)
        assert gc_result["layers_removed"] > 0

        # Stop the endpoint and flush all data to the pageserver, then checkpoint it.
        # This leaves the pageserver fully idle: no more background ingest, no more
        # pending uploads, and therefore no non-determinism in subsequent actions
        # like pageserver restarts.
        final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, ps_id)
        http.timeline_checkpoint(tenant_id, timeline_id)
        # Finish uploads
        wait_for_upload(http, tenant_id, timeline_id, final_lsn)
        # Finish all remote writes (including deletions)
        wait_for_upload_queue_empty(http, tenant_id, timeline_id)
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
import enum
|
||||
import json
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, generate_uploads_and_deletions
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.workload import Workload
|
||||
|
||||
AGGRESIVE_COMPACTION_TENANT_CONF = {
|
||||
@@ -190,3 +192,61 @@ def test_sharding_compaction(
|
||||
|
||||
# Assert that everything is still readable
|
||||
workload.validate()
|
||||
|
||||
|
||||
class CompactionAlgorithm(str, enum.Enum):
    """
    The compaction algorithms a test can select.

    Members are also plain strings, so a member compares equal to its value.
    """

    LEGACY = "Legacy"
    TIERED = "Tiered"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "compaction_algorithm", [CompactionAlgorithm.LEGACY, CompactionAlgorithm.TIERED]
)
def test_uploads_and_deletions(
    neon_env_builder: NeonEnvBuilder,
    compaction_algorithm: CompactionAlgorithm,
):
    """
    Run the upload/deletion workload under the given compaction algorithm.

    For the tiered algorithm this is a reproducer for known bugs: the listed
    log errors are allowed, an API failure of the workload is tolerated, and
    the test fails if none of the known errors shows up in the pageserver log.
    For the legacy algorithm nothing is tolerated.

    :param compaction_algorithm: the compaction algorithm to use.
    """

    tenant_conf = {
        # small checkpointing and compaction targets to ensure we generate many upload operations
        "checkpoint_distance": f"{128 * 1024}",
        "compaction_threshold": "1",
        "compaction_target_size": f"{128 * 1024}",
        # no PITR horizon, we specify the horizon when we request on-demand GC
        "pitr_interval": "0s",
        # disable background compaction and GC. We invoke it manually when we want it to happen.
        "gc_period": "0s",
        "compaction_period": "0s",
        # create image layers eagerly, so that GC can remove some layers
        "image_creation_threshold": "1",
        "image_layer_creation_check_threshold": "0",
        "compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
    }
    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

    is_tiered = compaction_algorithm == CompactionAlgorithm.TIERED

    # TODO remove these allowed errors
    # https://github.com/neondatabase/neon/issues/7707
    # https://github.com/neondatabase/neon/issues/7759
    allowed_errors = [
        ".*duplicated L1 layer.*",
        ".*delta layer created with.*duplicate values.*",
        ".*assertion failed: self.lsn_range.start <= lsn.*",
        ".*HTTP request handler task panicked: task.*panicked.*",
    ]
    if is_tiered:
        env.pageserver.allowed_errors.extend(allowed_errors)

    try:
        generate_uploads_and_deletions(env, pageserver=env.pageserver)
    except PageserverApiException as e:
        # Only the tiered algorithm has known bugs that may fail the workload;
        # swallowing the failure for the legacy algorithm would hide a real
        # regression, so propagate it there.
        if not is_tiered:
            raise
        log.info(f"Obtained PageserverApiException: {e}")

    # The errors are flaky: no particular one is guaranteed to occur in a given
    # run, but at least one of them is expected to show up.
    if is_tiered:
        found_allowed_error = any(
            env.pageserver.log_contains(pattern) for pattern in allowed_errors
        )
        if not found_allowed_error:
            raise Exception("None of the allowed_errors occurred in the log")
|
||||
|
||||
@@ -21,11 +21,9 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
PgBin,
|
||||
S3Scrubber,
|
||||
flush_ep_to_pageserver,
|
||||
last_flush_lsn_upload,
|
||||
generate_uploads_and_deletions,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -33,12 +31,11 @@ from fixtures.pageserver.utils import (
|
||||
list_prefix,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
wait_for_upload_queue_empty,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
RemoteStorageKind,
|
||||
)
|
||||
from fixtures.utils import print_gc_result, wait_until
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
|
||||
# A tenant configuration that is convenient for generating uploads and deletions
|
||||
@@ -59,82 +56,6 @@ TENANT_CONF = {
|
||||
}
|
||||
|
||||
|
||||
def generate_uploads_and_deletions(
    env: NeonEnv,
    *,
    init: bool = True,
    tenant_id: Optional[TenantId] = None,
    timeline_id: Optional[TimelineId] = None,
    data: Optional[str] = None,
    pageserver: NeonPageserver,
):
    """
    Run a small write workload against `pageserver` that results in some
    uploads and some deletions to remote storage.

    `tenant_id` and `timeline_id` fall back to the environment's initial
    tenant and timeline when omitted.  When `init` is set, the `foo` table is
    created first.  `data` is the value written into every row; when it is
    omitted, each churn round writes its own round number instead.
    """
    tenant_id = env.initial_tenant if tenant_id is None else tenant_id
    assert tenant_id is not None

    timeline_id = env.initial_timeline if timeline_id is None else timeline_id
    assert timeline_id is not None

    http = pageserver.http_client()
    ps_id = pageserver.id

    with env.endpoints.create_start(
        "main", tenant_id=tenant_id, pageserver_id=ps_id
    ) as endpoint:
        if init:
            endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id, pageserver_id=ps_id)

        # Two rounds of churn: compaction should generate some GC-eligible layers
        for round_no in range(2):
            payload = f"{round_no if data is None else data}"
            upsert_sql = f"""
                INSERT INTO foo (id, val)
                SELECT g, '{payload}'
                FROM generate_series(1, 200) g
                ON CONFLICT (id) DO UPDATE
                SET val = EXCLUDED.val
                """
            # The VACUUM ensures that GC can actually remove some layers
            endpoint.safe_psql_many([upsert_sql, "VACUUM foo"])

            # Wait for uploads as well as the local flush, so that we do not leave the
            # system in a state where there are "future layers" in remote storage that
            # will generate deletions after a restart.
            last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id, pageserver_id=ps_id)

        gc_result = http.timeline_gc(tenant_id, timeline_id, 0)
        print_gc_result(gc_result)
        assert gc_result["layers_removed"] > 0

        # Stop the endpoint and flush all data to the pageserver, then checkpoint it.
        # This leaves the pageserver fully idle: no more background ingest, no more
        # pending uploads, and therefore no non-determinism in subsequent actions
        # like pageserver restarts.
        final_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, ps_id)
        http.timeline_checkpoint(tenant_id, timeline_id)
        # Finish uploads
        wait_for_upload(http, tenant_id, timeline_id, final_lsn)
        # Finish all remote writes (including deletions)
        wait_for_upload_queue_empty(http, tenant_id, timeline_id)
|
||||
|
||||
|
||||
def read_all(
|
||||
env: NeonEnv, tenant_id: Optional[TenantId] = None, timeline_id: Optional[TimelineId] = None
|
||||
):
|
||||
|
||||
Reference in New Issue
Block a user