mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
Merge remote-tracking branch 'origin/main' into bayandin/preserve_database_files-on-CI
This commit is contained in:
@@ -285,6 +285,21 @@ def test_foobar(neon_env_builder: NeonEnvBuilder):
|
||||
...
|
||||
```
|
||||
|
||||
The env includes a default tenant and timeline. Therefore, you do not need to create your own
|
||||
tenant/timeline for testing.
|
||||
|
||||
```python
|
||||
def test_foobar2(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start() # Start the environment
|
||||
with env.endpoints.create_start("main") as endpoint:
|
||||
# Start the compute endpoint
|
||||
client = env.pageserver.http_client() # Get the pageserver client
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
client.timeline_detail(tenant_id=tenant_id, timeline_id=timeline_id)
|
||||
```
|
||||
|
||||
For more information about pytest fixtures, see https://docs.pytest.org/en/stable/fixture.html
|
||||
|
||||
At the end of a test, all the nodes in the environment are automatically stopped, so you
|
||||
|
||||
@@ -833,7 +833,7 @@ class NeonEnvBuilder:
|
||||
def enable_scrub_on_exit(self):
|
||||
"""
|
||||
Call this if you would like the fixture to automatically run
|
||||
s3_scrubber at the end of the test, as a bidirectional test
|
||||
storage_scrubber at the end of the test, as a bidirectional test
|
||||
that the scrubber is working properly, and that the code within
|
||||
the test didn't produce any invalid remote state.
|
||||
"""
|
||||
@@ -948,7 +948,7 @@ class NeonEnvBuilder:
|
||||
|
||||
if self.scrub_on_exit:
|
||||
try:
|
||||
S3Scrubber(self).scan_metadata()
|
||||
StorageScrubber(self).scan_metadata()
|
||||
except Exception as e:
|
||||
log.error(f"Error during remote storage scrub: {e}")
|
||||
cleanup_error = e
|
||||
@@ -3391,7 +3391,7 @@ def static_proxy(
|
||||
yield proxy
|
||||
|
||||
|
||||
class Endpoint(PgProtocol):
|
||||
class Endpoint(PgProtocol, LogUtils):
|
||||
"""An object representing a Postgres compute endpoint managed by the control plane."""
|
||||
|
||||
def __init__(
|
||||
@@ -3457,6 +3457,7 @@ class Endpoint(PgProtocol):
|
||||
)
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
self.logfile = self.endpoint_path() / "compute.log"
|
||||
|
||||
config_lines = config_lines or []
|
||||
|
||||
@@ -3941,7 +3942,7 @@ class Safekeeper(LogUtils):
|
||||
wait_until(20, 0.5, paused)
|
||||
|
||||
|
||||
class S3Scrubber:
|
||||
class StorageScrubber:
|
||||
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
|
||||
self.env = env
|
||||
self.log_dir = log_dir or env.test_output_dir
|
||||
@@ -3961,7 +3962,7 @@ class S3Scrubber:
|
||||
if s3_storage.endpoint is not None:
|
||||
env.update({"AWS_ENDPOINT_URL": s3_storage.endpoint})
|
||||
|
||||
base_args = [str(self.env.neon_binpath / "s3_scrubber")]
|
||||
base_args = [str(self.env.neon_binpath / "storage_scrubber")]
|
||||
args = base_args + args
|
||||
|
||||
(output_path, stdout, status_code) = subprocess_capture(
|
||||
|
||||
@@ -313,7 +313,7 @@ def assert_prefix_empty(
|
||||
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
|
||||
# this seems like a mock_s3 issue
|
||||
log.warning(
|
||||
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
|
||||
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
|
||||
)
|
||||
keys = 0
|
||||
elif keys != 0 and len(objects) == 0:
|
||||
|
||||
@@ -591,3 +591,20 @@ class PropagatingThread(threading.Thread):
|
||||
if self.exc:
|
||||
raise self.exc
|
||||
return self.ret
|
||||
|
||||
|
||||
def human_bytes(amt: float) -> str:
|
||||
"""
|
||||
Render a bytes amount into nice IEC bytes string.
|
||||
"""
|
||||
|
||||
suffixes = ["", "Ki", "Mi", "Gi"]
|
||||
|
||||
last = suffixes[-1]
|
||||
|
||||
for name in suffixes:
|
||||
if amt < 1024 or name == last:
|
||||
return f"{int(round(amt))} {name}B"
|
||||
amt = amt / 1024
|
||||
|
||||
raise RuntimeError("unreachable")
|
||||
|
||||
@@ -17,7 +17,7 @@ from fixtures.neon_fixtures import (
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import wait_for_upload_queue_empty
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.utils import human_bytes, wait_until
|
||||
|
||||
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
|
||||
|
||||
@@ -218,19 +218,6 @@ def count_layers_per_tenant(
|
||||
return dict(ret)
|
||||
|
||||
|
||||
def human_bytes(amt: float) -> str:
|
||||
suffixes = ["", "Ki", "Mi", "Gi"]
|
||||
|
||||
last = suffixes[-1]
|
||||
|
||||
for name in suffixes:
|
||||
if amt < 1024 or name == last:
|
||||
return f"{int(round(amt))} {name}B"
|
||||
amt = amt / 1024
|
||||
|
||||
raise RuntimeError("unreachable")
|
||||
|
||||
|
||||
def _eviction_env(
|
||||
request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, num_pageservers: int
|
||||
) -> EvictionEnv:
|
||||
@@ -294,7 +281,7 @@ def pgbench_init_tenant(
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
"checkpoint_distance": f"{layer_size}",
|
||||
"image_creation_threshold": "100",
|
||||
"image_creation_threshold": "999999",
|
||||
"compaction_target_size": f"{layer_size}",
|
||||
}
|
||||
)
|
||||
@@ -668,11 +655,10 @@ def test_fast_growing_tenant(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, or
|
||||
finish_tenant_creation(env, tenant_id, timeline_id, min_expected_layers)
|
||||
|
||||
tenant_layers = count_layers_per_tenant(env.pageserver, map(lambda x: x[0], timelines))
|
||||
(total_on_disk, _, _) = poor_mans_du(env, map(lambda x: x[0], timelines), env.pageserver, False)
|
||||
(total_on_disk, _, _) = poor_mans_du(env, map(lambda x: x[0], timelines), env.pageserver, True)
|
||||
|
||||
# cut 10 percent
|
||||
response = env.pageserver.http_client().disk_usage_eviction_run(
|
||||
{"evict_bytes": total_on_disk // 10, "eviction_order": order.config()}
|
||||
{"evict_bytes": total_on_disk // 5, "eviction_order": order.config()}
|
||||
)
|
||||
log.info(f"{response}")
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from functools import partial
|
||||
@@ -18,20 +17,6 @@ from fixtures.neon_fixtures import (
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
# reconnection fixes this.
|
||||
def scan_standby_log_for_errors(secondary):
|
||||
log_path = secondary.endpoint_path() / "compute.log"
|
||||
with log_path.open("r") as f:
|
||||
markers = re.compile(
|
||||
r"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
|
||||
)
|
||||
for line in f:
|
||||
if markers.search(line):
|
||||
log.info(f"bad error in standby log: {line}")
|
||||
raise AssertionError()
|
||||
|
||||
|
||||
def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
@@ -91,7 +76,11 @@ def test_hot_standby(neon_simple_env: NeonEnv):
|
||||
assert response is not None
|
||||
assert response == responses[query]
|
||||
|
||||
scan_standby_log_for_errors(secondary)
|
||||
# Check for corrupted WAL messages which might otherwise go unnoticed if
|
||||
# reconnection fixes this.
|
||||
assert not secondary.log_contains(
|
||||
"incorrect resource manager data|record with incorrect|invalid magic number|unexpected pageaddr"
|
||||
)
|
||||
|
||||
# clean up
|
||||
if slow_down_send:
|
||||
|
||||
151
test_runner/regress/test_ingestion_layer_size.py
Normal file
151
test_runner/regress/test_ingestion_layer_size.py
Normal file
@@ -0,0 +1,151 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterable, List, Union
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_last_flush_lsn
|
||||
from fixtures.pageserver.http import HistoricLayerInfo, LayerMapInfo
|
||||
from fixtures.utils import human_bytes
|
||||
|
||||
|
||||
def test_ingesting_large_batches_of_images(neon_env_builder: NeonEnvBuilder, build_type: str):
|
||||
"""
|
||||
Build a non-small GIN index which includes similarly batched up images in WAL stream as does pgvector
|
||||
to show that we no longer create oversized layers.
|
||||
"""
|
||||
|
||||
if build_type == "debug":
|
||||
pytest.skip("debug run is unnecessarily slow")
|
||||
|
||||
minimum_initdb_size = 20 * 1024**2
|
||||
checkpoint_distance = 32 * 1024**2
|
||||
minimum_good_layer_size = checkpoint_distance * 0.9
|
||||
minimum_too_large_layer_size = 2 * checkpoint_distance
|
||||
|
||||
# index size: 99MiB
|
||||
rows = 2_500_000
|
||||
|
||||
# bucket lower limits
|
||||
buckets = [0, minimum_initdb_size, minimum_good_layer_size, minimum_too_large_layer_size]
|
||||
|
||||
assert (
|
||||
minimum_initdb_size < minimum_good_layer_size
|
||||
), "keep checkpoint_distance higher than the initdb size (find it by experimenting)"
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"checkpoint_distance": f"{checkpoint_distance}",
|
||||
"compaction_target_size": f"{checkpoint_distance}",
|
||||
# this test is primarly interested in L0 sizes but we'll compact after ingestion to ensure sizes are good even then
|
||||
"compaction_period": "0s",
|
||||
"gc_period": "0s",
|
||||
"compaction_threshold": "255",
|
||||
"image_creation_threshold": "99999",
|
||||
}
|
||||
)
|
||||
|
||||
# build a larger than 3*checkpoint_distance sized gin index.
|
||||
# gin index building exhibits the same behaviour as the pgvector with the two phase build
|
||||
with env.endpoints.create_start("main") as ep, ep.cursor() as cur:
|
||||
cur.execute(
|
||||
f"create table int_array_test as select array_agg(g) as int_array from generate_series(1, {rows}) g group by g / 10;"
|
||||
)
|
||||
cur.execute(
|
||||
"create index int_array_test_gin_index on int_array_test using gin (int_array);"
|
||||
)
|
||||
cur.execute("select pg_table_size('int_array_test_gin_index')")
|
||||
size = cur.fetchone()
|
||||
assert size is not None
|
||||
assert isinstance(size[0], int)
|
||||
log.info(f"gin index size: {human_bytes(size[0])}")
|
||||
assert (
|
||||
size[0] > checkpoint_distance * 3
|
||||
), f"gin index is not large enough: {human_bytes(size[0])}"
|
||||
wait_for_last_flush_lsn(env, ep, env.initial_tenant, env.initial_timeline)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
ps_http.timeline_checkpoint(env.initial_tenant, env.initial_timeline)
|
||||
|
||||
infos = ps_http.layer_map_info(env.initial_tenant, env.initial_timeline)
|
||||
assert len(infos.in_memory_layers) == 0, "should had flushed open layers"
|
||||
post_ingest = histogram_historic_layers(infos, buckets)
|
||||
|
||||
# describe first, assert later for easier debugging
|
||||
log.info("non-cumulative layer size distribution after ingestion:")
|
||||
print_layer_size_histogram(post_ingest)
|
||||
|
||||
# since all we have are L0s, we should be getting nice L1s and images out of them now
|
||||
ps_http.patch_tenant_config_client_side(
|
||||
env.initial_tenant,
|
||||
{
|
||||
"compaction_threshold": 1,
|
||||
"image_creation_threshold": 1,
|
||||
},
|
||||
)
|
||||
|
||||
ps_http.timeline_compact(env.initial_tenant, env.initial_timeline, True, True)
|
||||
|
||||
infos = ps_http.layer_map_info(env.initial_tenant, env.initial_timeline)
|
||||
assert len(infos.in_memory_layers) == 0, "no new inmem layers expected"
|
||||
post_compact = histogram_historic_layers(infos, buckets)
|
||||
|
||||
log.info("non-cumulative layer size distribution after compaction:")
|
||||
print_layer_size_histogram(post_compact)
|
||||
|
||||
assert (
|
||||
post_ingest.counts[3] == 0
|
||||
), f"there should be no layers larger than 2*checkpoint_distance ({human_bytes(2*checkpoint_distance)})"
|
||||
assert post_ingest.counts[1] == 1, "expect one smaller layer for initdb"
|
||||
assert (
|
||||
post_ingest.counts[0] <= 1
|
||||
), "expect at most one tiny layer from shutting down the endpoint"
|
||||
|
||||
# just make sure we don't have trouble splitting the layers apart
|
||||
assert post_compact.counts[3] == 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class Histogram:
|
||||
buckets: List[Union[int, float]]
|
||||
counts: List[int]
|
||||
sums: List[int]
|
||||
|
||||
|
||||
def histogram_historic_layers(
|
||||
infos: LayerMapInfo, minimum_sizes: List[Union[int, float]]
|
||||
) -> Histogram:
|
||||
def log_layer(layer: HistoricLayerInfo) -> HistoricLayerInfo:
|
||||
log.info(
|
||||
f"{layer.layer_file_name} {human_bytes(layer.layer_file_size)} ({layer.layer_file_size} bytes)"
|
||||
)
|
||||
return layer
|
||||
|
||||
layers = map(log_layer, infos.historic_layers)
|
||||
sizes = (x.layer_file_size for x in layers)
|
||||
return histogram(sizes, minimum_sizes)
|
||||
|
||||
|
||||
def histogram(sizes: Iterable[int], minimum_sizes: List[Union[int, float]]) -> Histogram:
|
||||
assert all(minimum_sizes[i] < minimum_sizes[i + 1] for i in range(len(minimum_sizes) - 1))
|
||||
buckets = list(enumerate(minimum_sizes))
|
||||
counts = [0 for _ in buckets]
|
||||
sums = [0 for _ in buckets]
|
||||
|
||||
for size in sizes:
|
||||
found = False
|
||||
for index, min_size in reversed(buckets):
|
||||
if size >= min_size:
|
||||
counts[index] += 1
|
||||
sums[index] += size
|
||||
found = True
|
||||
break
|
||||
assert found
|
||||
|
||||
return Histogram(minimum_sizes, counts, sums)
|
||||
|
||||
|
||||
def print_layer_size_histogram(h: Histogram):
|
||||
for index, min_size in enumerate(h.buckets):
|
||||
log.info(
|
||||
f">= {human_bytes(min_size)}: {h.counts[index]} layers total {human_bytes(h.sums[index])}"
|
||||
)
|
||||
@@ -37,7 +37,8 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
|
||||
)
|
||||
|
||||
@@ -221,6 +221,35 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
wait_until(number_of_iterations=10, interval=2, func=partial(slot_removed, endpoint))
|
||||
|
||||
|
||||
def test_ondemand_wal_download_in_replication_slot_funcs(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("init")
|
||||
endpoint = env.endpoints.create_start("init")
|
||||
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute("create table wal_generator (id serial primary key, data text)")
|
||||
cur.execute(
|
||||
"SELECT * FROM pg_create_logical_replication_slot('slotty_mcslotface', 'test_decoding')"
|
||||
)
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO wal_generator (data)
|
||||
SELECT repeat('A', 1024) -- Generates a kilobyte of data per row
|
||||
FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of data
|
||||
"""
|
||||
)
|
||||
|
||||
endpoint.stop_and_destroy()
|
||||
endpoint = env.endpoints.create_start("init")
|
||||
|
||||
with endpoint.connect().cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT * FROM pg_logical_slot_peek_binary_changes('slotty_mcslotface', NULL, NULL, 'include-xids', '0')"
|
||||
)
|
||||
|
||||
|
||||
# Tests that walsender correctly blocks until WAL is downloaded from safekeepers
|
||||
def test_lr_with_slow_safekeeper(neon_env_builder: NeonEnvBuilder, vanilla_pg):
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
@@ -247,6 +276,7 @@ FROM generate_series(1, 16384) AS seq; -- Inserts enough rows to exceed 16MB of
|
||||
connstr = endpoint.connstr().replace("'", "''")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub")
|
||||
logical_replication_sync(vanilla_pg, endpoint)
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
# Pause the safekeepers so that they can't send WAL (except to pageserver)
|
||||
|
||||
@@ -8,8 +8,6 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_migrations", "empty")
|
||||
|
||||
endpoint = env.endpoints.create("test_migrations")
|
||||
log_path = endpoint.endpoint_path() / "compute.log"
|
||||
|
||||
endpoint.respec(skip_pg_catalog_updates=False)
|
||||
endpoint.start()
|
||||
|
||||
@@ -22,9 +20,7 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
migration_id = cur.fetchall()
|
||||
assert migration_id[0][0] == num_migrations
|
||||
|
||||
with open(log_path, "r") as log_file:
|
||||
logs = log_file.read()
|
||||
assert f"INFO handle_migrations: Ran {num_migrations} migrations" in logs
|
||||
endpoint.assert_log_contains(f"INFO handle_migrations: Ran {num_migrations} migrations")
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
@@ -36,6 +32,4 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
migration_id = cur.fetchall()
|
||||
assert migration_id[0][0] == num_migrations
|
||||
|
||||
with open(log_path, "r") as log_file:
|
||||
logs = log_file.read()
|
||||
assert "INFO handle_migrations: Ran 0 migrations" in logs
|
||||
endpoint.assert_log_contains("INFO handle_migrations: Ran 0 migrations")
|
||||
|
||||
@@ -22,7 +22,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
S3Scrubber,
|
||||
StorageScrubber,
|
||||
generate_uploads_and_deletions,
|
||||
)
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
@@ -215,7 +215,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Having written a mixture of generation-aware and legacy index_part.json,
|
||||
# ensure the scrubber handles the situation as expected.
|
||||
metadata_summary = S3Scrubber(neon_env_builder).scan_metadata()
|
||||
metadata_summary = StorageScrubber(neon_env_builder).scan_metadata()
|
||||
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
|
||||
assert metadata_summary["timeline_count"] == 1
|
||||
assert metadata_summary["timeline_shard_count"] == 1
|
||||
|
||||
@@ -7,7 +7,7 @@ from typing import Any, Dict, Optional
|
||||
import pytest
|
||||
from fixtures.common_types import TenantId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, S3Scrubber
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, StorageScrubber
|
||||
from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_prefix_empty,
|
||||
@@ -214,7 +214,7 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
|
||||
# Having done a bunch of attach/detach cycles, we will have generated some index garbage: check
|
||||
# that the scrubber sees it and cleans it up. We do this before the final attach+validate pass,
|
||||
# to also validate that the scrubber isn't breaking anything.
|
||||
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
|
||||
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
|
||||
assert gc_summary["remote_storage_errors"] == 0
|
||||
assert gc_summary["indices_deleted"] > 0
|
||||
|
||||
@@ -536,7 +536,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
# Scrub the remote storage
|
||||
# ========================
|
||||
# This confirms that the scrubber isn't upset by the presence of the heatmap
|
||||
S3Scrubber(neon_env_builder).scan_metadata()
|
||||
StorageScrubber(neon_env_builder).scan_metadata()
|
||||
|
||||
# Detach secondary and delete tenant
|
||||
# ===================================
|
||||
@@ -563,6 +563,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
),
|
||||
)
|
||||
workload.stop()
|
||||
|
||||
|
||||
def test_secondary_background_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -2,6 +2,7 @@ import time
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
@@ -32,7 +33,12 @@ def test_tenant_s3_restore(
|
||||
assert remote_storage, "remote storage not configured"
|
||||
enable_remote_storage_versioning(remote_storage)
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
|
||||
# change it back after initdb, recovery doesn't work if the two
|
||||
# index_part.json uploads happen at same second or too close to each other.
|
||||
initial_tenant_conf = MANY_SMALL_LAYERS_TENANT_CONFIG
|
||||
del initial_tenant_conf["checkpoint_distance"]
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf)
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# The deletion queue will complain when it encounters simulated S3 errors
|
||||
@@ -43,14 +49,16 @@ def test_tenant_s3_restore(
|
||||
)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
# now lets create the small layers
|
||||
ps_http.set_tenant_config(tenant_id, MANY_SMALL_LAYERS_TENANT_CONFIG)
|
||||
|
||||
# Default tenant and the one we created
|
||||
assert ps_http.get_metric_value("pageserver_tenant_manager_slots", {"mode": "attached"}) == 1
|
||||
|
||||
# create two timelines one being the parent of another, both with non-trivial data
|
||||
parent = None
|
||||
parent = "main"
|
||||
last_flush_lsns = []
|
||||
|
||||
for timeline in ["first", "second"]:
|
||||
@@ -64,6 +72,7 @@ def test_tenant_s3_restore(
|
||||
last_flush_lsns.append(last_flush_lsn)
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
log.info(f"{timeline} timeline {timeline_id} {last_flush_lsn=}")
|
||||
parent = timeline
|
||||
|
||||
# These sleeps are important because they fend off differences in clocks between us and S3
|
||||
@@ -108,6 +117,9 @@ def test_tenant_s3_restore(
|
||||
ps_http.tenant_attach(tenant_id, generation=generation)
|
||||
env.pageserver.quiesce_tenants()
|
||||
|
||||
for tline in ps_http.timeline_list(env.initial_tenant):
|
||||
log.info(f"timeline detail: {tline}")
|
||||
|
||||
for i, timeline in enumerate(["first", "second"]):
|
||||
with env.endpoints.create_start(timeline, tenant_id=tenant_id) as endpoint:
|
||||
endpoint.safe_psql(f"SELECT * FROM created_{timeline};")
|
||||
|
||||
@@ -11,8 +11,8 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
S3Scrubber,
|
||||
StorageControllerApiException,
|
||||
StorageScrubber,
|
||||
last_flush_lsn_upload,
|
||||
tenant_get_shards,
|
||||
wait_for_last_flush_lsn,
|
||||
@@ -126,7 +126,7 @@ def test_sharding_smoke(
|
||||
|
||||
# Check the scrubber isn't confused by sharded content, then disable
|
||||
# it during teardown because we'll have deleted by then
|
||||
S3Scrubber(neon_env_builder).scan_metadata()
|
||||
StorageScrubber(neon_env_builder).scan_metadata()
|
||||
neon_env_builder.scrub_on_exit = False
|
||||
|
||||
env.storage_controller.pageserver_api().tenant_delete(tenant_id)
|
||||
@@ -693,6 +693,9 @@ def test_sharding_ingest_layer_sizes(
|
||||
# small checkpointing and compaction targets to ensure we generate many upload operations
|
||||
"checkpoint_distance": f"{expect_layer_size}",
|
||||
"compaction_target_size": f"{expect_layer_size}",
|
||||
# aim to reduce flakyness, we are not doing explicit checkpointing
|
||||
"compaction_period": "0s",
|
||||
"gc_period": "0s",
|
||||
}
|
||||
shard_count = 4
|
||||
neon_env_builder.num_pageservers = shard_count
|
||||
@@ -708,6 +711,23 @@ def test_sharding_ingest_layer_sizes(
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# ignore the initdb layer(s) for the purposes of the size comparison as a initdb image layer optimization
|
||||
# will produce a lot more smaller layers.
|
||||
initial_layers_per_shard = {}
|
||||
log.info("initdb distribution (not asserted on):")
|
||||
for shard in env.storage_controller.locate(tenant_id):
|
||||
pageserver = env.get_pageserver(shard["node_id"])
|
||||
shard_id = shard["shard_id"]
|
||||
layers = (
|
||||
env.get_pageserver(shard["node_id"]).http_client().layer_map_info(shard_id, timeline_id)
|
||||
)
|
||||
for layer in layers.historic_layers:
|
||||
log.info(
|
||||
f"layer[{pageserver.id}]: {layer.layer_file_name} (size {layer.layer_file_size})"
|
||||
)
|
||||
|
||||
initial_layers_per_shard[shard_id] = set(layers.historic_layers)
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init()
|
||||
workload.write_rows(4096, upload=False)
|
||||
@@ -729,7 +749,13 @@ def test_sharding_ingest_layer_sizes(
|
||||
|
||||
historic_layers = sorted(layer_map.historic_layers, key=lambda layer: layer.lsn_start)
|
||||
|
||||
initial_layers = initial_layers_per_shard[shard_id]
|
||||
|
||||
for layer in historic_layers:
|
||||
if layer in initial_layers:
|
||||
# ignore the initdb image layers for the size histogram
|
||||
continue
|
||||
|
||||
if layer.layer_file_size < expect_layer_size // 2:
|
||||
classification = "Small"
|
||||
small_layer_count += 1
|
||||
@@ -759,7 +785,8 @@ def test_sharding_ingest_layer_sizes(
|
||||
pass
|
||||
else:
|
||||
# General case:
|
||||
assert float(small_layer_count) / float(ok_layer_count) < 0.25
|
||||
# old limit was 0.25 but pg14 is right at the limit with 7/28
|
||||
assert float(small_layer_count) / float(ok_layer_count) < 0.3
|
||||
|
||||
# Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
|
||||
assert huge_layer_count <= shard_count
|
||||
|
||||
@@ -6,7 +6,7 @@ import pytest
|
||||
from fixtures.common_types import TenantId, TenantShardId, TimelineId
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
S3Scrubber,
|
||||
StorageScrubber,
|
||||
)
|
||||
from fixtures.remote_storage import S3Storage, s3_storage
|
||||
from fixtures.workload import Workload
|
||||
@@ -60,7 +60,7 @@ def test_scrubber_tenant_snapshot(neon_env_builder: NeonEnvBuilder, shard_count:
|
||||
output_path = neon_env_builder.test_output_dir / "snapshot"
|
||||
os.makedirs(output_path)
|
||||
|
||||
scrubber = S3Scrubber(neon_env_builder)
|
||||
scrubber = StorageScrubber(neon_env_builder)
|
||||
scrubber.tenant_snapshot(tenant_id, output_path)
|
||||
|
||||
assert len(os.listdir(output_path)) > 0
|
||||
@@ -143,18 +143,18 @@ def test_scrubber_physical_gc(neon_env_builder: NeonEnvBuilder, shard_count: Opt
|
||||
workload.write_rows(1)
|
||||
|
||||
# With a high min_age, the scrubber should decline to delete anything
|
||||
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=3600)
|
||||
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=3600)
|
||||
assert gc_summary["remote_storage_errors"] == 0
|
||||
assert gc_summary["indices_deleted"] == 0
|
||||
|
||||
# If targeting a different tenant, the scrubber shouldn't do anything
|
||||
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(
|
||||
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(
|
||||
min_age_secs=1, tenant_ids=[TenantId.generate()]
|
||||
)
|
||||
assert gc_summary["remote_storage_errors"] == 0
|
||||
assert gc_summary["indices_deleted"] == 0
|
||||
|
||||
# With a low min_age, the scrubber should go ahead and clean up all but the latest 2 generations
|
||||
gc_summary = S3Scrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
|
||||
gc_summary = StorageScrubber(neon_env_builder).pageserver_physical_gc(min_age_secs=1)
|
||||
assert gc_summary["remote_storage_errors"] == 0
|
||||
assert gc_summary["indices_deleted"] == (expect_indices_per_shard - 2) * shard_count
|
||||
@@ -10,7 +10,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
S3Scrubber,
|
||||
StorageScrubber,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
@@ -707,7 +707,7 @@ def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder)
|
||||
|
||||
remote_storage_kind = RemoteStorageKind.MOCK_S3
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
scrubber = S3Scrubber(neon_env_builder)
|
||||
scrubber = StorageScrubber(neon_env_builder)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import time
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
#
|
||||
@@ -113,11 +115,90 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
assert cur_new.fetchall() == []
|
||||
|
||||
|
||||
#
|
||||
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
|
||||
# record.
|
||||
#
|
||||
def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
def test_vm_bit_clear_on_heap_lock_whitebox(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK record.
|
||||
|
||||
This is a repro for the bug fixed in commit 66fa176cc8.
|
||||
"""
|
||||
# Debugging https://github.com/neondatabase/neon/issues/6967
|
||||
neon_env_builder.preserve_database_files = True
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
# If auto-analyze runs at the same time that we run VACUUM FREEZE, it
|
||||
# can hold a snasphot that prevent the tuples from being frozen.
|
||||
"autovacuum=off",
|
||||
"log_checkpoints=on",
|
||||
],
|
||||
)
|
||||
|
||||
# Run the tests in a dedicated database, because the activity monitor
|
||||
# periodically runs some queries on to the 'postgres' database. If that
|
||||
# happens at the same time that we're trying to freeze, the activity
|
||||
# monitor's queries can hold back the xmin horizon and prevent freezing.
|
||||
with closing(endpoint.connect()) as pg_conn:
|
||||
pg_conn.cursor().execute("CREATE DATABASE vmbitsdb")
|
||||
pg_conn = endpoint.connect(dbname="vmbitsdb")
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Install extension containing function needed for test
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
cur.execute("CREATE EXTENSION pageinspect")
|
||||
|
||||
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
|
||||
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
|
||||
xid = int(query_scalar(cur, "SELECT txid_current()"))
|
||||
cur.execute("COMMIT")
|
||||
cur.execute("VACUUM (FREEZE, DISABLE_PAGE_SKIPPING true, VERBOSE) vmtest_lock")
|
||||
for notice in pg_conn.notices:
|
||||
log.info(f"{notice}")
|
||||
|
||||
# This test has been flaky in the past, because background activity like
|
||||
# auto-analyze and compute_ctl's activity monitor queries have prevented the
|
||||
# tuples from being frozen. Check that they were frozen.
|
||||
relfrozenxid = int(
|
||||
query_scalar(cur, "SELECT relfrozenxid FROM pg_class WHERE relname='vmtest_lock'")
|
||||
)
|
||||
assert (
|
||||
relfrozenxid > xid
|
||||
), f"Inserted rows were not frozen. This can be caused by concurrent activity in the database. (XID {xid}, relfrozenxid {relfrozenxid}"
|
||||
|
||||
# Lock a row. This clears the all-frozen VM bit for that page.
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE")
|
||||
cur.execute("COMMIT")
|
||||
|
||||
# The VM page in shared buffer cache, and the same page as reconstructed by
|
||||
# the pageserver, should be equal. Except for the LSN: Clearing a bit in the
|
||||
# VM doesn't bump the LSN in PostgreSQL, but the pageserver updates the LSN
|
||||
# when it replays the VM-bit clearing record (since commit 387a36874c)
|
||||
#
|
||||
# This is a bit fragile, we've had lot of flakiness in this test before. For
|
||||
# example, because all the VM bits were not set because concurrent
|
||||
# autoanalyze prevented the VACUUM FREEZE from freezing the tuples. Or
|
||||
# because autoavacuum kicked in and re-froze the page between the
|
||||
# get_raw_page() and get_raw_page_at_lsn() calls. We disable autovacuum now,
|
||||
# which should make this deterministic.
|
||||
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
|
||||
vm_page_in_cache = (cur.fetchall()[0][0])[8:100].hex()
|
||||
cur.execute(
|
||||
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn(), NULL )"
|
||||
)
|
||||
vm_page_at_pageserver = (cur.fetchall()[0][0])[8:100].hex()
|
||||
|
||||
assert vm_page_at_pageserver == vm_page_in_cache
|
||||
|
||||
|
||||
def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
The previous test is enough to verify the bug that was fixed in
|
||||
commit 66fa176cc8. But for good measure, we also reproduce the
|
||||
original problem that the missing VM page update caused.
|
||||
"""
|
||||
# Debugging https://github.com/neondatabase/neon/issues/6967
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
@@ -133,9 +214,9 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock")
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_vm_bit_clear_on_heap_lock",
|
||||
"main",
|
||||
config_lines=[
|
||||
"log_autovacuum_min_duration = 0",
|
||||
# Perform anti-wraparound vacuuming aggressively
|
||||
@@ -149,12 +230,10 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Install extension containing function needed for test
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
cur.execute("CREATE EXTENSION pageinspect")
|
||||
|
||||
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
|
||||
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
|
||||
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
|
||||
|
||||
cur.execute("VACUUM (FREEZE, DISABLE_PAGE_SKIPPING true) vmtest_lock")
|
||||
|
||||
# Lock a row. This clears the all-frozen VM bit for that page.
|
||||
@@ -168,27 +247,6 @@ def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
cur.execute("COMMIT")
|
||||
|
||||
# The VM page in shared buffer cache, and the same page as reconstructed
|
||||
# by the pageserver, should be equal.
|
||||
#
|
||||
# Ignore page header (24 bytes) of visibility map.
|
||||
# If the dirty VM page is flushed from the cache for some reason,
|
||||
# it gets WAL-logged, which changes the LSN on the page.
|
||||
# Also in neon SMGR we can replace empty heap page with zero (uninitialized) heap page.
|
||||
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
|
||||
vm_page_in_cache = (cur.fetchall()[0][0])[24:100].hex()
|
||||
cur.execute(
|
||||
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn(), NULL )"
|
||||
)
|
||||
vm_page_at_pageserver = (cur.fetchall()[0][0])[24:100].hex()
|
||||
|
||||
assert vm_page_at_pageserver == vm_page_in_cache
|
||||
|
||||
# The above assert is enough to verify the bug that was fixed in
|
||||
# commit 66fa176cc8. But for good measure, we also reproduce the
|
||||
# original problem that the missing VM page update caused. The
|
||||
# rest of the test does that.
|
||||
|
||||
# Kill and restart postgres, to clear the buffer cache.
|
||||
#
|
||||
# NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages
|
||||
|
||||
Reference in New Issue
Block a user