Files
neon/test_runner/regress/test_layer_eviction.py
Alexander Bayandin 30a7dd630c ruff: enable TC — flake8-type-checking (#11368)
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes
- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking):
https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
2025-03-30 18:58:33 +00:00

311 lines
14 KiB
Python

from __future__ import annotations
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
flush_ep_to_pageserver,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.common_types import parse_layer_file_name
from fixtures.pageserver.utils import wait_for_upload
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import skip_in_debug_build
# Crates a few layers, ensures that we can evict them (removing locally but keeping track of them anyway)
# and then download them back.
@skip_in_debug_build("times out in debug builds")
def test_basic_eviction(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(
initial_tenant_conf={
# disable gc and compaction background loops because they perform on-demand downloads
"gc_period": "0s",
"compaction_period": "0s",
}
)
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Create a number of layers in the tenant
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 5000000) g
"""
)
# stops the endpoint
current_lsn = flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id)
client.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
# stop sks to avoid on-demand downloads by walreceiver / getpage; endpoint
# has already been stopped by flush_ep_to_pageserver
for sk in env.safekeepers:
sk.stop()
initial_local_layers = dict(
(parse_layer_file_name(path.name), path)
for path in env.pageserver.list_layers(tenant_id, timeline_id)
)
assert len(initial_local_layers) > 1, (
f"Should create multiple layers for timeline, but got {initial_local_layers}"
)
# Compare layer map dump with the local layers, ensure everything's present locally and matches
initial_layer_map_info = client.layer_map_info(tenant_id=tenant_id, timeline_id=timeline_id)
assert not initial_layer_map_info.in_memory_layers, (
"Should have no in memory layers after flushing"
)
assert len(initial_local_layers) == len(initial_layer_map_info.historic_layers), (
"Should have the same layers in memory and on disk"
)
for returned_layer in initial_layer_map_info.historic_layers:
assert returned_layer.kind == "Delta", (
f"Did not create and expect image layers, but got {returned_layer}"
)
assert not returned_layer.remote, (
f"All created layers should be present locally, but got {returned_layer}"
)
returned_layer_name = parse_layer_file_name(returned_layer.layer_file_name)
assert returned_layer_name in initial_local_layers, (
f"Did not find returned layer {returned_layer_name} in local layers {list(initial_local_layers.keys())}"
)
local_layer_path = (
env.pageserver.timeline_dir(tenant_id, timeline_id)
/ initial_local_layers[returned_layer_name]
)
assert returned_layer.layer_file_size == local_layer_path.stat().st_size, (
f"Returned layer {returned_layer} has a different file size than local layer {local_layer_path}"
)
# Detach all layers, ensre they are not in the local FS, but are still dumped as part of the layer map
for local_layer_name, local_layer_path in initial_local_layers.items():
client.evict_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer_path.name
)
assert not env.pageserver.layer_exists(tenant_id, timeline_id, local_layer_name), (
f"Did not expect to find {local_layer_name} layer after evicting"
)
empty_layers = env.pageserver.list_layers(tenant_id, timeline_id)
assert not empty_layers, (
f"After evicting all layers, timeline {tenant_id}/{timeline_id} should have no layers locally, but got: {empty_layers}"
)
evicted_layer_map_info = client.layer_map_info(tenant_id=tenant_id, timeline_id=timeline_id)
assert not evicted_layer_map_info.in_memory_layers, (
"Should have no in memory layers after flushing and evicting"
)
assert len(initial_local_layers) == len(evicted_layer_map_info.historic_layers), (
"Should have the same layers in memory and on disk initially"
)
for returned_layer in evicted_layer_map_info.historic_layers:
assert returned_layer.kind == "Delta", (
f"Did not create and expect image layers, but got {returned_layer}"
)
assert returned_layer.remote, (
f"All layers should be evicted and not present locally, but got {returned_layer}"
)
returned_layer_name = parse_layer_file_name(returned_layer.layer_file_name)
assert returned_layer_name in initial_local_layers, (
f"Did not find returned layer {returned_layer} in local layers {initial_local_layers}"
)
# redownload all evicted layers and ensure the initial state is restored
for local_layer_name, _local_layer_path in initial_local_layers.items():
client.download_layer(
tenant_id=tenant_id, timeline_id=timeline_id, layer_name=local_layer_name.to_str()
)
client.timeline_download_remote_layers(
tenant_id,
timeline_id,
# allow some concurrency to unveil potential concurrency bugs
max_concurrent_downloads=10,
errors_ok=False,
at_least_one_download=False,
)
redownloaded_layers = dict(
(parse_layer_file_name(path.name), path)
for path in env.pageserver.list_layers(tenant_id, timeline_id)
)
assert redownloaded_layers == initial_local_layers, (
"Should have the same layers locally after redownloading the evicted layers"
)
redownloaded_layer_map_info = client.layer_map_info(
tenant_id=tenant_id, timeline_id=timeline_id
)
assert redownloaded_layer_map_info == initial_layer_map_info, (
"Should have the same layer map after redownloading the evicted layers"
)
def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# don't create initial tenant, we'll create it manually with custom config
env = neon_env_builder.init_configs()
env.start()
tenant_config = {
"pitr_interval": "1s", # set to non-zero, so GC actually does something
"gc_period": "0s", # we want to control when GC runs
"compaction_period": "0s", # we want to control when compaction runs
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"compaction_threshold": "3",
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
"lsn_lease_length": "0s",
}
def tenant_update_config(changes):
tenant_config.update(changes)
env.config_tenant(tenant_id, tenant_config)
tenant_id, timeline_id = env.create_tenant(conf=tenant_config)
log.info("tenant id is %s", tenant_id)
env.initial_tenant = tenant_id # update_and_gc relies on this
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
log.info("fill with data, creating delta & image layers, some of which are GC'able after")
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
with endpoint.cursor() as cur:
cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Create delta layers, then turn them into image layers.
# Do it multiple times so that there's something to GC.
for k in range(0, 2):
# produce delta layers => disable image layer creation by setting high threshold
tenant_update_config({"image_creation_threshold": "100"})
for i in range(0, 2):
for j in range(0, 3):
# create a minimal amount of "delta difficulty" for this table
with endpoint.cursor() as cur:
cur.execute("update a set some_value = -some_value + %s", (j,))
with endpoint.cursor() as cur:
# vacuuming should aid to reuse keys, though it's not really important
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
last_lsn = wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
if i == 1 and j == 2 and k == 1:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
endpoint.stop_and_destroy()
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# images should not yet be created, because threshold is too high,
# but these will be reshuffled to L1 layers
ps_http.timeline_compact(tenant_id, timeline_id)
for _ in range(0, 20):
# loop in case flushing is still in progress
layers = ps_http.layer_map_info(tenant_id, timeline_id)
if not layers.in_memory_layers:
break
time.sleep(0.2)
# now that we've grown some delta layers, turn them into image layers
tenant_update_config({"image_creation_threshold": "1"})
ps_http.timeline_compact(tenant_id, timeline_id)
# wait for all uploads to finish (checkpoint has been done above)
wait_for_upload(ps_http, tenant_id, timeline_id, last_lsn)
# shutdown safekeepers to avoid on-demand downloads from walreceiver
for sk in env.safekeepers:
sk.stop()
ps_http.timeline_checkpoint(tenant_id, timeline_id)
log.info("ensure the code above produced image and delta layers")
pre_evict_info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", pre_evict_info)
by_kind = pre_evict_info.kind_count()
log.info("by kind: %s", by_kind)
assert by_kind["Image"] > 0
assert by_kind["Delta"] > 0
assert by_kind["InMemory"] == 0
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
log.info("resident layers count before eviction: %s", len(resident_layers))
log.info("evict all layers")
ps_http.evict_all_layers(tenant_id, timeline_id)
def ensure_resident_and_remote_size_metrics():
resident_layers = list(env.pageserver.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
# we have disabled all background loops, so, this should hold
assert len(resident_layers) == 0, "ensure that all the layers are gone"
info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", info)
resident_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_resident_physical_size"
)
assert resident_physical_size_metric == 0, (
"ensure that resident_physical_size metric is zero"
)
assert resident_physical_size_metric == sum(
layer.layer_file_size for layer in info.historic_layers if not layer.remote
), "ensure that resident_physical_size metric corresponds to layer map dump"
remote_physical_size_metric = ps_http.get_timeline_metric(
tenant_id, timeline_id, "pageserver_remote_physical_size"
)
assert remote_physical_size_metric == sum(
layer.layer_file_size for layer in info.historic_layers if layer.remote
), "ensure that remote_physical_size metric corresponds to layer map dump"
log.info("before runnning GC, ensure that remote_physical size is zero")
# leaving index_part.json upload from successful compaction out will show
# up here as a mismatch between remove_physical_size and summed up layermap
# size
ensure_resident_and_remote_size_metrics()
log.info("run GC")
time.sleep(2) # let pitr_interval + 1 second pass
ps_http.timeline_gc(tenant_id, timeline_id, 0)
time.sleep(1)
assert not env.pageserver.log_contains("Nothing to GC")
log.info("ensure GC deleted some layers, otherwise this test is pointless")
post_gc_info = ps_http.layer_map_info(tenant_id, timeline_id)
log.info("layer map dump: %s", post_gc_info)
log.info("by kind: %s", post_gc_info.kind_count())
pre_evict_layers = set([layer.layer_file_name for layer in pre_evict_info.historic_layers])
post_gc_layers = set([layer.layer_file_name for layer in post_gc_info.historic_layers])
assert post_gc_layers.issubset(pre_evict_layers)
assert len(post_gc_layers) < len(pre_evict_layers)
log.info("update_gc_info might download some layers. Evict them again.")
ps_http.evict_all_layers(tenant_id, timeline_id)
log.info("after running GC, ensure that resident size is still zero")
ensure_resident_and_remote_size_metrics()