Merge commit 'be5bbaeca' into problame/standby-horizon-leases

This commit is contained in:
Christian Schwarz
2025-08-06 17:46:44 +02:00
47 changed files with 1377 additions and 359 deletions

View File

@@ -1797,6 +1797,33 @@ def neon_env_builder(
record_property("preserve_database_files", builder.preserve_database_files)
@pytest.fixture(scope="function")
def neon_env_builder_local(
    neon_env_builder: NeonEnvBuilder,
    test_output_dir: Path,
    pg_distrib_dir: Path,
) -> NeonEnvBuilder:
    """
    Give the test a NeonEnvBuilder backed by its own copy of pg_install.

    The Postgres distribution directory is copied under the test output
    directory and the builder is pointed at that copy. A test can then edit
    the list of available extensions in its local Postgres instance, e.g. by
    downloading a remote extension or by copying files around for a local
    extension, working on the copy rather than on the original directory.
    """
    private_pg_install = test_output_dir / "pg_install"
    log.info(f"copy {pg_distrib_dir} to {private_pg_install}")

    # Every version is copied, not only the one under test: other binaries
    # like the storage controller need specific Postgres versions.
    shutil.copytree(pg_distrib_dir, private_pg_install)

    neon_env_builder.pg_distrib_dir = private_pg_install
    log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
    return neon_env_builder
@dataclass
class PageserverPort:
pg: int

View File

@@ -1002,7 +1002,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
res = self.get(f"http://localhost:{self.port}/metrics")
res = self.get(f"http://localhost:{self.port}/metrics?use_latest=true")
self.verbose_error(res)
return res.text

View File

@@ -143,7 +143,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result = self.get(f"http://localhost:{self.port}/metrics?use_latest=true")
request_result.raise_for_status()
return request_result.text

View File

@@ -0,0 +1,32 @@
\echo Use "CREATE EXTENSION test_event_trigger_extension" to load this file. \quit
CREATE SCHEMA event_trigger;
create sequence if not exists event_trigger.seq_schema_version as int cycle;
-- Event trigger function: advance event_trigger.seq_schema_version by one
-- step on every invocation. The value returned by nextval() is discarded
-- (PERFORM); only the change to the sequence state is of interest.
-- Declared SECURITY DEFINER.
create or replace function event_trigger.increment_schema_version()
returns event_trigger
security definer
language plpgsql
as $$
begin
perform pg_catalog.nextval('event_trigger.seq_schema_version');
end;
$$;
-- Return the current schema version, read as last_value of
-- event_trigger.seq_schema_version (the sequence that
-- event_trigger.increment_schema_version() advances).
-- Declared SECURITY DEFINER.
create or replace function event_trigger.get_schema_version()
returns int
security definer
language sql
as $$
select last_value from event_trigger.seq_schema_version;
$$;
-- On DDL event, increment the schema version number
create event trigger event_trigger_watch_ddl
on ddl_command_end
execute procedure event_trigger.increment_schema_version();
create event trigger event_trigger_watch_drop
on sql_drop
execute procedure event_trigger.increment_schema_version();

View File

@@ -0,0 +1,8 @@
default_version = '1.0'
comment = 'Test extension with Event Trigger'
# make sure the extension objects are owned by the bootstrap user
# to check that the SECURITY DEFINER event trigger function is still
# called during non-superuser DDL events.
superuser = true
trusted = true

View File

@@ -165,6 +165,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,
"image_layer_force_creation_period": "1m",
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"lazy_slru_download": True,

View File

@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING
import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import wait_for_last_flush_lsn
from fixtures.pageserver.http import TimelineCreate406
from fixtures.utils import query_scalar, skip_in_debug_build
@@ -162,6 +163,9 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
)
lsn = Lsn(res[2][0][0])
# Wait for all WAL to reach the pageserver, so GC cutoff LSN is greater than `lsn`.
wait_for_last_flush_lsn(env, endpoint0, tenant, b0)
# Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the
# branch creation task but the individual timeline GC iteration happens *after*
# the branch creation task.

View File

@@ -944,3 +944,78 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool
f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
)
assert res[0][0] == 1
# BEGIN_HADRON
def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
    """
    Count the historic image and delta layers of a timeline.

    Fetches the layer map of the given timeline through the HTTP client of
    pageserver number `ps_id` in `env` and returns the tuple
    `(image_layer_count, delta_layer_count)`. Historic layers whose kind is
    neither "Image" nor "Delta" are not counted.
    """
    pageserver_http = env.pageservers[ps_id].http_client()
    layer_info = pageserver_http.layer_map_info(tenant_shard_id, timeline_id)
    kinds = [historic.kind for historic in layer_info.historic_layers]
    return kinds.count("Image"), kinds.count("Delta")
