Merge branch 'main' into erik/history-size-consumption-metric

This commit is contained in:
Erik Grinaker
2025-05-20 13:05:15 +02:00
174 changed files with 7581 additions and 1793 deletions

View File

@@ -0,0 +1,59 @@
"""
Script to creates a stack of L0 deltas each of which should have 1 Value::Delta per page in `data`,
in your running neon_local setup.
Use this bash setup to reset your neon_local environment.
The last line of this bash snippet will run this file here.
```
export NEON_REPO_DIR=$PWD/.neon
export NEON_BIN_DIR=$PWD/target/release
$NEON_BIN_DIR/neon_local stop
rm -rf $NEON_REPO_DIR
$NEON_BIN_DIR/neon_local init
cat >> $NEON_REPO_DIR/pageserver_1/pageserver.toml <<"EOF"
# customizations
virtual_file_io_mode = "direct-rw"
page_service_pipelining={mode="pipelined", max_batch_size=32, execution="concurrent-futures"}
get_vectored_concurrent_io={mode="sidecar-task"}
EOF
$NEON_BIN_DIR/neon_local start
psql 'postgresql://localhost:1235/storage_controller' -c 'DELETE FROM tenant_shards'
sed 's/.*get_vectored_concurrent_io.*/get_vectored_concurrent_io={mode="sidecar-task"}/' -i $NEON_REPO_DIR/pageserver_1/pageserver.toml
$NEON_BIN_DIR/neon_local pageserver restart
sleep 2
$NEON_BIN_DIR/neon_local tenant create --set-default
./target/debug/neon_local endpoint stop foo
rm -rf $NEON_REPO_DIR/endpoints/foo
./target/debug/neon_local endpoint create foo
echo 'full_page_writes=off' >> $NEON_REPO_DIR/endpoints/foo/postgresql.conf
./target/debug/neon_local endpoint start foo
pushd test_runner; poetry run python3 -m bin.neon_local_create_deep_l0_stack 10; popd
```
"""
import sys
import psycopg2
from fixtures.common_types import TenantShardId, TimelineId
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.makelayers.l0stack import L0StackShape, make_l0_stack_standalone
ps_http = PageserverHttpClient(port=9898, is_testing_enabled_or_skip=lambda: None)
vps_http = PageserverHttpClient(port=1234, is_testing_enabled_or_skip=lambda: None)
tenants = ps_http.tenant_list()
assert len(tenants) == 1
tenant_shard_id = TenantShardId.parse(tenants[0]["id"])
timlines = ps_http.timeline_list(tenant_shard_id)
assert len(timlines) == 1
timeline_id = TimelineId(timlines[0]["timeline_id"])
connstr = "postgresql://cloud_admin@localhost:55432/postgres"
conn = psycopg2.connect(connstr)
shape = L0StackShape(logical_table_size_mib=50, delta_stack_height=int(sys.argv[1]))
make_l0_stack_standalone(vps_http, ps_http, tenant_shard_id, timeline_id, conn, shape)

View File

@@ -103,7 +103,7 @@ class AbstractNeonCli:
else:
stdout = ""
log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
raise
indent = " "
@@ -557,7 +557,7 @@ class NeonLocalCli(AbstractNeonCli):
endpoint_id: str,
safekeepers_generation: int | None = None,
safekeepers: list[int] | None = None,
remote_ext_config: str | None = None,
remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
create_test_user: bool = False,
@@ -572,8 +572,8 @@ class NeonLocalCli(AbstractNeonCli):
extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
if remote_ext_config is not None:
args.extend(["--remote-ext-config", remote_ext_config])
if remote_ext_base_url is not None:
args.extend(["--remote-ext-base-url", remote_ext_base_url])
if safekeepers_generation is not None:
args.extend(["--safekeepers-generation", str(safekeepers_generation)])

View File

