Merge branch 'main' into cloud-22775-restore-to-connstring

This commit is contained in:
Gleb Novikov
2025-01-22 15:54:01 +00:00
75 changed files with 3987 additions and 1278 deletions

View File

@@ -208,6 +208,10 @@ class ShardIndex:
shard_count=int(input[2:4], 16),
)
@property
def is_sharded(self) -> bool:
return self.shard_count != 0
class TenantShardId:
def __init__(self, tenant_id: TenantId, shard_number: int, shard_count: int):

View File

@@ -126,12 +126,8 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
"pageserver_page_cache_read_accesses_total",
"pageserver_page_cache_size_current_bytes",
"pageserver_page_cache_size_max_bytes",
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
*histogram("pageserver_smgr_query_seconds_global"),
*histogram("pageserver_getpage_get_reconstruct_data_seconds"),
*histogram("pageserver_wait_lsn_seconds"),
*histogram("pageserver_remote_operation_seconds"),
*histogram("pageserver_io_operations_seconds"),

View File

@@ -340,6 +340,10 @@ class PgProtocol:
"""
return self.safe_psql(query, log_query=log_query)[0][0]
def show_timeline_id(self) -> TimelineId:
"""SHOW neon.timeline_id"""
return TimelineId(cast("str", self.safe_psql("show neon.timeline_id")[0][0]))
class PageserverWalReceiverProtocol(StrEnum):
VANILLA = "vanilla"
@@ -397,6 +401,7 @@ class NeonEnvBuilder:
pageserver_config_override: str | Callable[[dict[str, Any]], None] | None = None,
num_safekeepers: int = 1,
num_pageservers: int = 1,
num_azs: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
@@ -413,6 +418,7 @@ class NeonEnvBuilder:
storage_controller_port_override: int | None = None,
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -428,6 +434,7 @@ class NeonEnvBuilder:
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.num_pageservers = num_pageservers
self.num_azs = num_azs
self.safekeepers_id_start = safekeepers_id_start
self.safekeepers_enable_fsync = safekeepers_enable_fsync
self.auth_enabled = auth_enabled
@@ -451,6 +458,9 @@ class NeonEnvBuilder:
self.storage_controller_config: dict[Any, Any] | None = None
self.pageserver_virtual_file_io_engine: str | None = pageserver_virtual_file_io_engine
self.pageserver_get_vectored_concurrent_io: str | None = (
pageserver_get_vectored_concurrent_io
)
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
@@ -477,6 +487,7 @@ class NeonEnvBuilder:
self.test_name = test_name
self.compatibility_neon_binpath = compatibility_neon_binpath
self.compatibility_pg_distrib_dir = compatibility_pg_distrib_dir
self.test_may_use_compatibility_snapshot_binaries = False
self.version_combination = combination
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
@@ -488,6 +499,7 @@ class NeonEnvBuilder:
), "the environment variable COMPATIBILITY_POSTGRES_DISTRIB_DIR is required when using mixed versions"
self.mixdir.mkdir(mode=0o755, exist_ok=True)
self._mix_versions()
self.test_may_use_compatibility_snapshot_binaries = True
def init_configs(self, default_remote_storage_if_missing: bool = True) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -1017,6 +1029,7 @@ class NeonEnv:
self.endpoints = EndpointFactory(self)
self.safekeepers: list[Safekeeper] = []
self.pageservers: list[NeonPageserver] = []
self.num_azs = config.num_azs
self.broker = NeonBroker(self)
self.pageserver_remote_storage = config.pageserver_remote_storage
self.safekeepers_remote_storage = config.safekeepers_remote_storage
@@ -1086,6 +1099,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1117,14 +1131,21 @@ class NeonEnv:
http=self.port_distributor.get_port(),
)
# Availabilty zones may also be configured manually with `NeonEnvBuilder.pageserver_config_override`
if self.num_azs > 1:
# Round-robin assignment of AZ names like us-east-2a, us-east-2b, etc.
az_prefix = DEFAULT_AZ_ID[:-1]
availability_zone = f"{az_prefix}{chr(ord('a') + (ps_id - 1) % self.num_azs)}"
else:
availability_zone = DEFAULT_AZ_ID
ps_cfg: dict[str, Any] = {
"id": ps_id,
"listen_pg_addr": f"localhost:{pageserver_port.pg}",
"listen_http_addr": f"localhost:{pageserver_port.http}",
"pg_auth_type": pg_auth_type,
"http_auth_type": http_auth_type,
# Default which can be overriden with `NeonEnvBuilder.pageserver_config_override`
"availability_zone": DEFAULT_AZ_ID,
"availability_zone": availability_zone,
# Disable pageserver disk syncs in tests: when running tests concurrently, this avoids
# the pageserver taking a long time to start up due to syncfs flushing other tests' data
"no_sync": True,
@@ -1132,12 +1153,24 @@ class NeonEnv:
# Batching (https://github.com/neondatabase/neon/issues/9377):
# enable batching by default in tests and benchmarks.
ps_cfg["page_service_pipelining"] = {
"mode": "pipelined",
"execution": "concurrent-futures",
"max_batch_size": 32,
}
# Concurrent IO (https://github.com/neondatabase/neon/issues/9378):
# enable concurrent IO by default in tests and benchmarks.
# Compat tests are exempt because old versions fail to parse the new config.
if not config.compatibility_neon_binpath:
ps_cfg["page_service_pipelining"] = {
"mode": "pipelined",
"execution": "concurrent-futures",
"max_batch_size": 32,
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
if config.test_may_use_compatibility_snapshot_binaries:
log.info(
"Forcing use of binary-built-in default to avoid forward-compatibility related test failures"
)
get_vectored_concurrent_io = None
if get_vectored_concurrent_io is not None:
ps_cfg["get_vectored_concurrent_io"] = {
"mode": self.pageserver_get_vectored_concurrent_io,
}
if self.pageserver_virtual_file_io_engine is not None:
@@ -1474,6 +1507,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine: str,
pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None,
pageserver_virtual_file_io_mode: str | None,
pageserver_get_vectored_concurrent_io: str | None,
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with 1 safekeeper and 1 pageserver. No authentication, no fsync.
@@ -1506,6 +1540,7 @@ def neon_simple_env(
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
pageserver_get_vectored_concurrent_io=pageserver_get_vectored_concurrent_io,
combination=combination,
) as builder:
env = builder.init_start()
@@ -1532,6 +1567,7 @@ def neon_env_builder(
pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None,
record_property: Callable[[str, object], None],
pageserver_virtual_file_io_mode: str | None,
pageserver_get_vectored_concurrent_io: str | None,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1574,6 +1610,7 @@ def neon_env_builder(
test_overlay_dir=test_overlay_dir,
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
pageserver_virtual_file_io_mode=pageserver_virtual_file_io_mode,
pageserver_get_vectored_concurrent_io=pageserver_get_vectored_concurrent_io,
) as builder:
yield builder
# Propogate `preserve_database_files` to make it possible to use in other fixtures,

View File

@@ -44,6 +44,11 @@ def pageserver_virtual_file_io_mode() -> str | None:
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_MODE")
@pytest.fixture(scope="function", autouse=True)
def pageserver_get_vectored_concurrent_io() -> str | None:
return os.getenv("PAGESERVER_GET_VECTORED_CONCURRENT_IO")
def get_pageserver_default_tenant_config_compaction_algorithm() -> dict[str, Any] | None:
toml_table = os.getenv("PAGESERVER_DEFAULT_TENANT_CONFIG_COMPACTION_ALGORITHM")
if toml_table is None:

View File

@@ -176,6 +176,10 @@ def test_fully_custom_config(positive_env: NeonEnv):
"type": "interpreted",
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
"rel_size_v2_enabled": True,
"gc_compaction_enabled": True,
"gc_compaction_initial_threshold_kb": 1024000,
"gc_compaction_ratio_percent": 200,
}
vps_http = env.storage_controller.pageserver_api()

View File

@@ -251,6 +251,8 @@ def test_forward_compatibility(
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
neon_env_builder.test_may_use_compatibility_snapshot_binaries = True
try:
neon_env_builder.num_safekeepers = 3

View File

@@ -7,9 +7,78 @@ import threading
import time
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.utils import USE_LFC, query_scalar
"""
Test whether LFC doesn't error out when the LRU is empty, but the LFC is
already at its maximum size.
If we don't handle this safely, we might allocate more hash entries than
otherwise considered safe, thus causing ERRORs in hash_search(HASH_ENTER) once
we hit lfc->used >= lfc->limit.
"""
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_all_pinned(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size='1MB'",
"neon.file_cache_size_limit='1MB'",
],
)
top_cur = endpoint.connect().cursor()
stop = threading.Event()
n_rows = 10000
n_threads = 5
n_updates_per_connection = 1000
top_cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
top_cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
# Start threads that will perform random UPDATEs. Each UPDATE
# increments the counter on the row, so that we can check at the
# end that the sum of all the counters match the number of updates
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates(n_updates_performed_q: queue.Queue[int]):
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
while not stop.is_set():
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
if n_updates_performed % n_updates_per_connection == 0:
cur.close()
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
n_updates_performed_q.put(n_updates_performed)
n_updates_performed_q: queue.Queue[int] = queue.Queue()
threads: list[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(n_updates_performed_q,), daemon=True)
thread.start()
threads.append(thread)
time.sleep(15)
stop.set()
n_updates_performed = 0
for thread in threads:
thread.join()
n_updates_performed += n_updates_performed_q.get()
assert query_scalar(top_cur, "SELECT SUM(n) FROM lfctest") == n_rows + n_updates_performed
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):

View File

@@ -0,0 +1,60 @@
# NB: there are benchmarks that double-serve as tests inside the `performance` directory.
import subprocess
from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
@pytest.mark.timeout(30) # test takes <20s if pageserver impl is correct
@pytest.mark.parametrize("kind", ["pageserver-stop", "tenant-detach"])
def test_slow_flush(neon_env_builder: NeonEnvBuilder, neon_binpath: Path, kind: str):
def patch_pageserver_toml(config):
config["page_service_pipelining"] = {
"mode": "pipelined",
"max_batch_size": 32,
"execution": "concurrent-futures",
}
neon_env_builder.pageserver_config_override = patch_pageserver_toml
env = neon_env_builder.init_start()
log.info("make flush appear slow")
log.info("sending requests until pageserver accepts no more")
# TODO: extract this into a helper, like subprocess_capture,
# so that we capture the stderr from the helper somewhere.
child = subprocess.Popen(
[
neon_binpath / "test_helper_slow_client_reads",
env.pageserver.connstr(),
str(env.initial_tenant),
str(env.initial_timeline),
],
bufsize=0, # unbuffered
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
assert child.stdout is not None
buf = child.stdout.read(1)
if len(buf) != 1:
raise Exception("unexpected EOF")
if buf != b"R":
raise Exception(f"unexpected data: {buf!r}")
log.info("helper reports pageserver accepts no more requests")
log.info(
"assuming pageserver connection handle is in a state where TCP has backpressured pageserver=>client response flush() into userspace"
)
if kind == "pageserver-stop":
log.info("try to shut down the pageserver cleanly")
env.pageserver.stop()
elif kind == "tenant-detach":
log.info("try to shut down the tenant")
env.pageserver.tenant_detach(env.initial_tenant)
else:
raise ValueError(f"unexpected kind: {kind}")
log.info("shutdown did not time out, test passed")

View File

@@ -2394,6 +2394,7 @@ def test_storage_controller_node_deletion(
Test that deleting a node works & properly reschedules everything that was on the node.
"""
neon_env_builder.num_pageservers = 3
neon_env_builder.num_azs = 3
env = neon_env_builder.init_configs()
env.start()
@@ -2407,6 +2408,9 @@ def test_storage_controller_node_deletion(
tid, placement_policy='{"Attached":1}', shard_count=shard_count_per_tenant
)
# Sanity check: initial creations should not leave the system in an unstable scheduling state
assert env.storage_controller.reconcile_all() == 0
victim = env.pageservers[-1]
# The procedure a human would follow is:

View File

@@ -227,7 +227,9 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
new_shard_count = 4
assert shard_count is None or new_shard_count > shard_count
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
env.storage_controller.reconcile_until_idle() # Move shards to their final locations immediately
env.storage_controller.reconcile_until_idle(
timeout_secs=120
) # Move shards to their final locations immediately
# Create a timeline after split, to ensure scrubber can handle timelines that exist in child shards but not ancestors
env.storage_controller.pageserver_api().timeline_create(
@@ -269,6 +271,8 @@ def test_scrubber_physical_gc_ancestors(neon_env_builder: NeonEnvBuilder, shard_
ps.http_client().timeline_compact(
shard, timeline_id, force_image_layer_creation=True, wait_until_uploaded=True
)
# Add some WAL so that we don't gc at the latest remote consistent lsn
workload.churn_rows(1)
ps.http_client().timeline_gc(shard, timeline_id, 0)
# We will use a min_age_secs=1 threshold for deletion, let it pass

View File

@@ -194,7 +194,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
io_metrics = query_all_safekeepers(
"safekeeper_pg_io_bytes_total",
{
"app_name": "pageserver",
"app_name": f"pageserver-{env.pageserver.id}",
"client_az": "test_ps_az",
"dir": io_direction,
"same_az": "false",