def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
    """
    Tests that page server can force creating new images if image creation timeout is enabled

    The tenant gets a 1s `image_layer_force_creation_period` while the regular
    compaction/image-creation thresholds are set high. After writing a few
    rows, compaction is triggered by hand and retried until the layer map
    reports at least one image layer and the pageserver log contains the
    forced-compaction messages.
    """
    # use large knobs to disable L0 compaction/image creation except for the force image creation
    tenant_conf = {
        "compaction_threshold": "100",
        "image_creation_threshold": "100",
        "image_layer_creation_check_threshold": "1",
        "checkpoint_distance": 10 * 1024,
        "checkpoint_timeout": "1s",
        "image_layer_force_creation_period": "1s",
        # The lsn for forced image layer creations is calculated once every 10 minutes.
        # Hence, drive compaction manually such that the test doesn't compute it at the
        # wrong time.
        "compaction_period": "0s",
    }

    # consider every tenant large to run the image layer generation check more eagerly
    neon_env_builder.pageserver_config_override = (
        "image_layer_generation_large_timeline_threshold=0"
    )

    neon_env_builder.num_pageservers = 1
    neon_env_builder.num_safekeepers = 1
    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    endpoint = env.endpoints.create_start("main")
    endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
    # Generate some rows.
    for v in range(10):
        endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")

    # Sleep a bit such that the inserts are considered when calculating the forced image layer creation LSN.
    time.sleep(2)

    def check_force_image_creation():
        # Each attempt triggers a compaction itself, because periodic
        # compaction is configured off ("compaction_period": "0s") above.
        ps_http = env.pageserver.http_client()
        ps_http.timeline_compact(tenant_id, timeline_id)
        image, delta = get_layer_map(env, tenant_id, timeline_id, 0)
        log.info(f"images: {image}, deltas: {delta}")
        assert image > 0

        env.pageserver.assert_log_contains("forcing L0 compaction of")
        env.pageserver.assert_log_contains("forcing image creation for partitioned range")

    # Retry the check until its assertions hold.
    wait_until(check_force_image_creation)

    endpoint.stop_and_destroy()

    # Add this log message to the pageserver's allowed-errors list.
    env.pageserver.allowed_errors.append(
        ".*created delta file of size.*larger than double of target.*"
    )
# END_HADRON

View File

@@ -2,7 +2,6 @@ from __future__ import annotations
import os
import platform
import shutil
import tarfile
from enum import StrEnum
from pathlib import Path
@@ -31,27 +30,6 @@ if TYPE_CHECKING:
from werkzeug.wrappers.request import Request
# use neon_env_builder_local fixture to override the default neon_env_builder fixture
# and use a test-specific pg_install instead of shared one
@pytest.fixture(scope="function")
def neon_env_builder_local(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_distrib_dir: Path,
) -> NeonEnvBuilder:
test_local_pginstall = test_output_dir / "pg_install"
log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}")
# We can't copy only the version that we are currently testing because other
# binaries like the storage controller need specific Postgres versions.
shutil.copytree(pg_distrib_dir, test_local_pginstall)
neon_env_builder.pg_distrib_dir = test_local_pginstall
log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
return neon_env_builder
@final
class RemoteExtension(StrEnum):
SQL_ONLY = "test_extension_sql_only"

View File

@@ -0,0 +1,102 @@
from __future__ import annotations
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, cast
import pytest
from fixtures.log_helper import log
from fixtures.paths import BASE_DIR
if TYPE_CHECKING:
from pathlib import Path
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pg_version import PgVersion
# use neon_env_builder_local fixture to override the default neon_env_builder fixture
# and use a test-specific pg_install instead of shared one
@pytest.fixture(scope="function")
def neon_env_builder_event_trigger_extension(
    neon_env_builder_local: NeonEnvBuilder,
    test_output_dir: Path,
    pg_version: PgVersion,
) -> NeonEnvBuilder:
    """
    Builder whose local pg_install contains test_event_trigger_extension.

    Copies the control file and the 1.0 SQL script of the SQL-only extension
    from the regress data directory into the extension directory of the
    `pg_install` tree under `test_output_dir`, for the Postgres version under
    test, and returns the `neon_env_builder_local` builder unchanged.
    """
    extension_source_dir = (
        BASE_DIR / "test_runner" / "regress" / "data" / "test_event_trigger_extension"
    )
    extension_target_dir = (
        test_output_dir / "pg_install" / f"v{pg_version}" / "share" / "postgresql" / "extension"
    )

    log.info(f"copy {extension_source_dir} to {extension_target_dir}")

    # On-disk install of the extension: one control file plus one SQL script.
    for file_name in (
        "test_event_trigger_extension.control",
        "test_event_trigger_extension--1.0.sql",
    ):
        shutil.copy(extension_source_dir / file_name, extension_target_dir)

    return neon_env_builder_local
