Merge branch 'thesuhas/brc-3051' into thesuhas/brc-3082

This commit is contained in:
Suhas Thalanki
2025-07-30 10:36:04 -04:00
139 changed files with 5421 additions and 1981 deletions

View File

@@ -16,6 +16,7 @@ from typing_extensions import override
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
PgBin,
PgProtocol,
@@ -129,6 +130,10 @@ class NeonCompare(PgCompare):
# Start pg
self._pg = self.env.endpoints.create_start("main", "main", self.tenant)
@property
def endpoint(self) -> Endpoint:
return self._pg
@property
@override
def pg(self) -> PgProtocol:

View File

@@ -79,18 +79,28 @@ class EndpointHttpClient(requests.Session):
return json
def prewarm_lfc(self, from_endpoint_id: str | None = None):
"""
Prewarm LFC cache from given endpoint and wait till it finishes or errors
"""
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
self.post(self.prewarm_url, params=params).raise_for_status()
self.prewarm_lfc_wait()
def prewarm_lfc_wait(self):
"""
Wait till LFC prewarm returns with error or success.
If prewarm was not requested before calling this function, it will error
"""
statuses = "failed", "completed", "skipped"
def prewarmed():
json = self.prewarm_lfc_status()
status, err = json["status"], json.get("error")
assert status in ["failed", "completed", "skipped"], f"{status}, {err=}"
assert status in statuses, f"{status}, {err=}"
wait_until(prewarmed, timeout=60)
assert self.prewarm_lfc_status()["status"] != "failed"
res = self.prewarm_lfc_status()
assert res["status"] != "failed", res
def offload_lfc_status(self) -> dict[str, str]:
res = self.get(self.offload_url)
@@ -99,17 +109,26 @@ class EndpointHttpClient(requests.Session):
return json
def offload_lfc(self):
"""
Offload LFC cache to endpoint storage and wait till offload finishes or errors
"""
self.post(self.offload_url).raise_for_status()
self.offload_lfc_wait()
def offload_lfc_wait(self):
"""
Wait till LFC offload returns with error or success.
If offload was not requested before calling this function, it will error
"""
def offloaded():
json = self.offload_lfc_status()
status, err = json["status"], json.get("error")
assert status in ["failed", "completed"], f"{status}, {err=}"
wait_until(offloaded)
assert self.offload_lfc_status()["status"] != "failed"
wait_until(offloaded, timeout=60)
res = self.offload_lfc_status()
assert res["status"] != "failed", res
def promote(self, promote_spec: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import re
import time
from typing import TYPE_CHECKING, cast, final
@@ -13,6 +14,17 @@ if TYPE_CHECKING:
from fixtures.pg_version import PgVersion
def connstr_to_env(connstr: str) -> dict[str, str]:
    """
    Translate a PostgreSQL connection URI into libpq-style environment variables.

    Example input:
        postgresql://user:password@ep-example-pooler.us-east-2.aws.neon.build/neondb?sslmode=require

    Both the `postgresql://` and `postgres://` schemes are accepted, as well as a string
    without any scheme. Query parameters are ignored. Values are returned exactly as they
    appear in the URI (no percent-decoding is applied).

    Returns a dict with PGUSER, PGPASSWORD, PGHOST and PGDATABASE; PGPORT is added only
    when the URI carries an explicit port.

    Raises ValueError if the connection string does not contain a host.
    """
    # Local import keeps this helper self-contained.
    from urllib.parse import urlsplit

    # urlsplit() only recognises the authority part after "scheme://".
    if "://" not in connstr:
        connstr = f"postgresql://{connstr}"

    url = urlsplit(connstr)
    if not url.hostname:
        # Do not echo the connection string: it usually embeds a password.
        raise ValueError("connection string does not contain a host")

    env = {
        "PGUSER": url.username or "",
        "PGPASSWORD": url.password or "",
        "PGHOST": url.hostname,
        "PGDATABASE": url.path.lstrip("/"),
    }
    if url.port is not None:
        env["PGPORT"] = str(url.port)
    return env
def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]:
return {
"PGHOST": params["host"],

View File

@@ -587,7 +587,9 @@ class NeonLocalCli(AbstractNeonCli):
]
extra_env_vars = env or {}
if basebackup_request_tries is not None:
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_TRIES"] = str(basebackup_request_tries)
extra_env_vars["NEON_COMPUTE_TESTING_BASEBACKUP_RETRIES"] = str(
basebackup_request_tries
)
if remote_ext_base_url is not None:
args.extend(["--remote-ext-base-url", remote_ext_base_url])
@@ -623,6 +625,7 @@ class NeonLocalCli(AbstractNeonCli):
pageserver_id: int | None = None,
safekeepers: list[int] | None = None,
check_return_code=True,
timeout_sec: float | None = None,
) -> subprocess.CompletedProcess[str]:
args = ["endpoint", "reconfigure", endpoint_id]
if tenant_id is not None:
@@ -631,7 +634,7 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["--pageserver-id", str(pageserver_id)])
if safekeepers is not None:
args.extend(["--safekeepers", (",".join(map(str, safekeepers)))])
return self.raw_cli(args, check_return_code=check_return_code)
return self.raw_cli(args, check_return_code=check_return_code, timeout=timeout_sec)
def endpoint_refresh_configuration(
self,

View File

@@ -4930,15 +4930,34 @@ class Endpoint(PgProtocol, LogUtils):
def is_running(self):
return self._running._value > 0
def reconfigure(self, pageserver_id: int | None = None, safekeepers: list[int] | None = None):
def reconfigure(
self,
pageserver_id: int | None = None,
safekeepers: list[int] | None = None,
timeout_sec: float = 120,
):
assert self.endpoint_id is not None
# If `safekeepers` is not None, remember them as active and use them
# in the following commands.
if safekeepers is not None:
self.active_safekeepers = safekeepers
self.env.neon_cli.endpoint_reconfigure(
self.endpoint_id, self.tenant_id, pageserver_id, self.active_safekeepers
)
start_time = time.time()
while True:
try:
self.env.neon_cli.endpoint_reconfigure(
self.endpoint_id,
self.tenant_id,
pageserver_id,
self.active_safekeepers,
timeout_sec=timeout_sec,
)
return
except RuntimeError as e:
if time.time() - start_time > timeout_sec:
raise e
log.warning(f"Reconfigure failed with error: {e}. Retrying...")
time.sleep(5)
def refresh_configuration(self):
assert self.endpoint_id is not None

View File

@@ -78,6 +78,9 @@ class Workload:
"""
if self._endpoint is not None:
with ENDPOINT_LOCK:
# It's important that we update config.json before issuing the reconfigure request to make sure
# that PG-initiated spec refresh doesn't mess things up by reverting to the old spec.
self._endpoint.update_pageservers_in_config()
self._endpoint.reconfigure()
def endpoint(self, pageserver_id: int | None = None) -> Endpoint:
@@ -97,10 +100,10 @@ class Workload:
self._endpoint.start(pageserver_id=pageserver_id)
self._configured_pageserver = pageserver_id
else:
if self._configured_pageserver != pageserver_id:
self._configured_pageserver = pageserver_id
self._endpoint.reconfigure(pageserver_id=pageserver_id)
self._endpoint_config = pageserver_id
# It's important that we update config.json before issuing the reconfigure request to make sure
# that PG-initiated spec refresh doesn't mess things up by reverting to the old spec.
self._endpoint.update_pageservers_in_config(pageserver_id=pageserver_id)
self._endpoint.reconfigure(pageserver_id=pageserver_id)
connstring = self._endpoint.safe_psql(
"SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'"

View File

@@ -9,9 +9,10 @@
```bash
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
export CLICKHOUSE_PASSWORD=ch_password123
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k test_clickhouse
./scripts/pytest -m remote_cluster -k 'test_clickhouse[release-pg17]'
docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
```
@@ -21,6 +22,6 @@ docker compose -f test_runner/logical_repl/clickhouse/docker-compose.yml down
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml up -d
./scripts/pytest -m remote_cluster -k test_debezium
./scripts/pytest -m remote_cluster -k 'test_debezium[release-pg17]'
docker compose -f test_runner/logical_repl/debezium/docker-compose.yml down
```

View File

@@ -1,9 +1,11 @@
services:
clickhouse:
image: clickhouse/clickhouse-server
image: clickhouse/clickhouse-server:25.6
user: "101:101"
container_name: clickhouse
hostname: clickhouse
environment:
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-ch_password123}
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000

View File

@@ -1,18 +1,28 @@
services:
zookeeper:
image: quay.io/debezium/zookeeper:2.7
image: quay.io/debezium/zookeeper:3.1.3.Final
ports:
- 127.0.0.1:2181:2181
- 127.0.0.1:2888:2888
- 127.0.0.1:3888:3888
kafka:
image: quay.io/debezium/kafka:2.7
image: quay.io/debezium/kafka:3.1.3.Final
depends_on: [zookeeper]
environment:
ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 127.0.0.1:9092:9092
- 9092:9092
- 29092:29092
debezium:
image: quay.io/debezium/connect:2.7
image: quay.io/debezium/connect:3.1.3.Final
depends_on: [kafka]
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1

View File

@@ -53,8 +53,13 @@ def test_clickhouse(remote_pg: RemotePostgres):
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
client = clickhouse_connect.get_client(host=clickhouse_host)
if "CLICKHOUSE_PASSWORD" not in os.environ:
raise RuntimeError("CLICKHOUSE_PASSWORD is not set")
client = clickhouse_connect.get_client(
host=clickhouse_host, password=os.environ["CLICKHOUSE_PASSWORD"]
)
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command("DROP DATABASE IF EXISTS db1_postgres")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "

View File

@@ -17,6 +17,7 @@ from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
from kafka import KafkaConsumer
class DebeziumAPI:
@@ -101,9 +102,13 @@ def debezium(remote_pg: RemotePostgres):
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
kafka_host = "kafka" if (os.getenv("CI", "false") == "true") else "127.0.0.1"
kafka_port = 9092 if (os.getenv("CI", "false") == "true") else 29092
log.info("Connecting to Kafka: %s:%s", kafka_host, kafka_port)
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=["kafka:9092"],
bootstrap_servers=[f"{kafka_host}:{kafka_port}"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
@@ -112,7 +117,7 @@ def debezium(remote_pg: RemotePostgres):
assert resp.status_code == 204
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
def get_kafka_msg(consumer: KafkaConsumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
@@ -124,6 +129,7 @@ def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
log.info("Bootstrap servers: %s", consumer.config["bootstrap_servers"])
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():

View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""
Generate TPS and latency charts from BenchBase TPC-C results CSV files.
This script reads a CSV file containing BenchBase results and generates two charts:
1. TPS (requests per second) over time
2. P95 and P99 latencies over time
Both charts are combined in a single SVG file.
"""
import argparse
import sys
from pathlib import Path
import matplotlib.pyplot as plt # type: ignore[import-not-found]
import pandas as pd # type: ignore[import-untyped]
def load_results_csv(csv_file_path):
    """Load a BenchBase results CSV file into a pandas DataFrame.

    Args:
        csv_file_path: Path of the CSV file to read.

    Returns:
        The parsed DataFrame. It is guaranteed to contain the time, throughput,
        P95 latency and P99 latency columns that the charts are built from.

    On any failure (file missing, file empty, unreadable, or a required column
    absent) an error message is written to stderr and the process exits with
    status 1.
    """
    required_columns = [
        "Time (seconds)",
        "Throughput (requests/second)",
        "95th Percentile Latency (millisecond)",
        "99th Percentile Latency (millisecond)",
    ]

    # Only the read itself is guarded, so the handlers below cannot mask
    # unrelated errors.
    try:
        df = pd.read_csv(csv_file_path)
    except FileNotFoundError:
        print(f"Error: CSV file not found: {csv_file_path}", file=sys.stderr)
        sys.exit(1)
    except pd.errors.EmptyDataError:
        print(f"Error: CSV file is empty: {csv_file_path}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error reading CSV file: {e}", file=sys.stderr)
        sys.exit(1)

    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        print(f"Error: Missing required columns: {missing_columns}", file=sys.stderr)
        sys.exit(1)
    return df
def generate_charts(df, input_filename, output_svg_path, title_suffix=None):
    """Render the TPS and latency charts for one run into a single SVG file.

    Args:
        df: DataFrame holding the "Time (seconds)", "Throughput (requests/second)",
            "95th Percentile Latency (millisecond)" and
            "99th Percentile Latency (millisecond)" columns.
        input_filename: Name of the source CSV; its stem is used in the chart titles.
        output_svg_path: Destination of the SVG file.
        title_suffix: Optional text placed before the file label in both titles.

    If the SVG cannot be written, an error is printed to stderr and the process
    exits with status 1. The figure is always closed before returning so that
    repeated calls do not accumulate open matplotlib figures.
    """
    # Chart titles end with "<suffix> - <file stem>" or just "<file stem>".
    file_label = Path(input_filename).stem
    title_ending = f"{title_suffix} - {file_label}" if title_suffix else file_label

    time_axis = df["Time (seconds)"]
    x_max = time_axis.max()

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
    try:
        # Chart 1: time vs TPS
        ax1.plot(
            time_axis,
            df["Throughput (requests/second)"],
            linewidth=1,
            color="blue",
            alpha=0.7,
        )
        ax1.set_xlabel("Time (seconds)")
        ax1.set_ylabel("TPS (Requests Per Second)")
        ax1.set_title(f"Benchbase TPC-C Like Throughput (TPS) - {title_ending}")
        ax1.grid(True, alpha=0.3)
        ax1.set_xlim(0, x_max)

        # Chart 2: time vs P95 and P99 latencies
        latency_series = (
            ("95th Percentile Latency (millisecond)", "orange", "Latency P95"),
            ("99th Percentile Latency (millisecond)", "red", "Latency P99"),
        )
        for column, color, label in latency_series:
            ax2.plot(time_axis, df[column], linewidth=1, color=color, alpha=0.7, label=label)
        ax2.set_xlabel("Time (seconds)")
        ax2.set_ylabel("Latency (ms)")
        ax2.set_title(f"Benchbase TPC-C Like Latency - {title_ending}")
        ax2.grid(True, alpha=0.3)
        ax2.set_xlim(0, x_max)
        ax2.legend()

        fig.tight_layout()

        # Save as SVG
        try:
            fig.savefig(output_svg_path, format="svg", dpi=300, bbox_inches="tight")
        except Exception as e:
            print(f"Error saving SVG file: {e}", file=sys.stderr)
            sys.exit(1)
        print(f"Charts saved to: {output_svg_path}")
    finally:
        # Release the figure even when saving failed or sys.exit() was raised.
        plt.close(fig)
def main():
    """Parse the command line, load the results CSV and write the SVG charts.

    Command-line arguments:
        --input-csv     Path to the input CSV results file (required).
        --output-svg    Path for the output SVG chart file (required).
        --title-suffix  Optional suffix added to the chart titles.

    Exits with status 1, after printing the reason to stderr, when the input
    file does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Generate TPS and latency charts from BenchBase TPC-C results CSV"
    )
    parser.add_argument(
        "--input-csv", type=str, required=True, help="Path to the input CSV results file"
    )
    parser.add_argument(
        "--output-svg", type=str, required=True, help="Path for the output SVG chart file"
    )
    parser.add_argument(
        "--title-suffix",
        type=str,
        required=False,
        help="Optional suffix to add to chart titles (e.g., 'Warmup', 'Benchmark Phase')",
    )
    args = parser.parse_args()

    # Fail early, before creating any output directory, if there is nothing to read.
    if not Path(args.input_csv).exists():
        print(f"Error: Input CSV file does not exist: {args.input_csv}", file=sys.stderr)
        sys.exit(1)

    # Create the output directory if it doesn't exist
    Path(args.output_svg).parent.mkdir(parents=True, exist_ok=True)

    # Load data and generate charts
    df = load_results_csv(args.input_csv)
    generate_charts(df, args.input_csv, args.output_svg, args.title_suffix)
    print(f"Successfully generated charts from {len(df)} data points")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,339 @@
import argparse
import html
import math
import os
import sys
from pathlib import Path
# Output directories. Both are relative paths, so they resolve against the
# directory the script is run from. Generated XML configs are written under
# CONFIGS_DIR and generated shell scripts under SCRIPTS_DIR.
CONFIGS_DIR = Path("../configs")
SCRIPTS_DIR = Path("../scripts")
# Constants
# TODO: increase times after testing
# Duration of the warmup phase, of the max-rate / opt-rate benchmark phase, and
# of each individual ramp-up step.
WARMUP_TIME_SECONDS = 1200 # 20 minutes
BENCHMARK_TIME_SECONDS = 3600 # 1 hour
RAMP_STEP_TIME_SECONDS = 300 # 5 minutes
# Number of terminals is ceil(BASE_TERMINALS + warehouses * TERMINALS_PER_WAREHOUSE).
BASE_TERMINALS = 130
TERMINALS_PER_WAREHOUSE = 0.2
# The opt-rate workload runs at ceil(max_rate * OPTIMAL_RATE_FACTOR).
OPTIMAL_RATE_FACTOR = 0.7 # 70% of max rate
# Substituted into <batchsize> of every generated XML config.
BATCH_SIZE = 1000
# Substituted into <loaderThreads> of the load config only.
LOADER_THREADS = 4
TRANSACTION_WEIGHTS = "45,43,4,4,4" # NewOrder, Payment, OrderStatus, Delivery, StockLevel
# Ramp-up rate multipliers: each factor is multiplied by the max rate and
# produces one work phase of RAMP_STEP_TIME_SECONDS in the ramp-up config.
RAMP_RATE_FACTORS = [1.5, 1.1, 0.9, 0.7, 0.6, 0.4, 0.6, 0.7, 0.9, 1.1]
# Templates for XML configs
WARMUP_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
<work>
<time>{warmup_time}</time>
<weights>{transaction_weights}</weights>
<rate>unlimited</rate>
<arrival>POISSON</arrival>
<distribution>ZIPFIAN</distribution>
</work>
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
MAX_RATE_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
<work>
<time>{benchmark_time}</time>
<weights>{transaction_weights}</weights>
<rate>unlimited</rate>
<arrival>POISSON</arrival>
<distribution>ZIPFIAN</distribution>
</work>
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
OPT_RATE_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
<work>
<time>{benchmark_time}</time>
<rate>{opt_rate}</rate>
<weights>{transaction_weights}</weights>
<arrival>POISSON</arrival>
<distribution>ZIPFIAN</distribution>
</work>
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
RAMP_UP_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
{works}
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
WORK_TEMPLATE = f""" <work>\n <time>{RAMP_STEP_TIME_SECONDS}</time>\n <rate>{{rate}}</rate>\n <weights>{TRANSACTION_WEIGHTS}</weights>\n <arrival>POISSON</arrival>\n <distribution>ZIPFIAN</distribution>\n </work>\n"""
# Templates for shell scripts
EXECUTE_SCRIPT = """# Create results directories
mkdir -p results_warmup
mkdir -p results_{suffix}
chmod 777 results_warmup results_{suffix}
# Run warmup phase
docker run --network=host --rm \
-v $(pwd)/configs:/configs \
-v $(pwd)/results_warmup:/results \
{docker_image}\
-b tpcc \
-c /configs/execute_{warehouses}_warehouses_warmup.xml \
-d /results \
--create=false --load=false --execute=true
# Run benchmark phase
docker run --network=host --rm \
-v $(pwd)/configs:/configs \
-v $(pwd)/results_{suffix}:/results \
{docker_image}\
-b tpcc \
-c /configs/execute_{warehouses}_warehouses_{suffix}.xml \
-d /results \
--create=false --load=false --execute=true\n"""
LOAD_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>{loader_threads}</loaderThreads>
</parameters>
"""
LOAD_SCRIPT = """# Create results directory for loading
mkdir -p results_load
chmod 777 results_load
docker run --network=host --rm \
-v $(pwd)/configs:/configs \
-v $(pwd)/results_load:/results \
{docker_image}\
-b tpcc \
-c /configs/load_{warehouses}_warehouses.xml \
-d /results \
--create=true --load=true --execute=false\n"""
def write_file(path, content):
    """Write `content` to `path`, creating parent directories as needed.

    Args:
        path: Destination file, as a `Path` or a string.
        content: Text to write; it is encoded as UTF-8.

    Files whose name ends in ".sh" are additionally made executable (mode 0o755).

    Any OSError raised while creating the directory, writing the file or
    changing its mode is reported on stderr and terminates the process with
    status 1.
    """
    path = Path(path)
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding: the output must not depend on the runner's locale.
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        # If it's a shell script, set executable permission
        if path.name.endswith(".sh"):
            path.chmod(0o755)
    except OSError as e:
        print(f"Error writing {path}: {e}", file=sys.stderr)
        sys.exit(1)
def escape_xml_password(password):
    """Return `password` with XML special characters replaced by entities.

    The characters &, <, > as well as both quote characters are escaped, so
    the result can be embedded in XML element content.
    """
    escaped = html.escape(password, quote=True)
    return escaped
def get_docker_arch_tag(runner_arch):
    """Translate a GitHub Actions `runner.arch` value into a Docker image arch tag.

    "X64" maps to "amd64" and "ARM64" maps to "arm64". Any other value falls
    back to "amd64".
    """
    known_tags = dict(X64="amd64", ARM64="arm64")
    try:
        return known_tags[runner_arch]
    except KeyError:
        # Unrecognised architecture: default to amd64
        return "amd64"
def main():
    """Generate the BenchBase XML configs and shell scripts for one warehouse count.

    Required command-line arguments: --warehouses, --max-rate, --hostname,
    --password and --runner-arch.

    Written under CONFIGS_DIR: the warmup, max-rate, opt-rate, ramp-up and load
    XML configs. Written under SCRIPTS_DIR: one execute script per benchmark
    variant plus the load script.
    """
    parser = argparse.ArgumentParser(description="Generate BenchBase workload configs and scripts.")
    cli_options = (
        ("--warehouses", int, "Number of warehouses"),
        ("--max-rate", int, "Max rate (TPS)"),
        ("--hostname", str, "Database hostname"),
        ("--password", str, "Database password"),
        ("--runner-arch", str, "GitHub Actions runner architecture"),
    )
    for flag, value_type, help_text in cli_options:
        parser.add_argument(flag, type=value_type, required=True, help=help_text)
    args = parser.parse_args()

    warehouses = args.warehouses
    max_rate = args.max_rate

    # The Docker image tag depends on the runner architecture.
    arch_tag = get_docker_arch_tag(args.runner_arch)
    docker_image = f"ghcr.io/neondatabase-labs/benchbase-postgres:latest-{arch_tag}"

    # Terminals: a fixed base plus a per-warehouse share, rounded up.
    terminals = math.ceil(BASE_TERMINALS + warehouses * TERMINALS_PER_WAREHOUSE)

    # One <work> element per ramp factor, each at its own rounded-up rate.
    ramp_works = "".join(
        WORK_TEMPLATE.format(rate=math.ceil(max_rate * factor)) for factor in RAMP_RATE_FACTORS
    )

    # Values every XML template needs; the password is escaped for safe XML insertion.
    base_fields = {
        "warehouses": warehouses,
        "hostname": args.hostname,
        "password": escape_xml_password(args.password),
        "batch_size": BATCH_SIZE,
    }
    execute_fields = {**base_fields, "terminals": terminals}

    # (file name, template, template fields), in the order the files are written.
    xml_configs = (
        (
            f"execute_{warehouses}_warehouses_warmup.xml",
            WARMUP_XML,
            {
                **execute_fields,
                "warmup_time": WARMUP_TIME_SECONDS,
                "transaction_weights": TRANSACTION_WEIGHTS,
            },
        ),
        (
            f"execute_{warehouses}_warehouses_max_rate.xml",
            MAX_RATE_XML,
            {
                **execute_fields,
                "benchmark_time": BENCHMARK_TIME_SECONDS,
                "transaction_weights": TRANSACTION_WEIGHTS,
            },
        ),
        (
            f"execute_{warehouses}_warehouses_opt_rate.xml",
            OPT_RATE_XML,
            {
                **execute_fields,
                "opt_rate": math.ceil(max_rate * OPTIMAL_RATE_FACTOR),
                "benchmark_time": BENCHMARK_TIME_SECONDS,
                "transaction_weights": TRANSACTION_WEIGHTS,
            },
        ),
        (
            f"execute_{warehouses}_warehouses_ramp_up.xml",
            RAMP_UP_XML,
            {**execute_fields, "works": ramp_works},
        ),
        # Loader config
        (
            f"load_{warehouses}_warehouses.xml",
            LOAD_XML,
            {**base_fields, "loader_threads": LOADER_THREADS},
        ),
    )
    for file_name, template, fields in xml_configs:
        write_file(CONFIGS_DIR / file_name, template.format(**fields))

    # One execute script per benchmark variant.
    for suffix in ("max_rate", "opt_rate", "ramp_up"):
        write_file(
            SCRIPTS_DIR / f"execute_{warehouses}_warehouses_{suffix}.sh",
            EXECUTE_SCRIPT.format(warehouses=warehouses, suffix=suffix, docker_image=docker_image),
        )

    # Loader script
    write_file(
        SCRIPTS_DIR / f"load_{warehouses}_warehouses.sh",
        LOAD_SCRIPT.format(warehouses=warehouses, docker_image=docker_image),
    )

    print(f"Generated configs and scripts for {warehouses} warehouses and max rate {max_rate}.")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,591 @@
#!/usr/bin/env python3
# ruff: noqa
# we exclude the file from ruff because on the github runner we have python 3.9 and ruff
# is running with newer python 3.12 which suggests changes incompatible with python 3.9
"""
Upload BenchBase TPC-C results from summary.json and results.csv files to perf_test_results database.
This script extracts metrics from BenchBase *.summary.json and *.results.csv files and uploads them
to a PostgreSQL database table for performance tracking and analysis.
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd # type: ignore[import-untyped]
import psycopg2
def load_summary_json(json_file_path):
    """Read a BenchBase summary.json file and return the decoded JSON document.

    If the file is missing, is not valid JSON, or cannot be read for any other
    reason, an error message is printed and the process exits with status 1.
    """
    try:
        with open(json_file_path) as summary_file:
            raw_text = summary_file.read()
            return json.loads(raw_text)
    except FileNotFoundError:
        print(f"Error: Summary JSON file not found: {json_file_path}")
    except json.JSONDecodeError as e:
        print(f"Error: Invalid JSON in file {json_file_path}: {e}")
    except Exception as e:
        print(f"Error loading JSON file {json_file_path}: {e}")
    # Only reached when one of the handlers above ran.
    sys.exit(1)
def get_metric_info(metric_name):
    """Return the unit and report type of a metric as a dict.

    The result has the keys "unit" and "report_type". Rate and count metrics
    are "higher_is_better", latency metrics are "lower_is_better" with unit
    "µs". Unknown metric names get an empty unit and "higher_is_better".
    """
    rate_units = {
        "Throughput": "req/s",
        "Goodput": "req/s",
        "Measured Requests": "requests",
    }
    latency_names = {
        "95th Percentile Latency",
        "Maximum Latency",
        "Median Latency",
        "Minimum Latency",
        "25th Percentile Latency",
        "90th Percentile Latency",
        "99th Percentile Latency",
        "75th Percentile Latency",
        "Average Latency",
    }

    if metric_name in rate_units:
        return {"unit": rate_units[metric_name], "report_type": "higher_is_better"}
    if metric_name in latency_names:
        return {"unit": "µs", "report_type": "lower_is_better"}
    return {"unit": "", "report_type": "higher_is_better"}
def extract_metrics(summary_data):
    """Collect the metrics of interest from a parsed summary.json document.

    Returns a list of (metric name, value) tuples: first the top-level
    throughput, goodput and request-count values, then the latency values
    found in the nested "Latency Distribution" object. Keys that are absent
    from the input are skipped.
    """
    # (key in the JSON document, clean metric name)
    top_level_keys = (
        ("Throughput (requests/second)", "Throughput"),
        ("Goodput (requests/second)", "Goodput"),
        ("Measured Requests", "Measured Requests"),
    )
    latency_keys = (
        ("95th Percentile Latency (microseconds)", "95th Percentile Latency"),
        ("Maximum Latency (microseconds)", "Maximum Latency"),
        ("Median Latency (microseconds)", "Median Latency"),
        ("Minimum Latency (microseconds)", "Minimum Latency"),
        ("25th Percentile Latency (microseconds)", "25th Percentile Latency"),
        ("90th Percentile Latency (microseconds)", "90th Percentile Latency"),
        ("99th Percentile Latency (microseconds)", "99th Percentile Latency"),
        ("75th Percentile Latency (microseconds)", "75th Percentile Latency"),
        ("Average Latency (microseconds)", "Average Latency"),
    )

    metrics = [(name, summary_data[key]) for key, name in top_level_keys if key in summary_data]

    if "Latency Distribution" in summary_data:
        distribution = summary_data["Latency Distribution"]
        metrics += [(name, distribution[key]) for key, name in latency_keys if key in distribution]

    return metrics
def build_labels(summary_data, project_id):
    """Build the labels dict stored alongside each uploaded metric.

    Copies a fixed set of descriptive keys from the summary data (those that
    are present) and adds the given `project_id` under the key "project_id".
    """
    wanted_keys = (
        "DBMS Type",
        "DBMS Version",
        "Benchmark Type",
        "Final State",
        "isolation",
        "scalefactor",
        "terminals",
    )
    labels = {key: summary_data[key] for key in wanted_keys if key in summary_data}
    # The project id comes from the workflow, not from the summary file.
    labels["project_id"] = project_id
    return labels
def build_suit_name(scalefactor, terminals, run_type, min_cu, max_cu):
    """Return the suit name "benchbase-tpc-c-<scalefactor>-<terminals>-<run_type>-<min_cu>-<max_cu>"."""
    components = (scalefactor, terminals, run_type, min_cu, max_cu)
    return "benchbase-tpc-c-" + "-".join(format(part) for part in components)
def convert_timestamp_to_utc(timestamp_ms):
    """Convert a millisecond epoch timestamp to an ISO 8601 UTC timestamp string.

    Args:
        timestamp_ms: Milliseconds since the Unix epoch.

    Returns:
        The timestamp formatted with `datetime.isoformat()` in UTC. If the value
        cannot be converted (wrong type, NaN, or outside the range the platform
        supports), a warning is printed and the current UTC time is returned
        instead.
    """
    try:
        dt = datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc)
        return dt.isoformat()
    # fromtimestamp() signals out-of-range input with OverflowError or OSError,
    # not only ValueError, so those must take the fallback path as well.
    except (ValueError, TypeError, OverflowError, OSError) as e:
        print(f"Warning: Could not convert timestamp {timestamp_ms}: {e}")
        return datetime.now(timezone.utc).isoformat()
def insert_metrics(conn, metrics_data):
    """Insert the given metric rows into the perf_test_results table.

    Args:
        conn: Open database connection; the insert is committed on it.
        metrics_data: Sequence of dicts, one per row, keyed by the parameter
            names used in the INSERT statement below.

    Prints the number of inserted rows and the first row as a sample. On any
    error a message is printed and the process exits with status 1.
    """
    sql = """
INSERT INTO perf_test_results
(suit, revision, platform, metric_name, metric_value, metric_unit,
metric_report_type, recorded_at_timestamp, labels)
VALUES (%(suit)s, %(revision)s, %(platform)s, %(metric_name)s, %(metric_value)s,
%(metric_unit)s, %(metric_report_type)s, %(recorded_at_timestamp)s, %(labels)s)
"""
    try:
        with conn.cursor() as cur:
            cur.executemany(sql, metrics_data)
            conn.commit()
            print(f"Successfully inserted {len(metrics_data)} metrics into perf_test_results")
            # Log some sample data for verification
            if metrics_data:
                sample = metrics_data[0]
                print(
                    f"Sample metric: {sample['metric_name']} = {sample['metric_value']} {sample['metric_unit']}"
                )
    except Exception as exc:
        print(f"Error inserting metrics into database: {exc}")
        sys.exit(1)
def create_benchbase_results_details_table(conn):
    """Ensure the benchbase_results_details table and its two indexes exist.

    Runs `CREATE TABLE IF NOT EXISTS` / `CREATE INDEX IF NOT EXISTS` statements
    on `conn` and commits. On any error a message is printed and the process
    exits with status 1.
    """
    ddl = """
CREATE TABLE IF NOT EXISTS benchbase_results_details (
id BIGSERIAL PRIMARY KEY,
suit TEXT,
revision CHAR(40),
platform TEXT,
recorded_at_timestamp TIMESTAMP WITH TIME ZONE,
requests_per_second NUMERIC,
average_latency_ms NUMERIC,
minimum_latency_ms NUMERIC,
p25_latency_ms NUMERIC,
median_latency_ms NUMERIC,
p75_latency_ms NUMERIC,
p90_latency_ms NUMERIC,
p95_latency_ms NUMERIC,
p99_latency_ms NUMERIC,
maximum_latency_ms NUMERIC
);
CREATE INDEX IF NOT EXISTS benchbase_results_details_recorded_at_timestamp_idx
ON benchbase_results_details USING BRIN (recorded_at_timestamp);
CREATE INDEX IF NOT EXISTS benchbase_results_details_suit_idx
ON benchbase_results_details USING BTREE (suit text_pattern_ops);
"""
    try:
        with conn.cursor() as cur:
            cur.execute(ddl)
            conn.commit()
            print("Successfully created/verified benchbase_results_details table")
    except Exception as exc:
        print(f"Error creating benchbase_results_details table: {exc}")
        sys.exit(1)
def process_csv_results(csv_file_path, start_timestamp_ms, suit, revision, platform):
    """Turn a BenchBase results CSV into rows for benchbase_results_details.

    Args:
        csv_file_path: Path of the CSV file to read.
        start_timestamp_ms: Benchmark start in epoch milliseconds; each row's
            "Time (seconds)" offset is added to it.
        suit, revision, platform: Values copied verbatim into every row.

    Returns:
        A list with one dict per CSV row, or an empty list when the file is
        missing, lacks a required column, or any other error occurs (the
        reason is printed).
    """
    time_column = "Time (seconds)"
    # Output field -> CSV header, in the order the fields appear in each row.
    metric_columns = {
        "requests_per_second": "Throughput (requests/second)",
        "average_latency_ms": "Average Latency (millisecond)",
        "minimum_latency_ms": "Minimum Latency (millisecond)",
        "p25_latency_ms": "25th Percentile Latency (millisecond)",
        "median_latency_ms": "Median Latency (millisecond)",
        "p75_latency_ms": "75th Percentile Latency (millisecond)",
        "p90_latency_ms": "90th Percentile Latency (millisecond)",
        "p95_latency_ms": "95th Percentile Latency (millisecond)",
        "p99_latency_ms": "99th Percentile Latency (millisecond)",
        "maximum_latency_ms": "Maximum Latency (millisecond)",
    }
    try:
        frame = pd.read_csv(csv_file_path)

        # Refuse to guess when the CSV layout is not the expected one.
        required_columns = [time_column, *metric_columns.values()]
        missing_columns = [col for col in required_columns if col not in frame.columns]
        if missing_columns:
            print(f"Error: Missing required columns in CSV: {missing_columns}")
            return []

        csv_data = []
        for _, record in frame.iterrows():
            # Row time = benchmark start + offset of this sample, in epoch ms.
            sample_ms = start_timestamp_ms + (record[time_column] * 1000)
            sample_time = datetime.fromtimestamp(sample_ms / 1000.0, tz=timezone.utc)
            entry = {
                "suit": suit,
                "revision": revision,
                "platform": platform,
                "recorded_at_timestamp": sample_time.isoformat(),
            }
            for field, header in metric_columns.items():
                entry[field] = float(record[header])
            csv_data.append(entry)

        print(f"Processed {len(csv_data)} rows from CSV file")
        return csv_data
    except FileNotFoundError:
        print(f"Error: CSV file not found: {csv_file_path}")
        return []
    except Exception as e:
        print(f"Error processing CSV file {csv_file_path}: {e}")
        return []
def insert_csv_results(conn, csv_data):
    """Bulk-insert detailed result rows into benchbase_results_details.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
        csv_data: List of row dicts keyed by the named placeholders of the
            INSERT statement below; an empty list is a no-op.

    Any exception is reported on stdout and terminates the process with
    exit status 1.
    """
    if not csv_data:
        print("No CSV data to insert")
        return

    statement = """
        INSERT INTO benchbase_results_details
        (suit, revision, platform, recorded_at_timestamp, requests_per_second,
         average_latency_ms, minimum_latency_ms, p25_latency_ms, median_latency_ms,
         p75_latency_ms, p90_latency_ms, p95_latency_ms, p99_latency_ms, maximum_latency_ms)
        VALUES (%(suit)s, %(revision)s, %(platform)s, %(recorded_at_timestamp)s, %(requests_per_second)s,
                %(average_latency_ms)s, %(minimum_latency_ms)s, %(p25_latency_ms)s, %(median_latency_ms)s,
                %(p75_latency_ms)s, %(p90_latency_ms)s, %(p95_latency_ms)s, %(p99_latency_ms)s, %(maximum_latency_ms)s)
    """
    try:
        with conn.cursor() as cur:
            cur.executemany(statement, csv_data)
            conn.commit()
            print(
                f"Successfully inserted {len(csv_data)} detailed results into benchbase_results_details"
            )
            # Echo the first row so the upload can be checked in the job output.
            sample = csv_data[0]
            print(
                f"Sample detail: {sample['requests_per_second']} req/s at {sample['recorded_at_timestamp']}"
            )
    except Exception as e:
        print(f"Error inserting CSV results into database: {e}")
        sys.exit(1)
def parse_load_log(log_file_path, scalefactor):
    """Parse a BenchBase load log and derive load-phase metrics.

    The load duration is the difference between the "Loading data into TPCC
    database" and "Finished loading data into TPCC database" log lines.
    Throughput assumes that 10 warehouses are approximately 1 GB of data.

    Args:
        log_file_path: Path of the load log file.
        scalefactor: Number of warehouses. Ints, floats and numeric strings
            are accepted; anything else (for example the placeholder
            "unknown") makes the throughput impossible to compute.

    Returns:
        A dict with ``duration_seconds``, ``throughput_mb_per_sec`` and
        ``end_timestamp`` (ISO 8601, labelled as UTC), or ``None`` when the
        file is missing, the timestamps cannot be found, the scalefactor is
        not numeric, or the measured duration is not positive.
    """
    try:
        with open(log_file_path) as f:
            log_content = f.read()

        # Regex patterns to match the timestamp lines
        loading_pattern = r"\[INFO \] (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}.*Loading data into TPCC database"
        finished_pattern = r"\[INFO \] (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}.*Finished loading data into TPCC database"
        loading_match = re.search(loading_pattern, log_content)
        finished_match = re.search(finished_pattern, log_content)
        if not loading_match or not finished_match:
            print(f"Warning: Could not find loading timestamps in log file {log_file_path}")
            return None

        # Reject a non-numeric scalefactor explicitly instead of letting
        # "unknown" * 1024 / 10.0 fail with an unrelated TypeError.
        try:
            warehouses = float(scalefactor)
        except (TypeError, ValueError):
            print(
                f"Warning: Cannot compute load throughput, scalefactor is not numeric: {scalefactor!r}"
            )
            return None

        # Parse timestamps
        loading_time = datetime.strptime(loading_match.group(1), "%Y-%m-%d %H:%M:%S")
        finished_time = datetime.strptime(finished_match.group(1), "%Y-%m-%d %H:%M:%S")

        # Calculate duration in seconds. The log is parsed at one-second
        # resolution, so a very short load can come out as zero.
        duration_seconds = (finished_time - loading_time).total_seconds()
        if duration_seconds <= 0:
            print(
                f"Warning: Non-positive load duration ({duration_seconds}s) in log file {log_file_path}"
            )
            return None

        # Calculate throughput: scalefactor/warehouses: 10 warehouses is approx. 1 GB of data
        load_throughput = (warehouses * 1024 / 10.0) / duration_seconds

        # Convert end time to UTC timestamp for database.
        # NOTE(review): the log timestamp carries no zone and is simply
        # labelled as UTC here - confirm the load host runs in UTC.
        finished_time_utc = finished_time.replace(tzinfo=timezone.utc).isoformat()

        print(f"Load metrics: Duration={duration_seconds}s, Throughput={load_throughput:.2f} MB/s")
        return {
            "duration_seconds": duration_seconds,
            "throughput_mb_per_sec": load_throughput,
            "end_timestamp": finished_time_utc,
        }
    except FileNotFoundError:
        print(f"Warning: Load log file not found: {log_file_path}")
        return None
    except Exception as e:
        print(f"Error parsing load log file {log_file_path}: {e}")
        return None
def insert_load_metrics(conn, load_metrics, suit, revision, platform, labels_json):
    """Insert the load-phase duration and throughput into perf_test_results.

    Args:
        conn: Open database connection exposing ``cursor()`` and ``commit()``.
        load_metrics: Dict with ``duration_seconds``, ``throughput_mb_per_sec``
            and ``end_timestamp``; a falsy value is a no-op.
        suit, revision, platform: Values copied verbatim into both rows.
        labels_json: Already-serialized JSON string for the labels column.

    Any database exception is reported on stdout and terminates the process
    with exit status 1.
    """
    if not load_metrics:
        print("No load metrics to insert")
        return

    # (metric name, key in load_metrics, unit, report type) for each row.
    metric_specs = (
        ("load_duration_seconds", "duration_seconds", "seconds", "lower_is_better"),
        ("load_throughput", "throughput_mb_per_sec", "MB/second", "higher_is_better"),
    )
    load_metrics_data = [
        {
            "suit": suit,
            "revision": revision,
            "platform": platform,
            "metric_name": metric_name,
            "metric_value": load_metrics[source_key],
            "metric_unit": unit,
            "metric_report_type": report_type,
            "recorded_at_timestamp": load_metrics["end_timestamp"],
            "labels": labels_json,
        }
        for metric_name, source_key, unit, report_type in metric_specs
    ]

    statement = """
        INSERT INTO perf_test_results
        (suit, revision, platform, metric_name, metric_value, metric_unit,
         metric_report_type, recorded_at_timestamp, labels)
        VALUES (%(suit)s, %(revision)s, %(platform)s, %(metric_name)s, %(metric_value)s,
                %(metric_unit)s, %(metric_report_type)s, %(recorded_at_timestamp)s, %(labels)s)
    """
    try:
        with conn.cursor() as cur:
            cur.executemany(statement, load_metrics_data)
            conn.commit()
            print(f"Successfully inserted {len(load_metrics_data)} load metrics into perf_test_results")
    except Exception as e:
        print(f"Error inserting load metrics into database: {e}")
        sys.exit(1)
def main():
"""Parse CLI arguments and upload BenchBase TPC-C results to the database.

Depending on the arguments given, this uploads summary metrics
(--summary-json) and load-phase metrics (--load-log) to perf_test_results,
and detailed CSV rows (--results-csv) to benchbase_results_details.
At least one of --summary-json or --load-log is required.
Exits with status 1 on invalid input files or on any database error.
"""
parser = argparse.ArgumentParser(
description="Upload BenchBase TPC-C results to perf_test_results database"
)
parser.add_argument(
"--summary-json", type=str, required=False, help="Path to the summary.json file"
)
parser.add_argument(
"--run-type",
type=str,
required=True,
choices=["warmup", "opt-rate", "ramp-up", "load"],
help="Type of benchmark run",
)
parser.add_argument("--min-cu", type=float, required=True, help="Minimum compute units")
parser.add_argument("--max-cu", type=float, required=True, help="Maximum compute units")
parser.add_argument("--project-id", type=str, required=True, help="Neon project ID")
parser.add_argument(
"--revision", type=str, required=True, help="Git commit hash (40 characters)"
)
parser.add_argument(
"--connection-string", type=str, required=True, help="PostgreSQL connection string"
)
parser.add_argument(
"--results-csv",
type=str,
required=False,
help="Path to the results.csv file for detailed metrics upload",
)
parser.add_argument(
"--load-log",
type=str,
required=False,
help="Path to the load log file for load phase metrics",
)
parser.add_argument(
"--warehouses",
type=int,
required=False,
help="Number of warehouses (scalefactor) for load metrics calculation",
)
args = parser.parse_args()
# Validate inputs
if args.summary_json and not Path(args.summary_json).exists():
print(f"Error: Summary JSON file does not exist: {args.summary_json}")
sys.exit(1)
if not args.summary_json and not args.load_log:
print("Error: Either summary JSON or load log file must be provided")
sys.exit(1)
# A revision of the wrong length is only reported, not rejected.
if len(args.revision) != 40:
print(f"Warning: Revision should be 40 characters, got {len(args.revision)}")
# Load and process summary data if provided
summary_data = None
metrics = []
if args.summary_json:
summary_data = load_summary_json(args.summary_json)
metrics = extract_metrics(summary_data)
if not metrics:
print("Warning: No metrics found in summary JSON")
# Build common data for all metrics
if summary_data:
scalefactor = summary_data.get("scalefactor", "unknown")
terminals = summary_data.get("terminals", "unknown")
labels = build_labels(summary_data, args.project_id)
else:
# For load-only processing, use warehouses argument as scalefactor
scalefactor = args.warehouses if args.warehouses else "unknown"
terminals = "unknown"
labels = {"project_id": args.project_id}
suit = build_suit_name(scalefactor, terminals, args.run_type, args.min_cu, args.max_cu)
# NOTE(review): the platform prefix is hard-coded to "prod-us-east-2" - confirm it matches where the project runs.
platform = f"prod-us-east-2-{args.project_id}"
# Convert timestamp - only needed for summary metrics and CSV processing
current_timestamp_ms = None
start_timestamp_ms = None
recorded_at = None
if summary_data:
current_timestamp_ms = summary_data.get("Current Timestamp (milliseconds)")
start_timestamp_ms = summary_data.get("Start timestamp (milliseconds)")
if current_timestamp_ms:
recorded_at = convert_timestamp_to_utc(current_timestamp_ms)
else:
print("Warning: No timestamp found in JSON, using current time")
recorded_at = datetime.now(timezone.utc).isoformat()
if not start_timestamp_ms:
print("Warning: No start timestamp found in JSON, CSV upload may be incorrect")
# Fall back to the end timestamp, or to "now" in epoch milliseconds if that is missing too.
start_timestamp_ms = (
current_timestamp_ms or datetime.now(timezone.utc).timestamp() * 1000
)
# Print Grafana dashboard link for cross-service endpoint debugging
if start_timestamp_ms and current_timestamp_ms:
grafana_url = (
f"https://neonprod.grafana.net/d/cdya0okb81zwga/cross-service-endpoint-debugging"
f"?orgId=1&from={int(start_timestamp_ms)}&to={int(current_timestamp_ms)}"
f"&timezone=utc&var-env=prod&var-input_project_id={args.project_id}"
)
print(f'Cross service endpoint dashboard for "{args.run_type}" phase: {grafana_url}')
# Prepare metrics data for database insertion (only if we have summary metrics)
metrics_data = []
if metrics and recorded_at:
for metric_name, metric_value in metrics:
metric_info = get_metric_info(metric_name)
row = {
"suit": suit,
"revision": args.revision,
"platform": platform,
"metric_name": metric_name,
"metric_value": float(metric_value), # Ensure numeric type
"metric_unit": metric_info["unit"],
"metric_report_type": metric_info["report_type"],
"recorded_at_timestamp": recorded_at,
"labels": json.dumps(labels), # Convert to JSON string for JSONB column
}
metrics_data.append(row)
print(f"Prepared {len(metrics_data)} summary metrics for upload to database")
print(f"Suit: {suit}")
print(f"Platform: {platform}")
# Connect to database and insert metrics
try:
conn = psycopg2.connect(args.connection_string)
# Insert summary metrics into perf_test_results (if any)
if metrics_data:
insert_metrics(conn, metrics_data)
else:
print("No summary metrics to upload")
# Process and insert detailed CSV results if provided
if args.results_csv:
print(f"Processing detailed CSV results from: {args.results_csv}")
# Create table if it doesn't exist
create_benchbase_results_details_table(conn)
# Process CSV data
# NOTE(review): start_timestamp_ms may still be None here when --summary-json was not given - confirm
# that --results-csv is only ever passed together with --summary-json.
csv_data = process_csv_results(
args.results_csv, start_timestamp_ms, suit, args.revision, platform
)
# Insert CSV data
if csv_data:
insert_csv_results(conn, csv_data)
else:
print("No CSV data to upload")
else:
print("No CSV file provided, skipping detailed results upload")
# Process and insert load metrics if provided
if args.load_log:
print(f"Processing load metrics from: {args.load_log}")
# Parse load log and extract metrics
# scalefactor can be the placeholder string "unknown" here when neither the summary nor --warehouses supplied it.
load_metrics = parse_load_log(args.load_log, scalefactor)
# Insert load metrics
if load_metrics:
insert_load_metrics(
conn, load_metrics, suit, args.revision, platform, json.dumps(labels)
)
else:
print("No load metrics to upload")
else:
print("No load log file provided, skipping load metrics upload")
# The connection is closed explicitly only on this success path; the handlers below exit the process.
conn.close()
print("Database upload completed successfully")
except psycopg2.Error as e:
print(f"Database connection/query error: {e}")
sys.exit(1)
except Exception as e:
print(f"Unexpected error: {e}")
sys.exit(1)
# Run the uploader only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
main()

View File

@@ -2,45 +2,48 @@ from __future__ import annotations
import os
import timeit
import traceback
from concurrent.futures import ThreadPoolExecutor as Exec
from pathlib import Path
from threading import Thread
from time import sleep
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, cast
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker, PgBenchRunResult
from fixtures.log_helper import log
from fixtures.neon_api import NeonAPI, connection_parameters_to_env
from fixtures.neon_api import NeonAPI, connstr_to_env
from performance.test_perf_pgbench import utc_now_timestamp
if TYPE_CHECKING:
from fixtures.compare_fixtures import NeonCompare
from fixtures.neon_fixtures import Endpoint, PgBin
from fixtures.pg_version import PgVersion
from performance.test_perf_pgbench import utc_now_timestamp
# These tests compare performance for a write-heavy and read-heavy workloads of an ordinary endpoint
# compared to the endpoint which saves its LFC and prewarms using it on startup.
# compared to the endpoint which saves its LFC and prewarms using it on startup
def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
pg_bin = neon_compare.pg_bin
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
ep_ordinary: Endpoint = neon_compare.endpoint
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed")
for ep in [ep_normal, ep_prewarmed]:
for ep in [ep_ordinary, ep_prewarmed]:
connstr: str = ep.connstr()
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", connstr, "-s100"])
ep.safe_psql("CREATE EXTENSION neon")
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
ep.safe_psql("CREATE SCHEMA neon; CREATE EXTENSION neon WITH SCHEMA neon")
if ep == ep_prewarmed:
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start(autoprewarm=True)
client.prewarm_lfc_wait()
else:
ep.stop()
ep.start()
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
@@ -59,6 +62,36 @@ def test_compare_prewarmed_pgbench_perf(neon_compare: NeonCompare):
neon_compare.zenbenchmark.record_pg_bench_result(name, res)
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
    """Compare a full-table read after restart on an ordinary vs. a prewarmed endpoint.

    Both endpoints get the same table with one million rows. Each one is
    then restarted and the duration of a single ``count(*)`` is recorded:
    "ordinary_run_duration" for the plain restart, and
    "prewarmed_run_duration" for the endpoint that offloaded its LFC before
    stopping and finished prewarming after starting with ``autoprewarm=True``.
    """
    env = neon_compare.env
    env.create_branch("prewarmed")
    ep_ordinary: Endpoint = neon_compare.endpoint
    ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed")

    setup_statements = [
        "CREATE SCHEMA neon",
        "CREATE EXTENSION neon WITH SCHEMA neon",
        "CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
        "INSERT INTO foo SELECT FROM generate_series(1,1000000)",
    ]
    count_query = "SELECT count(*) from foo"

    # Ordinary endpoint: populate, restart without any prewarm, measure.
    ep_ordinary.safe_psql_many(setup_statements)
    ep_ordinary.stop()
    ep_ordinary.start()
    with neon_compare.record_duration("ordinary_run_duration"):
        ep_ordinary.safe_psql(count_query)

    # Prewarmed endpoint: populate, offload the LFC, restart with
    # autoprewarm and wait for the prewarm to finish before measuring.
    ep_prewarmed.safe_psql_many(setup_statements)
    http = ep_prewarmed.http_client()
    http.offload_lfc()
    ep_prewarmed.stop()
    ep_prewarmed.start(autoprewarm=True)
    http.prewarm_lfc_wait()
    with neon_compare.record_duration("prewarmed_run_duration"):
        ep_prewarmed.safe_psql(count_query)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_compare_prewarmed_pgbench_perf_benchmark(
@@ -67,67 +100,66 @@ def test_compare_prewarmed_pgbench_perf_benchmark(
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
name = f"Test prewarmed pgbench performance, GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
project = neon_api.create_project(pg_version, name)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
err = False
try:
benchmark_impl(pg_bin, neon_api, project, zenbenchmark)
except Exception as e:
err = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not err
neon_api.delete_project(project_id)
"""
Prewarm API is not public, so this test relies on a pre-created project
with pgbench size of 3424, pgbench -i -IdtGvp -s3424. Sleeping and
offloading constants are hardcoded to this size as well
"""
project_id = os.getenv("PROJECT_ID")
assert project_id
ordinary_branch_id = ""
prewarmed_branch_id = ""
for branch in neon_api.get_branches(project_id)["branches"]:
if branch["name"] == "ordinary":
ordinary_branch_id = branch["id"]
if branch["name"] == "prewarmed":
prewarmed_branch_id = branch["id"]
assert len(ordinary_branch_id) > 0
assert len(prewarmed_branch_id) > 0
ep_ordinary = None
ep_prewarmed = None
for ep in neon_api.get_endpoints(project_id)["endpoints"]:
if ep["branch_id"] == ordinary_branch_id:
ep_ordinary = ep
if ep["branch_id"] == prewarmed_branch_id:
ep_prewarmed = ep
assert ep_ordinary
assert ep_prewarmed
ordinary_id = ep_ordinary["id"]
prewarmed_id = ep_prewarmed["id"]
def benchmark_impl(
pg_bin: PgBin, neon_api: NeonAPI, project: dict[str, Any], zenbenchmark: NeonBenchmarker
):
pgbench_size = int(os.getenv("PGBENCH_SIZE") or "3424") # 50GB
offload_secs = 20
test_duration_min = 5
test_duration_min = 3
pgbench_duration = f"-T{test_duration_min * 60}"
# prewarm API is not publicly exposed. In order to test performance of a
# fully prewarmed endpoint, wait after it restarts.
# The number here is empirical, based on manual runs on staging
pgbench_init_cmd = ["pgbench", "-P10", "-n", "-c10", pgbench_duration, "-Mprepared"]
pgbench_perf_cmd = pgbench_init_cmd + ["-S"]
prewarmed_sleep_secs = 180
branch_id = project["branch"]["id"]
project_id = project["project"]["id"]
normal_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
normal_id = project["endpoints"][0]["id"]
prewarmed_branch_id = neon_api.create_branch(
project_id, "prewarmed", parent_id=branch_id, add_endpoint=False
)["branch"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
ep_prewarmed = neon_api.create_endpoint(
project_id,
prewarmed_branch_id,
endpoint_type="read_write",
settings={"autoprewarm": True, "offload_lfc_interval_seconds": offload_secs},
)
neon_api.wait_for_operation_to_finish(project_id)
prewarmed_env = normal_env.copy()
prewarmed_env["PGHOST"] = ep_prewarmed["endpoint"]["host"]
prewarmed_id = ep_prewarmed["endpoint"]["id"]
ordinary_uri = neon_api.get_connection_uri(project_id, ordinary_branch_id, ordinary_id)["uri"]
prewarmed_uri = neon_api.get_connection_uri(project_id, prewarmed_branch_id, prewarmed_id)[
"uri"
]
def bench(endpoint_name, endpoint_id, env):
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", f"-s{pgbench_size}"], env)
sleep(offload_secs * 2) # ensure LFC is offloaded after pgbench finishes
neon_api.restart_endpoint(project_id, endpoint_id)
sleep(prewarmed_sleep_secs)
log.info(f"Running pgbench for {pgbench_duration}s to warm up the cache")
pg_bin.run_capture(pgbench_init_cmd, env) # capture useful for debugging
log.info(f"Initialized {endpoint_name}")
if endpoint_name == "prewarmed":
log.info(f"sleeping {offload_secs * 2} to ensure LFC is offloaded")
sleep(offload_secs * 2)
neon_api.restart_endpoint(project_id, endpoint_id)
log.info(f"sleeping {prewarmed_sleep_secs} to ensure LFC is prewarmed")
sleep(prewarmed_sleep_secs)
else:
neon_api.restart_endpoint(project_id, endpoint_id)
log.info(f"Starting benchmark for {endpoint_name}")
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = pg_bin.run_capture(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env)
out = pg_bin.run_capture(pgbench_perf_cmd, env)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
@@ -140,29 +172,9 @@ def benchmark_impl(
)
zenbenchmark.record_pg_bench_result(endpoint_name, res)
with Exec(max_workers=2) as exe:
exe.submit(bench, "normal", normal_id, normal_env)
exe.submit(bench, "prewarmed", prewarmed_id, prewarmed_env)
prewarmed_args = ("prewarmed", prewarmed_id, connstr_to_env(prewarmed_uri))
prewarmed_thread = Thread(target=bench, args=prewarmed_args)
prewarmed_thread.start()
def test_compare_prewarmed_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.create_branch("normal")
env.create_branch("prewarmed")
ep_normal: Endpoint = env.endpoints.create_start("normal")
ep_prewarmed: Endpoint = env.endpoints.create_start("prewarmed", autoprewarm=True)
sql = [
"CREATE EXTENSION neon",
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
]
for ep in [ep_normal, ep_prewarmed]:
ep.safe_psql_many(sql)
client = ep.http_client()
client.offload_lfc()
ep.stop()
ep.start()
client.prewarm_lfc_wait()
with neon_compare.record_duration(f"{ep.branch_name}_run_duration"):
ep.safe_psql("SELECT count(*) from foo")
bench("ordinary", ordinary_id, connstr_to_env(ordinary_uri))
prewarmed_thread.join()

View File

@@ -17,7 +17,7 @@ def reconfigure_endpoint(endpoint: Endpoint, pageserver_id: int, use_explicit_re
# to make sure that PG-initiated config refresh doesn't mess things up by reverting to the old config.
endpoint.update_pageservers_in_config(pageserver_id=pageserver_id)
# PG will eventually automatically refresh its configuration if it detects connectivity issues with pageservers.
# PG will automatically refresh its configuration if it detects connectivity issues with pageservers.
# We also allow the test to explicitly request a reconfigure so that the test can be sure that the
# endpoint is running with the latest configuration.
#

View File

@@ -0,0 +1,369 @@
from __future__ import annotations
import json
import os
import shutil
import subprocess
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import TYPE_CHECKING
import requests
from fixtures.log_helper import log
from typing_extensions import override
if TYPE_CHECKING:
from typing import Any
from fixtures.common_types import TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv
from fixtures.port_distributor import PortDistributor
def launch_compute_ctl(
    env: NeonEnv,
    endpoint_name: str,
    external_http_port: int,
    internal_http_port: int,
    pg_port: int,
    control_plane_port: int,
) -> subprocess.Popen[str]:
    """
    Helper function to launch compute_ctl process with common configuration.

    A fresh ``endpoints/<endpoint_name>`` directory is created under
    ``env.repo_dir`` (any existing one is removed first), and the combined
    stdout/stderr of compute_ctl is written to ``compute.log`` inside it.

    Returns the Popen process object. The caller owns the process and is
    responsible for terminating and waiting for it.
    """
    # Create endpoint directory structure following the standard pattern
    endpoint_path = env.repo_dir / "endpoints" / endpoint_name

    # Clean up any existing endpoint directory to avoid conflicts
    if endpoint_path.exists():
        shutil.rmtree(endpoint_path)
    endpoint_path.mkdir(mode=0o755, parents=True, exist_ok=True)

    # pgdata path - compute_ctl will create this directory during basebackup
    pgdata_path = endpoint_path / "pgdata"

    # Log file lives in the endpoint directory
    log_file = endpoint_path / "compute.log"

    # Start compute_ctl pointing to our control plane
    compute_ctl_path = env.neon_binpath / "compute_ctl"
    connstr = f"postgresql://cloud_admin@localhost:{pg_port}/postgres"

    # Find postgres binary path
    pg_bin_path = env.pg_distrib_dir / env.pg_version.v_prefixed / "bin" / "postgres"
    pg_lib_path = env.pg_distrib_dir / env.pg_version.v_prefixed / "lib"

    # The child gets exactly this environment; nothing is inherited from
    # the test process because ``env=`` replaces the whole environment.
    env_vars = {
        "INSTANCE_ID": "lakebase-instance-id",
        "LD_LIBRARY_PATH": str(pg_lib_path),  # Linux, etc.
        "DYLD_LIBRARY_PATH": str(pg_lib_path),  # macOS
    }

    cmd = [
        str(compute_ctl_path),
        "--external-http-port",
        str(external_http_port),
        "--internal-http-port",
        str(internal_http_port),
        "--pgdata",
        str(pgdata_path),
        "--connstr",
        connstr,
        "--pgbin",
        str(pg_bin_path),
        "--compute-id",
        endpoint_name,  # Use endpoint_name as compute-id
        "--control-plane-uri",
        f"http://127.0.0.1:{control_plane_port}",
        "--lakebase-mode",
        "true",
    ]

    print(f"Launching compute_ctl with command: {cmd}")

    # Start compute_ctl. The child receives its own copy of the log file
    # descriptor, so the parent's handle is closed as soon as Popen returns
    # (or raises) instead of leaking for the rest of the test session.
    with open(log_file, "w") as log_handle:
        process = subprocess.Popen(
            cmd,
            env=env_vars,
            stdout=log_handle,
            stderr=subprocess.STDOUT,  # Combine stderr with stdout
            text=True,
        )

    return process
def wait_for_compute_status(
    compute_process: subprocess.Popen[str],
    http_port: int,
    expected_status: str,
    timeout_seconds: int = 10,
) -> None:
    """
    Poll compute_ctl's ``/status`` endpoint until it reports ``expected_status``.

    Returns normally once the status matches. Raises an exception if the
    process exits while waiting, or if the timeout elapses (in which case
    the process is terminated first).
    """
    status_url = f"http://localhost:{http_port}/status"
    deadline = time.time() + timeout_seconds

    while time.time() < deadline:
        try:
            # The HTTP server may not be listening yet; that is not an error.
            response = requests.get(status_url, timeout=0.5)
            if response.status_code == 200 and response.json().get("status") == expected_status:
                return
        except (requests.ConnectionError, requests.Timeout):
            pass

        # Stop waiting as soon as the process is gone.
        if compute_process.poll() is not None:
            raise Exception(
                f"compute_ctl exited unexpectedly with code {compute_process.returncode}."
            )

        time.sleep(0.5)

    # Timeout reached
    compute_process.terminate()
    raise Exception(
        f"compute_ctl failed to reach {expected_status} status within {timeout_seconds} seconds."
    )
class EmptySpecHandler(BaseHTTPRequestHandler):
    """Control-plane stub that answers every compute spec request with an "empty" status."""

    def do_GET(self):
        is_spec_request = self.path.startswith("/compute/api/v2/computes/") and self.path.endswith(
            "/spec"
        )
        if not is_spec_request:
            self.send_error(404)
            return

        # An "empty" status with no spec is what puts the compute in Empty state.
        payload: dict[str, Any] = {
            "status": "empty",
            "spec": None,
            "compute_ctl_config": {"jwks": {"keys": []}},
        }
        body = json.dumps(payload).encode()

        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    @override
    def log_message(self, format: str, *args: Any):
        # Keep per-request logging out of the test output.
        pass
def test_compute_terminate_empty(neon_simple_env: NeonEnv, port_distributor: PortDistributor):
    """
    Test that terminating a compute in Empty status works correctly.

    This tests the bug fix where terminating an Empty compute would hang
    waiting for a non-existent postgres process to terminate.
    """
    env = neon_simple_env

    # Get ports for our test
    control_plane_port = port_distributor.get_port()
    external_http_port = port_distributor.get_port()
    internal_http_port = port_distributor.get_port()
    pg_port = port_distributor.get_port()

    # Start a simple HTTP server that will serve the Empty spec
    server = HTTPServer(("127.0.0.1", control_plane_port), EmptySpecHandler)
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    compute_process = None
    try:
        # Start compute_ctl against the stub control plane
        compute_process = launch_compute_ctl(
            env,
            "test-empty-compute",
            external_http_port,
            internal_http_port,
            pg_port,
            control_plane_port,
        )

        # Wait for compute_ctl to start and report "empty" status
        wait_for_compute_status(compute_process, external_http_port, "empty")

        # Now send terminate request. The timeout matters: the regression
        # this test guards against is a hang, and without a timeout the
        # test itself would block forever instead of failing.
        response = requests.post(f"http://localhost:{external_http_port}/terminate", timeout=10)

        # Verify that the termination request sends back a 200 OK response and is not abruptly terminated.
        assert response.status_code == 200, (
            f"Expected 200 OK, got {response.status_code}: {response.text}"
        )

        # Wait for compute_ctl to exit
        exit_code = compute_process.wait(timeout=10)
        assert exit_code == 0, f"compute_ctl exited with non-zero code: {exit_code}"
    finally:
        # Clean up: stop serving, then release the listening socket so the
        # port does not stay bound for the rest of the test session.
        server.shutdown()
        server.server_close()
        if compute_process and compute_process.poll() is None:
            compute_process.terminate()
            compute_process.wait()
class SwitchableConfigHandler(BaseHTTPRequestHandler):
    """Control-plane stub that serves either a normal attached spec or an empty one.

    Which of the two is served is decided per request by the class attribute
    ``return_empty_spec``; the remaining class attributes supply the values
    placed in the attached spec.
    """

    return_empty_spec: bool = False
    tenant_id: TenantId | None = None
    timeline_id: TimelineId | None = None
    pageserver_port: int | None = None
    safekeeper_connstrs: list[str] | None = None

    def _build_response(self) -> dict[str, object | None]:
        """Return the JSON payload matching the current ``return_empty_spec`` setting."""
        if self.return_empty_spec:
            # Return empty status
            return {
                "status": "empty",
                "spec": None,
                "compute_ctl_config": {
                    "jwks": {"keys": []},
                },
            }

        # Return normal attached spec; unset values fall back to "" / [].
        pageserver_connstring = (
            f"postgres://no_user@localhost:{self.pageserver_port}" if self.pageserver_port else ""
        )
        return {
            "status": "attached",
            "spec": {
                "format_version": 1.0,
                "cluster": {
                    "roles": [],
                    "databases": [],
                    "postgresql_conf": "shared_preload_libraries='neon'",
                },
                "tenant_id": str(self.tenant_id) if self.tenant_id else "",
                "timeline_id": str(self.timeline_id) if self.timeline_id else "",
                "pageserver_connstring": pageserver_connstring,
                "safekeeper_connstrings": self.safekeeper_connstrs or [],
                "mode": "Primary",
                "skip_pg_catalog_updates": True,
                "reconfigure_concurrency": 1,
                "suspend_timeout_seconds": -1,
            },
            "compute_ctl_config": {
                "jwks": {"keys": []},
            },
        }

    def do_GET(self):
        is_spec_request = self.path.startswith("/compute/api/v2/computes/") and self.path.endswith(
            "/spec"
        )
        if not is_spec_request:
            self.send_error(404)
            return

        body = json.dumps(self._build_response()).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    @override
    def log_message(self, format: str, *args: Any):
        # Keep per-request logging out of the test output.
        pass
def test_compute_empty_spec_during_refresh_configuration(
    neon_simple_env: NeonEnv, port_distributor: PortDistributor
):
    """
    Test that compute exits when it receives an empty spec during refresh configuration state.

    This test:
    1. Start compute with a normal spec
    2. Change the spec handler to return empty spec
    3. Trigger some condition to force compute to refresh configuration
    4. Verify that compute_ctl exits
    """
    env = neon_simple_env

    # Get ports for our test
    control_plane_port = port_distributor.get_port()
    external_http_port = port_distributor.get_port()
    internal_http_port = port_distributor.get_port()
    pg_port = port_distributor.get_port()

    # Set up handler class variables
    SwitchableConfigHandler.tenant_id = env.initial_tenant
    SwitchableConfigHandler.timeline_id = env.initial_timeline
    SwitchableConfigHandler.pageserver_port = env.pageserver.service_port.pg
    # Convert comma-separated string to list
    safekeeper_connstrs = env.get_safekeeper_connstrs()
    SwitchableConfigHandler.safekeeper_connstrs = (
        safekeeper_connstrs.split(",") if safekeeper_connstrs else []
    )
    SwitchableConfigHandler.return_empty_spec = False  # Start with normal spec

    # Start HTTP server with switchable spec handler
    server = HTTPServer(("127.0.0.1", control_plane_port), SwitchableConfigHandler)
    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    compute_process = None
    try:
        # Start compute_ctl with tenant and timeline IDs
        # Use a unique endpoint name to avoid conflicts
        endpoint_name = f"test-refresh-compute-{os.getpid()}"
        compute_process = launch_compute_ctl(
            env,
            endpoint_name,
            external_http_port,
            internal_http_port,
            pg_port,
            control_plane_port,
        )

        # Wait for compute_ctl to start and report "running" status
        wait_for_compute_status(compute_process, external_http_port, "running", timeout_seconds=30)
        log.info("Compute is running. Now returning empty spec and trigger configuration refresh.")

        # Switch spec fetch handler to return empty spec
        SwitchableConfigHandler.return_empty_spec = True

        # Trigger a configuration refresh. A timeout keeps the test from
        # blocking forever on an unresponsive compute; a timeout is a
        # RequestException and is therefore ignored like any other failure.
        try:
            requests.post(
                f"http://localhost:{internal_http_port}/refresh_configuration", timeout=10
            )
        except requests.RequestException as e:
            log.info(f"Call to /refresh_configuration failed: {e}")
            log.info(
                "Ignoring the error, assuming that compute_ctl is already refreshing or has exited"
            )

        # Wait for compute_ctl to exit (it should exit when it gets an empty spec during refresh)
        try:
            exit_code = compute_process.wait(timeout=30)
        except subprocess.TimeoutExpired:
            compute_process.terminate()
            raise Exception("compute_ctl did not exit after receiving empty spec.") from None

        # The exit code might not be 0 in this case since it's an unexpected termination
        # but we mainly care that it did exit
        assert exit_code is not None, "compute_ctl should have exited"
    finally:
        # Clean up: stop serving, then release the listening socket so the
        # port does not stay bound for the rest of the test session.
        server.shutdown()
        server.server_close()
        if compute_process and compute_process.poll() is None:
            compute_process.terminate()
            compute_process.wait()

View File

@@ -0,0 +1,137 @@
import json
import shutil
from fixtures.common_types import TenantShardId
from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder, NeonPageserver
from requests.exceptions import ConnectionError
def _attempt_reconfiguration(endpoint: Endpoint, new_pageserver_id: int, timeout_sec: float):
    """
    Try to repoint the compute at pageserver `new_pageserver_id`, tolerating failure.

    The tests in this file deliberately aim the compute at a "wrong" pageserver, so the
    reconfiguration attempt is not expected to go through: any exception raised by
    `endpoint.reconfigure` is logged and swallowed instead of failing the test.
    """
    try:
        endpoint.reconfigure(pageserver_id=new_pageserver_id, timeout_sec=timeout_sec)
    except Exception as err:
        # Best effort by design: the callers inspect metrics afterwards, not the outcome.
        log.info(f"reconfiguration failed with exception {err}")
def read_misrouted_metric_value(pageserver: NeonPageserver) -> float:
    """
    Return the current value of `pageserver_misrouted_pagestream_requests_total`
    as reported by the given pageserver's metrics endpoint.
    """
    metrics = pageserver.http_client().get_metrics()
    sample = metrics.query_one("pageserver_misrouted_pagestream_requests_total")
    return sample.value
def read_request_error_metric_value(endpoint: Endpoint) -> float:
    """
    Return the current value of `pg_cctl_pagestream_request_errors_total`, parsed from
    the metrics text served by the endpoint's HTTP client.
    """
    raw_metrics = endpoint.http_client().metrics()
    sample = parse_metrics(raw_metrics).query_one("pg_cctl_pagestream_request_errors_total")
    return sample.value
def test_misrouted_to_secondary(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Check that pointing a compute at a secondary pageserver increments both metrics:
    - On the pageserver receiving the requests: pageserver_misrouted_pagestream_requests_total
    - On the compute: pg_cctl_pagestream_request_errors_total
    """
    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()

    # Start the services individually: broker, storage controller, pageservers, safekeepers.
    env.broker.start()
    env.storage_controller.start()
    for pageserver in env.pageservers:
        pageserver.start()
    for safekeeper in env.safekeepers:
        safekeeper.start()

    # Create a tenant with one primary and one secondary location. Due to primary/secondary
    # placement constraints, the two locations end up on different pageservers.
    placement = json.dumps({"Attached": 1})
    tenant_id, _ = env.create_tenant(shard_count=1, placement_policy=placement)

    endpoint = env.endpoints.create(
        "main", tenant_id=tenant_id, config_lines=["neon.lakebase_mode = true"]
    )
    endpoint.respec(skip_pg_catalog_updates=False)
    endpoint.start()

    # Find the pageserver currently serving shard zero of the tenant. With exactly two
    # pageservers in the environment, the other one is the secondary.
    shard_zero = TenantShardId(tenant_id, shard_number=0, shard_count=1)
    primary_ps = env.get_tenant_pageserver(shard_zero)
    if primary_ps.id == env.pageservers[0].id:
        secondary_ps = env.pageservers[1]
    else:
        secondary_ps = env.pageservers[0]

    # Both counters must start at zero so that any increase is attributable to the
    # reconfiguration attempt below.
    assert read_misrouted_metric_value(secondary_ps) == 0
    assert read_request_error_metric_value(endpoint) == 0

    # Point the compute at the secondary and verify that both sides registered the
    # misrouted requests.
    _attempt_reconfiguration(endpoint, new_pageserver_id=secondary_ps.id, timeout_sec=2.0)
    assert read_misrouted_metric_value(secondary_ps) > 0
    try:
        assert read_request_error_metric_value(endpoint) > 0
    except ConnectionError:
        # When PG is configured to use a misconfigured pageserver, it cancels the query after
        # a certain number of failed reconfigure attempts, which causes compute_ctl to exit.
        # In that case the compute metrics are unreachable and the check is skipped.
        log.info("Cannot connect to PG, ignoring")
def test_misrouted_to_ps_not_hosting_tenant(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Check that pointing a compute at a pageserver that does not host the tenant increments
    both metrics:
    - On the pageserver receiving the requests: pageserver_misrouted_pagestream_requests_total
    - On the compute: pg_cctl_pagestream_request_errors_total
    """
    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()

    # Start the services individually: broker, storage controller, pageservers, safekeepers.
    env.broker.start()
    env.storage_controller.start(handle_ps_local_disk_loss=False)
    for pageserver in env.pageservers:
        pageserver.start()
    for safekeeper in env.safekeepers:
        safekeeper.start()

    tenant_id, _ = env.create_tenant(shard_count=1)
    endpoint = env.endpoints.create(
        "main", tenant_id=tenant_id, config_lines=["neon.lakebase_mode = true"]
    )
    endpoint.respec(skip_pg_catalog_updates=False)
    endpoint.start()

    # With exactly two pageservers, the one not serving shard zero is the non-hosting one.
    shard_zero = TenantShardId(tenant_id, shard_number=0, shard_count=1)
    tenant_ps_id = env.get_tenant_pageserver(shard_zero).id
    if tenant_ps_id == env.pageservers[0].id:
        non_hosting_ps = env.pageservers[1]
    else:
        non_hosting_ps = env.pageservers[0]

    # Clear the disk of the non-hosting pageserver to make sure that it indeed doesn't have
    # any information about the tenant.
    non_hosting_ps.stop(immediate=True)
    shutil.rmtree(non_hosting_ps.tenant_dir())
    non_hosting_ps.start()

    # Both counters must start at zero so that any increase is attributable to the
    # reconfiguration attempt below.
    assert read_misrouted_metric_value(non_hosting_ps) == 0
    assert read_request_error_metric_value(endpoint) == 0

    # Point the compute at the non-hosting pageserver and verify that both sides registered
    # the misrouted requests.
    _attempt_reconfiguration(endpoint, new_pageserver_id=non_hosting_ps.id, timeout_sec=2.0)
    assert read_misrouted_metric_value(non_hosting_ps) > 0
    try:
        assert read_request_error_metric_value(endpoint) > 0
    except ConnectionError:
        # When PG is configured to use a misconfigured pageserver, it cancels the query after
        # a certain number of failed reconfigure attempts, which causes compute_ctl to exit.
        # In that case the compute metrics are unreachable and the check is skipped.
        log.info("Cannot connect to PG, ignoring")

View File

@@ -129,7 +129,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
Test static endpoint is protected from GC by acquiring and renewing lsn leases.
"""
LSN_LEASE_LENGTH = 8
LSN_LEASE_LENGTH = (
14 # This value needs to be large enough for compute_ctl to send two lease requests.
)
neon_env_builder.num_pageservers = 2
# GC is manually triggered.
env = neon_env_builder.init_start(
@@ -230,6 +233,15 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
log.info(f"`SELECT` query succeed after GC, {ctx=}")
return offset
def renew_lease(env: NeonEnv, lease_lsn: Lsn):
    """
    Issue a `timeline_lsn_lease` request for `lease_lsn` on the initial tenant/timeline
    through the storage controller's pageserver API.

    Letting the compute renew the lease is not reliable in this test case because the
    lease timeout is very tight, so the test renews the lease itself. This is a
    workaround to make the test case more deterministic.
    """
    pageserver_api = env.storage_controller.pageserver_api()
    pageserver_api.timeline_lsn_lease(env.initial_tenant, env.initial_timeline, lease_lsn)
# Insert some records on main branch
with env.endpoints.create_start("main", config_lines=["shared_buffers=1MB"]) as ep_main:
with ep_main.cursor() as cur:
@@ -242,6 +254,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
XLOG_BLCKSZ = 8192
lsn = Lsn((int(lsn) // XLOG_BLCKSZ) * XLOG_BLCKSZ)
# We need to mock the way cplane works: it gets a lease for a branch before starting the compute.
renew_lease(env, lsn)
with env.endpoints.create_start(
branch_name="main",
endpoint_id="static",
@@ -251,9 +266,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT count(*) FROM t0")
assert cur.fetchone() == (ROW_COUNT,)
# Wait for static compute to renew lease at least once.
time.sleep(LSN_LEASE_LENGTH / 2)
generate_updates_on_main(env, ep_main, 3, end=100)
offset = trigger_gc_and_select(
@@ -263,10 +275,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
# Trigger Pageserver restarts
for ps in env.pageservers:
ps.stop()
# Static compute should have at least one lease request failure due to connection.
time.sleep(LSN_LEASE_LENGTH / 2)
ps.start()
renew_lease(env, lsn)
trigger_gc_and_select(
env,
ep_static,
@@ -282,6 +294,9 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
)
env.storage_controller.reconcile_until_idle()
# Wait for static compute to renew lease on the new pageserver.
time.sleep(LSN_LEASE_LENGTH + 3)
trigger_gc_and_select(
env,
ep_static,
@@ -292,7 +307,6 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
# Do some update so we can increment gc_cutoff
generate_updates_on_main(env, ep_main, i, end=100)
# Wait for the existing lease to expire.
time.sleep(LSN_LEASE_LENGTH + 1)
# Now trigger GC again, layers should be removed.

View File

@@ -1751,14 +1751,15 @@ def test_back_pressure_per_shard(neon_env_builder: NeonEnvBuilder):
"max_replication_apply_lag = 0",
"max_replication_flush_lag = 15MB",
"neon.max_cluster_size = 10GB",
"neon.lakebase_mode = true",
],
)
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()
# generate 10MB of data
# generate 20MB of data
endpoint.safe_psql(
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 10000) s;"
"CREATE TABLE usertable AS SELECT s AS KEY, repeat('a', 1000) as VALUE from generate_series(1, 20000) s;"
)
res = endpoint.safe_psql("SELECT neon.backpressure_throttling_time() as throttling_time")[0]
assert res[0] == 0, f"throttling_time should be 0, but got {res[0]}"

View File

@@ -3309,6 +3309,7 @@ def test_ps_unavailable_after_delete(
ps.allowed_errors.append(".*request was dropped before completing.*")
env.storage_controller.node_delete(ps.id, force=True)
wait_until(lambda: assert_nodes_count(2))
env.storage_controller.reconcile_until_idle()
elif deletion_api == DeletionAPIKind.OLD:
env.storage_controller.node_delete_old(ps.id)
assert_nodes_count(2)
@@ -4959,3 +4960,49 @@ def test_storage_controller_forward_404(neon_env_builder: NeonEnvBuilder):
env.storage_controller.configure_failpoints(
("reconciler-live-migrate-post-generation-inc", "off")
)
def test_re_attach_with_stuck_secondary(neon_env_builder: NeonEnvBuilder):
    """
    Scenario: the secondary location cannot be configured for whatever reason (simulated
    here with a failpoint on the secondary pageserver). The tenant is detached and then
    attached back again; finally, observed state consistency is checked by attempting to
    create a timeline.

    See LKB-204 for more details.
    """
    neon_env_builder.num_pageservers = 2
    env = neon_env_builder.init_configs()
    env.start()

    # The injected failpoint shows up in the storage controller's log; allow it.
    env.storage_controller.allowed_errors.append(".*failpoint.*")

    tenant_id, _ = env.create_tenant(shard_count=1, placement_policy='{"Attached":1}')
    env.storage_controller.reconcile_until_idle()

    # A single-shard tenant must be located on exactly one (attached) node.
    shard_locations = env.storage_controller.locate(tenant_id)
    assert len(shard_locations) == 1
    primary: int = shard_locations[0]["node_id"]

    # With two pageservers, the one that is not the primary is the secondary.
    other_node_ids = [ps.id for ps in env.pageservers if ps.id != primary]
    assert len(other_node_ids) == 1
    secondary = other_node_ids[0]

    # Make every location config request to the secondary fail from now on.
    secondary_client = env.get_pageserver(secondary).http_client()
    secondary_client.configure_failpoints(("put-location-conf-handler", "return(1)"))

    # Detach, then attach again. Each reconciliation is expected to fail on the failpoint.
    for placement in ("Detached", {"Attached": 1}):
        env.storage_controller.tenant_policy_update(tenant_id, {"placement": placement})
        with pytest.raises(Exception, match="failpoint"):
            env.storage_controller.reconcile_all()

    # Creating a timeline exercises the observed state left behind by the failed reconciles.
    env.storage_controller.pageserver_api().timeline_create(
        pg_version=PgVersion.NOT_SET, tenant_id=tenant_id, new_timeline_id=TimelineId.generate()
    )

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import os
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING
@@ -768,6 +769,14 @@ def test_lsn_lease_storcon(neon_env_builder: NeonEnvBuilder):
"compaction_period": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=conf)
# ShardSplit is slow in debug builds, so ignore the warning
if os.getenv("BUILD_TYPE", "debug") == "debug":
env.storage_controller.allowed_errors.extend(
[
".*Exclusive lock by ShardSplit was held.*",
]
)
with env.endpoints.create_start(
"main",
) as ep:

View File

@@ -298,15 +298,26 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize("compaction", ["compaction_enabled", "compaction_disabled"])
def test_pageserver_metrics_removed_after_offload(
neon_env_builder: NeonEnvBuilder, compaction: str
):
"""Tests that when a timeline is offloaded, the tenant specific metrics are not left behind"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
tenant_1, _ = env.create_tenant()
tenant_1, _ = env.create_tenant(
conf={
# disable background compaction and GC so that we don't have leftover tasks
# after offloading.
"gc_period": "0s",
"compaction_period": "0s",
}
if compaction == "compaction_disabled"
else None
)
timeline_1 = env.create_timeline("test_metrics_removed_after_offload_1", tenant_id=tenant_1)
timeline_2 = env.create_timeline("test_metrics_removed_after_offload_2", tenant_id=tenant_1)
@@ -351,6 +362,23 @@ def test_pageserver_metrics_removed_after_offload(neon_env_builder: NeonEnvBuild
state=TimelineArchivalState.ARCHIVED,
)
env.pageserver.http_client().timeline_offload(tenant_1, timeline)
# We need to wait until all background jobs are finished before we can check the metrics.
# There're many of them: compaction, GC, etc.
wait_until(
lambda: all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_waiting_tasks")
)
and all(
sample.value == 0
for sample in env.pageserver.http_client()
.get_metrics()
.query_all("pageserver_background_loop_semaphore_running_tasks")
)
)
post_offload_samples = set(
[x.name for x in get_ps_metric_samples_for_timeline(tenant_1, timeline)]
)