@@ -1255,6 +1255,12 @@ class NeonEnv:
"no_sync": True,
# Look for gaps in WAL received from safekeepeers
"validate_wal_contiguity": True,
# TODO(vlad): make these configurable through the builder
"timeline_import_config": {
"import_job_concurrency": 4,
"import_job_soft_size_limit": 512 * 1024,
"import_job_checkpoint_threshold": 4,
},
}
# Batching (https://github.com/neondatabase/neon/issues/9377):
@@ -1274,6 +1280,8 @@ class NeonEnv:
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if self.pageserver_virtual_file_io_mode is not None:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config["compaction_algorithm"] = (
@@ -1299,13 +1307,6 @@ class NeonEnv:
for key, value in override.items():
ps_cfg[key] = value
if self.pageserver_virtual_file_io_mode is not None:
# TODO(christian): https://github.com/neondatabase/neon/issues/11598
if not config.test_may_use_compatibility_snapshot_binaries:
ps_cfg["virtual_file_io_mode"] = self.pageserver_virtual_file_io_mode
else:
log.info("ignoring virtual_file_io_mode parametrization for compatibility test")
if self.pageserver_wal_receiver_protocol is not None:
key, value = PageserverWalReceiverProtocol.to_config_key_value(
self.pageserver_wal_receiver_protocol
@@ -1376,7 +1377,11 @@ class NeonEnv:
force=config.config_init_force,
)
def start(self, timeout_in_seconds: int | None = None):
def start(
self,
timeout_in_seconds: int | None = None,
extra_ps_env_vars: dict[str, str] | None = None,
):
# Storage controller starts first, so that pageserver /re-attach calls don't
# bounce through retries on startup
self.storage_controller.start(timeout_in_seconds=timeout_in_seconds)
@@ -1395,7 +1400,10 @@ class NeonEnv:
for pageserver in self.pageservers:
futs.append(
executor.submit(
lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) # type: ignore[misc]
lambda ps=pageserver: ps.start( # type: ignore[misc]
extra_env_vars=extra_ps_env_vars or {},
timeout_in_seconds=timeout_in_seconds,
),
)
)
@@ -1409,30 +1417,6 @@ class NeonEnv:
for f in futs:
f.result()
# Last step: register safekeepers at the storage controller
if (
self.storage_controller_config is not None
and self.storage_controller_config.get("timelines_onto_safekeepers") is True
):
for sk_id, sk in enumerate(self.safekeepers):
# 0 is an invalid safekeeper id
sk_id = sk_id + 1
body = {
"id": sk_id,
"created_at": "2023-10-25T09:11:25Z",
"updated_at": "2024-08-28T11:32:43Z",
"region_id": "aws-us-east-2",
"host": "127.0.0.1",
"port": sk.port.pg,
"http_port": sk.port.http,
"https_port": None,
"version": 5957,
"availability_zone_id": f"us-east-2b-{sk_id}",
}
self.storage_controller.on_safekeeper_deploy(sk_id, body)
self.storage_controller.safekeeper_scheduling_policy(sk_id, "Active")
self.endpoint_storage.start(timeout_in_seconds=timeout_in_seconds)
def stop(self, immediate=False, ps_assert_metric_no_errors=False, fail_on_endpoint_errors=True):
@@ -3636,6 +3620,8 @@ class NeonProxy(PgProtocol):
http_port: int,
mgmt_port: int,
external_http_port: int,
router_port: int,
router_tls_port: int,
auth_backend: NeonProxy.AuthBackend,
metric_collection_endpoint: str | None = None,
metric_collection_interval: str | None = None,
@@ -3652,6 +3638,8 @@ class NeonProxy(PgProtocol):
self.test_output_dir = test_output_dir
self.proxy_port = proxy_port
self.mgmt_port = mgmt_port
self.router_port = router_port
self.router_tls_port = router_tls_port
self.auth_backend = auth_backend
self.metric_collection_endpoint = metric_collection_endpoint
self.metric_collection_interval = metric_collection_interval
@@ -3666,6 +3654,14 @@ class NeonProxy(PgProtocol):
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("*.local.neon.build", key_path, crt_path)
# generate key for pg-sni-router.
# endpoint.namespace.local.neon.build resolves to 127.0.0.1
generate_proxy_tls_certs(
"endpoint.namespace.local.neon.build",
self.test_output_dir / "router.key",
self.test_output_dir / "router.crt",
)
args = [
str(self.neon_binpath / "proxy"),
*["--http", f"{self.host}:{self.http_port}"],
@@ -3675,6 +3671,11 @@ class NeonProxy(PgProtocol):
*["--sql-over-http-timeout", f"{self.http_timeout_seconds}s"],
*["-c", str(crt_path)],
*["-k", str(key_path)],
*["--sni-router-listen", f"{self.host}:{self.router_port}"],
*["--sni-router-listen-tls", f"{self.host}:{self.router_tls_port}"],
*["--sni-router-tls-cert", str(self.test_output_dir / "router.crt")],
*["--sni-router-tls-key", str(self.test_output_dir / "router.key")],
*["--sni-router-destination", "local.neon.build"],
*self.auth_backend.extra_args(),
]
@@ -3866,7 +3867,7 @@ class NeonAuthBroker:
external_http_port: int,
auth_backend: NeonAuthBroker.ProxyV1,
):
self.domain = "apiauth.local.neon.build" # resolves to 127.0.0.1
self.domain = "local.neon.build" # resolves to 127.0.0.1
self.host = "127.0.0.1"
self.http_port = http_port
self.external_http_port = external_http_port
@@ -3883,7 +3884,7 @@ class NeonAuthBroker:
# generate key of it doesn't exist
crt_path = self.test_output_dir / "proxy.crt"
key_path = self.test_output_dir / "proxy.key"
generate_proxy_tls_certs("apiauth.local.neon.build", key_path, crt_path)
generate_proxy_tls_certs(f"apiauth.{self.domain}", key_path, crt_path)
args = [
str(self.neon_binpath / "proxy"),
@@ -3927,10 +3928,10 @@ class NeonAuthBroker:
log.info(f"Executing http query: {query}")
connstr = f"postgresql://{user}@{self.domain}/postgres"
connstr = f"postgresql://{user}@ep-foo-bar-1234.{self.domain}/postgres"
async with httpx.AsyncClient(verify=str(self.test_output_dir / "proxy.crt")) as client:
response = await client.post(
f"https://{self.domain}:{self.external_http_port}/sql",
f"https://apiauth.{self.domain}:{self.external_http_port}/sql",
json={"query": query, "params": args},
headers={
"Neon-Connection-String": connstr,
@@ -3974,6 +3975,8 @@ def link_proxy(
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
@@ -3981,6 +3984,8 @@ def link_proxy(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
router_port=router_port,
router_tls_port=router_tls_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Link(),
) as proxy:
@@ -4014,6 +4019,8 @@ def static_proxy(
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
with NeonProxy(
neon_binpath=neon_binpath,
@@ -4021,6 +4028,8 @@ def static_proxy(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
router_port=router_port,
router_tls_port=router_tls_port,
external_http_port=external_http_port,
auth_backend=NeonProxy.Postgres(auth_endpoint),
) as proxy:
@@ -4226,7 +4235,7 @@ class Endpoint(PgProtocol, LogUtils):
def start(
self,
remote_ext_config: str | None = None,
remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
safekeeper_generation: int | None = None,
safekeepers: list[int] | None = None,
@@ -4252,7 +4261,7 @@ class Endpoint(PgProtocol, LogUtils):
self.endpoint_id,
safekeepers_generation=safekeeper_generation,
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
create_test_user=create_test_user,
@@ -4467,7 +4476,7 @@ class Endpoint(PgProtocol, LogUtils):
hot_standby: bool = False,
lsn: Lsn | None = None,
config_lines: list[str] | None = None,
remote_ext_config: str | None = None,
remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
allow_multiple: bool = False,
basebackup_request_tries: int | None = None,
@@ -4486,7 +4495,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
).start(
remote_ext_config=remote_ext_config,
remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
basebackup_request_tries=basebackup_request_tries,
@@ -4570,7 +4579,7 @@ class EndpointFactory:
lsn: Lsn | None = None,
hot_standby: bool = False,
config_lines: list[str] | None = None,
remote_ext_config: str | None = None,
remote_ext_base_url: str | None = None,
pageserver_id: int | None = None,
basebackup_request_tries: int | None = None,
) -> Endpoint:
@@ -4590,7 +4599,7 @@ class EndpointFactory:
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
remote_ext_config=remote_ext_config,
remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
basebackup_request_tries=basebackup_request_tries,
)
@@ -4644,7 +4653,10 @@ class EndpointFactory:
return self
def new_replica(
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
self,
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
branch_name = origin.branch_name
assert origin in self.endpoints
@@ -4660,7 +4672,10 @@ class EndpointFactory:
)
def new_replica_start(
self, origin: Endpoint, endpoint_id: str, config_lines: list[str] | None = None
self,
origin: Endpoint,
endpoint_id: str | None = None,
config_lines: list[str] | None = None,
):
branch_name = origin.branch_name
assert origin in self.endpoints
@@ -5477,6 +5492,13 @@ def wait_for_last_flush_lsn(
if last_flush_lsn is None:
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
# The last_flush_lsn may not correspond to a record boundary.
# For example, if the compute flushed WAL on a page boundary,
# the remaining part of the record might not be flushed for a long time.
# This would prevent the pageserver from reaching last_flush_lsn promptly.
# To ensure the rest of the record reaches the pageserver quickly,
# we forcibly flush the WAL by using CHECKPOINT.
endpoint.safe_psql("CHECKPOINT")
results = []
for tenant_shard_id, pageserver in shards:

View File

@@ -111,6 +111,13 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*stalling layer flushes for compaction backpressure.*",
".*layer roll waiting for flush due to compaction backpressure.*",
".*BatchSpanProcessor.*",
*(
[
r".*your platform is not a supported production platform, ignoing request for O_DIRECT; this could hide alignment bugs.*"
]
if sys.platform != "linux"
else []
),
)
@@ -122,6 +129,10 @@ DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS = [
".*Call to node.*management API.*failed.*Timeout.*",
".*Failed to update node .+ after heartbeat round.*error sending request for url.*",
".*background_reconcile: failed to fetch top tenants:.*client error \\(Connect\\).*",
# Many tests will take safekeepers offline
".*Call to safekeeper.*management API.*failed.*receive body.*",
".*Call to safekeeper.*management API.*failed.*ReceiveBody.*",
".*Call to safekeeper.*management API.*failed.*Timeout.*",
# Many tests will start up with a node offline
".*startup_reconcile: Could not scan node.*",
# Tests run in dev mode

View File

@@ -0,0 +1,148 @@
from dataclasses import dataclass
from psycopg2.extensions import connection as PgConnection
from fixtures.common_types import Lsn, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn
@dataclass
class L0StackShape:
logical_table_size_mib: int = 50
delta_stack_height: int = 20
def make_l0_stack(endpoint: Endpoint, shape: L0StackShape):
"""
Creates stack of L0 deltas each of which should have 1 Value::Delta per page in table `data`.
"""
env = endpoint.env
# TDOO: wait for storcon to finish any reonciles before jumping to action here?
description = env.storage_controller.tenant_describe(endpoint.tenant_id)
shards = description["shards"]
assert len(shards) == 1, "does not support sharding"
tenant_shard_id = TenantShardId.parse(shards[0]["tenant_shard_id"])
endpoint.config(["full_page_writes=off"])
endpoint.reconfigure()
ps = env.get_pageserver(shards[0]["node_attached"])
timeline_id = endpoint.show_timeline_id()
vps_http = env.storage_controller.pageserver_api()
ps_http = ps.http_client()
endpoint_conn = endpoint.connect()
make_l0_stack_standalone(vps_http, ps_http, tenant_shard_id, timeline_id, endpoint_conn, shape)
def make_l0_stack_standalone(
vps_http: PageserverHttpClient,
ps_http: PageserverHttpClient,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
endpoint_conn: PgConnection,
shape: L0StackShape,
):
"""
See make_l0_stack for details.
This function is a standalone version of make_l0_stack, usable from not-test code.
"""
assert not tenant_shard_id.shard_index.is_sharded, (
"the current implementation only supports unsharded tenants"
)
tenant_id = tenant_shard_id.tenant_id
conn = endpoint_conn
desired_size = shape.logical_table_size_mib * 1024 * 1024
config = {
"gc_period": "0s", # disable periodic gc
"checkpoint_timeout": "10 years",
"compaction_period": "1h", # doesn't matter, but 0 value will kill walredo every 10s
"compaction_threshold": 100000, # we just want L0s
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 100000, # we just want L0s
}
vps_http.set_tenant_config(tenant_id, config)
conn.autocommit = True
cur = conn.cursor()
# Ensure full_page_writes are disabled so that all Value::Delta in
# pageserver are !will_init, and therefore a getpage needs to read
# the entire delta stack.
cur.execute("SHOW full_page_writes")
assert cur.fetchall()[0][0] == "off", "full_page_writes should be off"
# each tuple is 23 (header) + 100 bytes = 123 bytes
# page header si 24 bytes
# 8k page size
# (8k-24bytes) / 123 bytes = 63 tuples per page
# set fillfactor to 10 to have 6 tuples per page
cur.execute("DROP TABLE IF EXISTS data")
cur.execute("CREATE TABLE data(id bigint, row char(92)) with (fillfactor=10)")
need_pages = desired_size // 8192
need_rows = need_pages * 6
log.info(f"Need {need_pages} pages, {need_rows} rows")
cur.execute(f"INSERT INTO data SELECT i,'row'||i FROM generate_series(1, {need_rows}) as i")
# Raise fillfactor to 100% so that all updates are HOT updates.
# We assert they're hot updates by checking fetch_id_to_page_mapping remains the same.
cur.execute("ALTER TABLE data SET (fillfactor=100)")
def settle_and_flush():
cur.execute("SELECT pg_current_wal_flush_lsn()")
flush_lsn = Lsn(cur.fetchall()[0][0])
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, flush_lsn)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# create an L0 for the initial data we just inserted
settle_and_flush()
# assert we wrote what we think we wrote
cur.execute("""
with ntuples_per_page as (
select (ctid::text::point)[0]::bigint pageno,count(*) ntuples from data group by pageno
)
select ntuples, count(*) npages from ntuples_per_page group by ntuples order by ntuples;
""")
rows = cur.fetchall()
log.info(f"initial table layout: {rows}")
assert len(rows) == 1
assert rows[0][0] == 6, f"expected 6 tuples per page, got {rows[0][0]}"
assert rows[0][1] == need_pages, f"expected {need_pages} pages, got {rows[0][1]}"
def fetch_id_to_page_mapping():
cur.execute("""
SELECT id,(ctid::text::point)[0]::bigint pageno FROM data ORDER BY id
""")
return cur.fetchall()
initial_mapping = fetch_id_to_page_mapping()
# every iteration updates one tuple in each page
delta_stack_height = shape.delta_stack_height
for i in range(0, delta_stack_height):
log.info(i)
cur.execute(f"UPDATE data set row = row||',u' where id % 6 = {i % 6}")
log.info(f"modified rows: {cur.rowcount}")
assert cur.rowcount == need_pages
settle_and_flush()
post_update_mapping = fetch_id_to_page_mapping()
assert initial_mapping == post_update_mapping, "Postgres should be doing HOT updates"
# Assert the layer count is what we expect it is
layer_map = vps_http.layer_map_info(tenant_id, timeline_id)
assert (
len(layer_map.delta_l0_layers()) == delta_stack_height + 1 + 1
) # +1 for the initdb layer + 1 for the table creation & fill
assert len(layer_map.delta_l0_layers()) == len(layer_map.delta_layers()) # it's all L0s
assert len(layer_map.image_layers()) == 0 # no images

View File

@@ -15,7 +15,8 @@ Some handy pytest flags for local development:
- `-k` selects a test to run
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
- `--preserve-database-files` to skip cleanup
- `--out-dir` to produce a JSON with the recorded test metrics
- `--out-dir` to produce a JSON with the recorded test metrics.
There is a post-processing tool at `test_runner/performance/out_dir_to_csv.py`.
# What performance tests do we have and how we run them

View File

@@ -0,0 +1,57 @@
# Tool to convert the JSON output from running a perf test with `--out-dir` to a CSV that
# can be easily pasted into a spreadsheet for quick viz & analysis.
# Check the `./README.md` in this directory for `--out-dir`.
#
# TODO: add the pytest.mark.parametrize to the json and make them columns here
# https://github.com/neondatabase/neon/issues/11878
import csv
import json
import os
import sys
def json_to_csv(json_file):
with open(json_file) as f:
data = json.load(f)
# Collect all possible metric names to form headers
all_metrics = set()
for result in data.get("result", []):
for metric in result.get("data", []):
all_metrics.add(metric["name"])
# Sort metrics for consistent output
metrics = sorted(list(all_metrics))
# Create headers
headers = ["suit"] + metrics
# Prepare rows
rows = []
for result in data.get("result", []):
row = {"suit": result["suit"]}
# Initialize all metrics to empty
for metric in metrics:
row[metric] = ""
# Fill in available metrics
for item in result.get("data", []):
row[item["name"]] = item["value"]
rows.append(row)
# Write to stdout as CSV
writer = csv.DictWriter(sys.stdout, fieldnames=headers)
writer.writeheader()
writer.writerows(rows)
if __name__ == "__main__":
if len(sys.argv) < 2:
print(f"Usage: python {os.path.basename(__file__)} <json_file>")
sys.exit(1)
json_file = sys.argv[1]
json_to_csv(json_file)

View File

@@ -10,7 +10,8 @@ from typing import Any
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
from fixtures.pageserver.makelayers import l0stack
from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 30
@@ -34,28 +35,18 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
mode: str = "pipelined"
EXECUTION = ["concurrent-futures"]
BATCHING = ["uniform-lsn", "scattered-lsn"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
for batching in BATCHING:
NON_BATCHABLE.append(
PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
)
BATCHABLE: list[PageServicePipeliningConfig] = []
PS_IO_CONCURRENCY = ["sidecar-task"]
PIPELINING_CONFIGS: list[PageServicePipeliningConfig] = []
for max_batch_size in [32]:
for execution in EXECUTION:
for batching in BATCHING:
BATCHABLE.append(
for execution in ["concurrent-futures"]:
for batching in ["scattered-lsn"]:
PIPELINING_CONFIGS.append(
PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
)
@pytest.mark.parametrize(
"tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
"tablesize_mib, pipelining_config, target_runtime, ps_io_concurrency, effective_io_concurrency, readhead_buffer_size, name",
[
# batchable workloads should show throughput and CPU efficiency improvements
*[
@@ -63,20 +54,23 @@ for max_batch_size in [32]:
50,
config,
TARGET_RUNTIME,
ps_io_concurrency,
100,
128,
f"batchable {dataclasses.asdict(config)}",
)
for config in BATCHABLE
for config in PIPELINING_CONFIGS
for ps_io_concurrency in PS_IO_CONCURRENCY
],
],
)
def test_throughput(
def test_postgres_seqscan(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
pipelining_config: PageServicePipeliningConfig,
target_runtime: int,
ps_io_concurrency: str,
effective_io_concurrency: int,
readhead_buffer_size: int,
name: str,
@@ -97,6 +91,10 @@ def test_throughput(
If the compute provides pipeline depth (effective_io_concurrency=100), then
pipelining configs, especially with max_batch_size>1 should yield dramatic improvements
in all performance metrics.
We advance the LSN from a disruptor thread to simulate the effect of a workload with concurrent writes
in another table. The `scattered-lsn` batching mode handles this well whereas the
initial implementatin (`uniform-lsn`) would break the batch.
"""
#
@@ -114,7 +112,19 @@ def test_throughput(
}
)
# For storing configuration as a metric, insert a fake 0 with labels with actual data
params.update({"pipelining_config": (0, {"labels": dataclasses.asdict(pipelining_config)})})
params.update(
{
"config": (
0,
{
"labels": {
"pipelining_config": dataclasses.asdict(pipelining_config),
"ps_io_concurrency": ps_io_concurrency,
}
},
)
}
)
log.info("params: %s", params)
@@ -266,7 +276,10 @@ def test_throughput(
return iters
env.pageserver.patch_config_toml_nonrecursive(
{"page_service_pipelining": dataclasses.asdict(pipelining_config)}
{
"page_service_pipelining": dataclasses.asdict(pipelining_config),
"get_vectored_concurrent_io": {"mode": ps_io_concurrency},
}
)
# set trace for log analysis below
@@ -318,77 +331,63 @@ def test_throughput(
)
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
for execution in EXECUTION:
for batching in BATCHING:
PRECISION_CONFIGS.append(
PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
)
@pytest.mark.parametrize(
"pipelining_config,name",
[(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
"pipelining_config,ps_io_concurrency,l0_stack_height,queue_depth,name",
[
(config, ps_io_concurrency, l0_stack_height, queue_depth, f"{dataclasses.asdict(config)}")
for config in PIPELINING_CONFIGS
for ps_io_concurrency in PS_IO_CONCURRENCY
for queue_depth in [1, 2, 32]
for l0_stack_height in [0, 20]
],
)
def test_latency(
def test_random_reads(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
pg_bin: PgBin,
pipelining_config: PageServicePipeliningConfig,
ps_io_concurrency: str,
l0_stack_height: int,
queue_depth: int,
name: str,
):
"""
Measure the latency impact of pipelining in an un-batchable workloads.
An ideal implementation should not increase average or tail latencies for such workloads.
We don't have support in pagebench to create queue depth yet.
=> https://github.com/neondatabase/neon/issues/9837
Throw pagebench random getpage at latest lsn workload from a single client against pageserver.
"""
#
# Setup
#
def build_snapshot_cb(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
l0stack.make_l0_stack(
endpoint,
l0stack.L0StackShape(logical_table_size_mib=50, delta_stack_height=l0_stack_height),
)
return env
env = neon_env_builder.build_and_use_snapshot(
f"test_page_service_batching--test_pagebench-{l0_stack_height}", build_snapshot_cb
)
def patch_ps_config(ps_config):
if pipelining_config is not None:
ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
ps_config["get_vectored_concurrent_io"] = {"mode": ps_io_concurrency}
neon_env_builder.pageserver_config_override = patch_ps_config
env.pageserver.edit_config_toml(patch_ps_config)
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()
env.start()
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
cur.execute("SET effective_io_concurrency=1")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
log.info("Filling the table")
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
tablesize = 50 * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
cur.close()
conn.close()
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
endpoint.stop()
lsn = env.safekeepers[0].get_commit_lsn(env.initial_tenant, env.initial_timeline)
ep = env.endpoints.create_start("main", lsn=lsn)
data_table_relnode_oid = ep.safe_psql_scalar("SELECT 'data'::regclass::oid")
ep.stop_and_destroy()
for sk in env.safekeepers:
sk.stop()
#
# Run single-threaded pagebench (TODO: dedup with other benchmark code)
#
env.pageserver.allowed_errors.append(
# https://github.com/neondatabase/neon/issues/6925
r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
@@ -396,6 +395,8 @@ def test_latency(
ps_http = env.pageserver.http_client()
metrics_before = ps_http.get_metrics()
cmd = [
str(env.neon_binpath / "pagebench"),
"get-page-latest-lsn",
@@ -405,6 +406,10 @@ def test_latency(
env.pageserver.connstr(password=None),
"--num-clients",
"1",
"--queue-depth",
str(queue_depth),
"--only-relnode",
str(data_table_relnode_oid),
"--runtime",
"10s",
]
@@ -413,12 +418,22 @@ def test_latency(
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
metrics_after = ps_http.get_metrics()
with open(results_path) as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
total = results["total"]
metric = "request_count"
zenbenchmark.record(
metric,
metric_value=total[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "latency_mean"
zenbenchmark.record(
metric,
@@ -435,3 +450,17 @@ def test_latency(
unit="ms",
report=MetricReport.LOWER_IS_BETTER,
)
reads_before = metrics_before.query_one(
"pageserver_io_operations_seconds_count", filter={"operation": "read"}
)
reads_after = metrics_after.query_one(
"pageserver_io_operations_seconds_count", filter={"operation": "read"}
)
zenbenchmark.record(
"virtual_file_reads",
metric_value=reads_after.value - reads_before.value,
unit="",
report=MetricReport.LOWER_IS_BETTER,
)

View File

@@ -221,7 +221,7 @@ def test_remote_extensions(
endpoint.create_remote_extension_spec(spec)
endpoint.start(remote_ext_config=extensions_endpoint)
endpoint.start(remote_ext_base_url=extensions_endpoint)
with endpoint.connect() as conn:
with conn.cursor() as cur:
@@ -249,7 +249,7 @@ def test_remote_extensions(
# Remove the extension files to force a redownload of the extension.
extension.remove(test_output_dir, pg_version)
endpoint.start(remote_ext_config=extensions_endpoint)
endpoint.start(remote_ext_base_url=extensions_endpoint)
# Test that ALTER EXTENSION UPDATE statements also fetch remote extensions.
with endpoint.connect() as conn:

View File

@@ -24,6 +24,7 @@ from fixtures.utils import (
skip_in_debug_build,
wait_until,
)
from fixtures.workload import Workload
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
@@ -97,6 +98,10 @@ def test_pgdata_import_smoke(
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
if neon_env_builder.storage_controller_config is None:
neon_env_builder.storage_controller_config = {}
neon_env_builder.storage_controller_config["timelines_onto_safekeepers"] = True
env = neon_env_builder.init_start()
# The test needs LocalFs support, which is only built in testing mode.
@@ -125,9 +130,8 @@ def test_pgdata_import_smoke(
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
target_relblock_size = (shard_count or 1) * stripe_size * 8192 * 2
elif rel_block_size == RelBlockSize.MULTIPLE_RELATION_SEGMENTS:
# Postgres uses a 1GiB segment size, fixed at compile time, so we must use >2GB of data
# to exercise multiple segments.
target_relblock_size = int(((2.333 * 1024 * 1024 * 1024) // 8192) * 8192)
segment_size = 16 * 1024 * 1024
target_relblock_size = segment_size * 8
else:
raise ValueError
@@ -286,34 +290,28 @@ def test_pgdata_import_smoke(
#
# validate that we can write
#
rw_endpoint = env.endpoints.create_start(
branch_name=import_branch_name,
endpoint_id="rw",
tenant_id=tenant_id,
config_lines=ep_config,
)
rw_endpoint.safe_psql("create table othertable(values text)")
rw_lsn = Lsn(rw_endpoint.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
workload.init()
workload.write_rows(64)
workload.validate()
# TODO: consider using `class Workload` here
# to do compaction and whatnot?
rw_lsn = Lsn(workload.endpoint().safe_psql_scalar("select pg_current_wal_flush_lsn()"))
#
# validate that we can branch (important use case)
#
# ... at the tip
_ = env.create_branch(
child_timeline_id = env.create_branch(
new_branch_name="br-tip",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=rw_lsn,
)
br_tip_endpoint = env.endpoints.create_start(
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id, config_lines=ep_config
)
validate_vanilla_equivalence(br_tip_endpoint)
br_tip_endpoint.safe_psql("select * from othertable")
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
child_workload.validate()
validate_vanilla_equivalence(child_workload.endpoint())
# ... at the initdb lsn
_ = env.create_branch(
@@ -330,7 +328,7 @@ def test_pgdata_import_smoke(
)
validate_vanilla_equivalence(br_initdb_endpoint)
with pytest.raises(psycopg2.errors.UndefinedTable):
br_initdb_endpoint.safe_psql("select * from othertable")
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
@run_only_on_default_postgres(reason="PG version is irrelevant here")
@@ -414,6 +412,88 @@ def test_import_completion_on_restart(
wait_until(cplane_notified)
@run_only_on_default_postgres(reason="PG version is irrelevant here")
def test_import_respects_tenant_shutdown(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Validate that importing timelines respect the usual timeline life cycle:
1. Shut down on tenant shut-down and resumes upon re-attach
2. Deletion on timeline deletion (TODO)
"""
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
# Pause before sending the notification
failpoint_name = "import-timeline-pre-execute-pausable"
env.pageserver.http_client().configure_failpoints((failpoint_name, "pause"))
env.storage_controller.tenant_create(tenant_id)
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
wait_until(hit_failpoint)
assert not import_completion_signaled.is_set()
# Restart the pageserver while an import job is in progress.
# This clears the failpoint and we expect that the import starts up afresh
# after the restart and eventually completes.
env.pageserver.stop()
env.pageserver.start()
def cplane_notified():
assert import_completion_signaled.is_set()
wait_until(cplane_notified)
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,
@@ -521,7 +601,9 @@ def test_fast_import_with_pageserver_ingest(
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
# Run fast_import
fast_import.set_aws_creds(mock_s3_server, {"RUST_LOG": "aws_config=debug,aws_sdk_kms=debug"})
fast_import.set_aws_creds(
mock_s3_server, {"RUST_LOG": "info,aws_config=debug,aws_sdk_kms=debug"}
)
pg_port = port_distributor.get_port()
fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")
@@ -641,6 +723,55 @@ def test_fast_import_binary(
assert res[0][0] == 10
def test_fast_import_event_triggers(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
):
vanilla_pg.start()
vanilla_pg.safe_psql("""
CREATE FUNCTION test_event_trigger_for_drops()
RETURNS event_trigger LANGUAGE plpgsql AS $$
DECLARE
obj record;
BEGIN
FOR obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
RAISE NOTICE '% dropped object: % %.% %',
tg_tag,
obj.object_type,
obj.schema_name,
obj.object_name,
obj.object_identity;
END LOOP;
END
$$;
CREATE EVENT TRIGGER test_event_trigger_for_drops
ON sql_drop
EXECUTE PROCEDURE test_event_trigger_for_drops();
""")
pg_port = port_distributor.get_port()
p = fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
assert p.returncode == 0
vanilla_pg.stop()
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
with VanillaPostgres(
fast_import.workdir / "pgdata", pgbin, pg_port, False
) as new_pgdata_vanilla_pg:
new_pgdata_vanilla_pg.start()
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
res = conn.safe_psql("SELECT count(*) FROM pg_event_trigger;")
log.info(f"Result: {res}")
assert res[0][0] == 0, f"Neon does not support importing event triggers, got: {res[0][0]}"
def test_fast_import_restore_to_connstring(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -510,7 +510,7 @@ def list_elegible_layers(
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
raise
# Scrub the remote storage

View File

@@ -52,6 +52,8 @@ def proxy_with_metric_collector(
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
external_http_port = port_distributor.get_port()
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
(host, port) = httpserver_listen_address
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
@@ -63,6 +65,8 @@ def proxy_with_metric_collector(
proxy_port=proxy_port,
http_port=http_port,
mgmt_port=mgmt_port,
router_port=router_port,
router_tls_port=router_tls_port,
external_http_port=external_http_port,
metric_collection_endpoint=metric_collection_endpoint,
metric_collection_interval=metric_collection_interval,

View File

@@ -39,3 +39,10 @@ def test_role_grants(neon_simple_env: NeonEnv):
res = cur.fetchall()
assert res == [(1,)], "select should not succeed"
# confirm that replicas can also ensure the grants are correctly set.
replica = env.endpoints.new_replica_start(endpoint)
replica_client = replica.http_client()
replica_client.set_role_grants(
"test_role_grants", "test_role", "test_schema", ["CREATE", "USAGE"]
)

View File

@@ -6,7 +6,7 @@ from typing import TYPE_CHECKING
import backoff
from fixtures.log_helper import log
from fixtures.neon_fixtures import PgProtocol, VanillaPostgres
from fixtures.neon_fixtures import NeonProxy, PgProtocol, VanillaPostgres
if TYPE_CHECKING:
from pathlib import Path
@@ -41,6 +41,7 @@ class PgSniRouter(PgProtocol):
self,
neon_binpath: Path,
port: int,
tls_port: int,
destination: str,
tls_cert: Path,
tls_key: Path,
@@ -53,6 +54,7 @@ class PgSniRouter(PgProtocol):
self.host = host
self.neon_binpath = neon_binpath
self.port = port
self.tls_port = tls_port
self.destination = destination
self.tls_cert = tls_cert
self.tls_key = tls_key
@@ -64,6 +66,7 @@ class PgSniRouter(PgProtocol):
args = [
str(self.neon_binpath / "pg_sni_router"),
*["--listen", f"127.0.0.1:{self.port}"],
*["--listen-tls", f"127.0.0.1:{self.tls_port}"],
*["--tls-cert", str(self.tls_cert)],
*["--tls-key", str(self.tls_key)],
*["--destination", self.destination],
@@ -127,10 +130,12 @@ def test_pg_sni_router(
pg_port = vanilla_pg.default_options["port"]
router_port = port_distributor.get_port()
router_tls_port = port_distributor.get_port()
with PgSniRouter(
neon_binpath=neon_binpath,
port=router_port,
tls_port=router_tls_port,
destination="local.neon.build",
tls_cert=test_output_dir / "router.crt",
tls_key=test_output_dir / "router.key",
@@ -146,3 +151,22 @@ def test_pg_sni_router(
hostaddr="127.0.0.1",
)
assert out[0][0] == 1
def test_pg_sni_router_in_proxy(
static_proxy: NeonProxy,
vanilla_pg: VanillaPostgres,
):
# static_proxy starts this.
assert vanilla_pg.is_running()
pg_port = vanilla_pg.default_options["port"]
out = static_proxy.safe_psql(
"select 1",
dbname="postgres",
sslmode="require",
host=f"endpoint--namespace--{pg_port}.local.neon.build",
hostaddr="127.0.0.1",
port=static_proxy.router_port,
)
assert out[0][0] == 1

View File

@@ -1822,7 +1822,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
endpoint2.safe_psql(
"SELECT pg_create_logical_replication_slot('test_slot_restore', 'pgoutput')"
)
lsn3 = wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
lsn3 = wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn1).keys()) == set([])
assert set(http.list_aux_files(env.initial_tenant, branch_timeline_id, lsn3).keys()) == set(
["pg_replslot/test_slot_restore/state"]
@@ -1839,7 +1839,7 @@ def test_timeline_detach_with_aux_files_with_detach_v1(
assert all_reparented == set([])
# We need to ensure all safekeeper data are ingested before checking aux files: the API does not wait for LSN.
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, branch_timeline_id)
wait_for_last_flush_lsn(env, endpoint2, env.initial_tenant, branch_timeline_id)
assert set(http.list_aux_files(env.initial_tenant, env.initial_timeline, lsn2).keys()) == set(
["pg_replslot/test_slot_parent_1/state", "pg_replslot/test_slot_parent_2/state"]
), "main branch unaffected"