def test_event_trigger_extension(neon_env_builder_event_trigger_extension: NeonEnvBuilder):
    """
    Test installing an extension that contains an Event Trigger.

    The Event Trigger function is owned by the extension owner, which at
    CREATE EXTENSION is going to be the Postgres bootstrap user, per the
    extension control file where both superuser = true and trusted = true.

    Also this function is SECURITY DEFINER, to allow for making changes to
    the extension SQL objects, in our case a sequence.

    This test makes sure that the event trigger function is fired correctly
    by non-privileged user DDL actions such as CREATE TABLE.
    """
    env = neon_env_builder_event_trigger_extension.init_start()
    env.create_branch("test_event_trigger_extension")

    endpoint = env.endpoints.create_start("test_event_trigger_extension")
    extension = "test_event_trigger_extension"
    database = "test_event_trigger_extension"

    endpoint.safe_psql(f"CREATE DATABASE {database}")
    endpoint.safe_psql(f"CREATE EXTENSION {extension}", dbname=database)

    # check that the extension is owned by the bootstrap superuser (cloud_admin)
    pg_bootstrap_superuser_name = "cloud_admin"
    with endpoint.connect(dbname=database) as pg_conn:
        with pg_conn.cursor() as cur:
            # `extension` is a constant defined above, so interpolating it
            # into the query text is safe here.
            cur.execute(
                f"select rolname from pg_roles r join pg_extension e on r.oid = e.extowner where extname = '{extension}'"
            )
            owner = cast("tuple[str]", cur.fetchone())[0]
            assert owner == pg_bootstrap_superuser_name, (
                f"extension {extension} is not owned by bootstrap user '{pg_bootstrap_superuser_name}'"
            )

    # test that the SQL-only Event Trigger (SECURITY DEFINER function) runs
    # correctly now that the extension has been installed
    #
    # create table to trigger the event trigger, twice, check sequence count
    with endpoint.connect(dbname=database) as pg_conn:
        log.info("creating SQL objects (tables)")
        with pg_conn.cursor() as cur:
            cur.execute("CREATE TABLE foo1(id int primary key)")
            cur.execute("CREATE TABLE foo2(id int)")

            cur.execute("SELECT event_trigger.get_schema_version()")
            res = cast("tuple[int]", cur.fetchone())
            ver = res[0]

            log.info(f"schema version is now {ver}")
            # The test expects the version to be exactly 2 after the two
            # CREATE TABLE statements above.
            assert ver == 2, "schema version is not 2"

View File

@@ -1,6 +1,7 @@
import random
import threading
from enum import StrEnum
from time import sleep
from typing import Any
import pytest
@@ -24,18 +25,7 @@ OFFLOAD_LABEL = "compute_ctl_lfc_offloads_total"
OFFLOAD_ERR_LABEL = "compute_ctl_lfc_offload_errors_total"
METHOD_VALUES = [e for e in PrewarmMethod]
METHOD_IDS = [e.value for e in PrewarmMethod]
def check_pinned_entries(cur: Cursor):
"""
Wait till none of LFC buffers are pinned
"""
def none_pinned():
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'")
assert cur.fetchall()[0][0] == 0
wait_until(none_pinned)
AUTOOFFLOAD_INTERVAL_SECS = 2
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
@@ -49,9 +39,18 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any:
if method == PrewarmMethod.POSTGRES:
cur.execute("select get_local_cache_state()")
return cur.fetchall()[0][0]
if method == PrewarmMethod.AUTOPREWARM:
# With autoprewarm, we need to be sure LFC was offloaded after all writes
# finish, so we sleep. Otherwise we'll have less prewarmed pages than we want
sleep(AUTOOFFLOAD_INTERVAL_SECS)
client.offload_lfc_wait()
elif method == PrewarmMethod.COMPUTE_CTL:
return
if method == PrewarmMethod.COMPUTE_CTL:
status = client.prewarm_lfc_status()
assert status["status"] == "not_prewarmed"
assert "error" not in status
@@ -60,11 +59,9 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
parsed = prom_parse(client)
desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
assert parsed == desired, f"{parsed=} != {desired=}"
elif method == PrewarmMethod.POSTGRES:
cur.execute("select get_local_cache_state()")
return cur.fetchall()[0][0]
else:
raise AssertionError(f"{method} not in PrewarmMethod")
return
raise AssertionError(f"{method} not in PrewarmMethod")
def prewarm_endpoint(
@@ -106,14 +103,13 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
"neon.file_cache_size_limit=1GB",
"neon.file_cache_prewarm_limit=1000",
]
offload_secs = 2
if method == PrewarmMethod.AUTOPREWARM:
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=cfg,
autoprewarm=True,
offload_lfc_interval_seconds=offload_secs,
offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS,
)
else:
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
@@ -135,7 +131,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
endpoint.stop()
if method == PrewarmMethod.AUTOPREWARM:
endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=offload_secs)
endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS)
else:
endpoint.start()
@@ -162,7 +158,6 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
lfc_cur.execute("select sum(pk) from t")
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
check_pinned_entries(pg_cur)
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
check_prewarmed(method, client, desired)
@@ -243,9 +238,9 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
prewarm_thread.start()
def prewarmed():
assert n_prewarms > 5
assert n_prewarms > 3
wait_until(prewarmed)
wait_until(prewarmed, timeout=40) # debug builds don't finish in 20s
running = False
for t in workload_threads:
@@ -256,7 +251,6 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
total_balance = lfc_cur.fetchall()[0][0]
assert total_balance == 0
check_pinned_entries(pg_cur)
if method == PrewarmMethod.POSTGRES:
return
desired = {