mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
Move logic for ingest benchmark from GitHub workflow into python testcase (#9762)
## Problem The first version of the ingest benchmark had some parsing and reporting logic in shell script inside GitHub workflow. it is better to move that logic into a python testcase so that we can also run it locally. ## Summary of changes - Create new python testcase - invoke pgcopydb inside python test case - move the following logic into python testcase - determine backpressure - invoke pgcopydb and report its progress - parse pgcopydb log and extract metrics - insert metrics into perf test database - add additional column to perf test database that can receive endpoint ID used for pgcopydb run to have it available in grafana dashboard when retrieving other metrics for an endpoint ## Example run https://github.com/neondatabase/neon/actions/runs/11860622170/job/33056264386
This commit is contained in:
267
test_runner/performance/test_perf_ingest_using_pgcopydb.py
Normal file
267
test_runner/performance/test_perf_ingest_using_pgcopydb.py
Normal file
@@ -0,0 +1,267 @@
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.utils import humantime_to_ms
|
||||
|
||||
|
||||
def setup_environment():
|
||||
"""Set up necessary environment variables for pgcopydb execution.
|
||||
|
||||
Expects the following variables to be set in the environment:
|
||||
- PG_CONFIG: e.g. /tmp/neon/pg_install/v16/bin/pg_config
|
||||
- PSQL: e.g. /tmp/neon/pg_install/v16/bin/psql
|
||||
- PG_16_LIB_PATH: e.g. /tmp/neon/pg_install/v16/lib
|
||||
- PGCOPYDB: e.g. /pgcopydb/bin/pgcopydb
|
||||
- PGCOPYDB_LIB_PATH: e.g. /pgcopydb/lib
|
||||
- BENCHMARK_INGEST_SOURCE_CONNSTR
|
||||
- BENCHMARK_INGEST_TARGET_CONNSTR
|
||||
- PERF_TEST_RESULT_CONNSTR
|
||||
- TARGET_PROJECT_TYPE
|
||||
|
||||
"""
|
||||
# Ensure required environment variables are set
|
||||
required_env_vars = [
|
||||
"PGCOPYDB",
|
||||
"PGCOPYDB_LIB_PATH",
|
||||
"PG_CONFIG",
|
||||
"PSQL",
|
||||
"PG_16_LIB_PATH",
|
||||
"BENCHMARK_INGEST_SOURCE_CONNSTR",
|
||||
"BENCHMARK_INGEST_TARGET_CONNSTR",
|
||||
"PERF_TEST_RESULT_CONNSTR",
|
||||
"TARGET_PROJECT_TYPE",
|
||||
]
|
||||
for var in required_env_vars:
|
||||
if not os.getenv(var):
|
||||
raise OSError(f"Required environment variable '{var}' is not set.")
|
||||
|
||||
|
||||
def build_pgcopydb_command(pgcopydb_filter_file: Path, test_output_dir: Path):
|
||||
"""Builds the pgcopydb command to execute using existing environment variables."""
|
||||
pgcopydb_executable = os.getenv("PGCOPYDB")
|
||||
if not pgcopydb_executable:
|
||||
raise OSError("PGCOPYDB environment variable is not set.")
|
||||
|
||||
return [
|
||||
pgcopydb_executable,
|
||||
"clone",
|
||||
"--dir",
|
||||
str(test_output_dir),
|
||||
"--skip-vacuum",
|
||||
"--no-owner",
|
||||
"--no-acl",
|
||||
"--skip-db-properties",
|
||||
"--table-jobs",
|
||||
"4",
|
||||
"--index-jobs",
|
||||
"4",
|
||||
"--restore-jobs",
|
||||
"4",
|
||||
"--split-tables-larger-than",
|
||||
"10GB",
|
||||
"--skip-extensions",
|
||||
"--use-copy-binary",
|
||||
"--filters",
|
||||
str(pgcopydb_filter_file),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture() # must be function scoped because test_output_dir is function scoped
|
||||
def pgcopydb_filter_file(test_output_dir: Path) -> Path:
|
||||
"""Creates the pgcopydb_filter.txt file required by pgcopydb."""
|
||||
filter_content = textwrap.dedent("""\
|
||||
[include-only-table]
|
||||
public.events
|
||||
public.emails
|
||||
public.email_transmissions
|
||||
public.payments
|
||||
public.editions
|
||||
public.edition_modules
|
||||
public.sp_content
|
||||
public.email_broadcasts
|
||||
public.user_collections
|
||||
public.devices
|
||||
public.user_accounts
|
||||
public.lessons
|
||||
public.lesson_users
|
||||
public.payment_methods
|
||||
public.orders
|
||||
public.course_emails
|
||||
public.modules
|
||||
public.users
|
||||
public.module_users
|
||||
public.courses
|
||||
public.payment_gateway_keys
|
||||
public.accounts
|
||||
public.roles
|
||||
public.payment_gateways
|
||||
public.management
|
||||
public.event_names
|
||||
""")
|
||||
filter_path = test_output_dir / "pgcopydb_filter.txt"
|
||||
filter_path.write_text(filter_content)
|
||||
return filter_path
|
||||
|
||||
|
||||
def get_backpressure_time(connstr):
|
||||
"""Executes a query to get the backpressure throttling time in seconds."""
|
||||
query = "select backpressure_throttling_time()/1000000;"
|
||||
psql_path = os.getenv("PSQL")
|
||||
if psql_path is None:
|
||||
raise OSError("The PSQL environment variable is not set.")
|
||||
result = subprocess.run(
|
||||
[psql_path, connstr, "-t", "-c", query], capture_output=True, text=True, check=True
|
||||
)
|
||||
return float(result.stdout.strip())
|
||||
|
||||
|
||||
def run_command_and_log_output(command, log_file_path: Path):
|
||||
"""
|
||||
Runs a command and logs output to both a file and GitHub Actions console.
|
||||
|
||||
Args:
|
||||
command (list): The command to execute.
|
||||
log_file_path (Path): Path object for the log file where output is written.
|
||||
"""
|
||||
# Define a list of necessary environment variables for pgcopydb
|
||||
custom_env_vars = {
|
||||
"LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
|
||||
"PGCOPYDB_SOURCE_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
|
||||
"PGCOPYDB_TARGET_PGURI": cast(str, os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
|
||||
"PGOPTIONS": "-c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
|
||||
}
|
||||
# Combine the current environment with custom variables
|
||||
env = os.environ.copy()
|
||||
env.update(custom_env_vars)
|
||||
|
||||
with log_file_path.open("w") as log_file:
|
||||
process = subprocess.Popen(
|
||||
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env
|
||||
)
|
||||
|
||||
assert process.stdout is not None, "process.stdout should not be None"
|
||||
|
||||
# Stream output to both log file and console
|
||||
for line in process.stdout:
|
||||
print(line, end="") # Stream to GitHub Actions log
|
||||
sys.stdout.flush()
|
||||
log_file.write(line) # Write to log file
|
||||
|
||||
process.wait() # Wait for the process to finish
|
||||
if process.returncode != 0:
|
||||
raise subprocess.CalledProcessError(process.returncode, command)
|
||||
|
||||
|
||||
def parse_log_and_report_metrics(
|
||||
zenbenchmark: NeonBenchmarker, log_file_path: Path, backpressure_time_diff: float
|
||||
):
|
||||
"""Parses the pgcopydb log file for performance metrics and reports them to the database."""
|
||||
metrics = {"backpressure_time": backpressure_time_diff}
|
||||
|
||||
# Define regex patterns to capture metrics
|
||||
metric_patterns = {
|
||||
"COPY_INDEX_CONSTRAINTS_VACUUM": re.compile(
|
||||
r"COPY, INDEX, CONSTRAINTS, VACUUM \(wall clock\).*"
|
||||
),
|
||||
"COPY_CUMULATIVE": re.compile(r"COPY \(cumulative\).*"),
|
||||
"CREATE_INDEX_CUMULATIVE": re.compile(r"CREATE INDEX \(cumulative\).*"),
|
||||
"CONSTRAINTS_CUMULATIVE": re.compile(r"CONSTRAINTS \(cumulative\).*"),
|
||||
"FINALIZE_SCHEMA": re.compile(r"Finalize Schema.*"),
|
||||
"TOTAL_DURATION": re.compile(r"Total Wall Clock Duration.*"),
|
||||
}
|
||||
|
||||
# Parse log file
|
||||
with log_file_path.open("r") as log_file:
|
||||
for line in log_file:
|
||||
for metric_name, pattern in metric_patterns.items():
|
||||
if pattern.search(line):
|
||||
# Extract duration and convert it to seconds
|
||||
duration_match = re.search(r"\d+h\d+m|\d+s|\d+ms|\d+\.\d+s", line)
|
||||
if duration_match:
|
||||
duration_str = duration_match.group(0)
|
||||
parts = re.findall(r"\d+[a-zA-Z]+", duration_str)
|
||||
rust_like_humantime = " ".join(parts)
|
||||
duration_seconds = humantime_to_ms(rust_like_humantime) / 1000.0
|
||||
metrics[metric_name] = duration_seconds
|
||||
|
||||
endpoint_id = {"endpoint_id": get_endpoint_id()}
|
||||
for metric_name, duration_seconds in metrics.items():
|
||||
zenbenchmark.record(
|
||||
metric_name, duration_seconds, "s", MetricReport.LOWER_IS_BETTER, endpoint_id
|
||||
)
|
||||
|
||||
|
||||
def get_endpoint_id():
|
||||
"""Extracts and returns the first segment of the hostname from the PostgreSQL URI stored in BENCHMARK_INGEST_TARGET_CONNSTR."""
|
||||
connstr = os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")
|
||||
if connstr is None:
|
||||
raise OSError("BENCHMARK_INGEST_TARGET_CONNSTR environment variable is not set.")
|
||||
|
||||
# Parse the URI
|
||||
parsed_url = urlparse(connstr)
|
||||
|
||||
# Extract the hostname and split to get the first segment
|
||||
hostname = parsed_url.hostname
|
||||
if hostname is None:
|
||||
raise ValueError("Unable to parse hostname from BENCHMARK_INGEST_TARGET_CONNSTR")
|
||||
|
||||
# Split the hostname by dots and take the first segment
|
||||
endpoint_id = hostname.split(".")[0]
|
||||
|
||||
return endpoint_id
|
||||
|
||||
|
||||
@pytest.fixture() # must be function scoped because test_output_dir is function scoped
|
||||
def log_file_path(test_output_dir):
|
||||
"""Fixture to provide a temporary log file path."""
|
||||
if not os.getenv("TARGET_PROJECT_TYPE"):
|
||||
raise OSError("Required environment variable 'TARGET_PROJECT_TYPE' is not set.")
|
||||
return (test_output_dir / os.getenv("TARGET_PROJECT_TYPE")).with_suffix(".log")
|
||||
|
||||
|
||||
@pytest.mark.remote_cluster
|
||||
def test_ingest_performance_using_pgcopydb(
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
log_file_path: Path,
|
||||
pgcopydb_filter_file: Path,
|
||||
test_output_dir: Path,
|
||||
):
|
||||
"""
|
||||
Simulate project migration from another PostgreSQL provider to Neon.
|
||||
|
||||
Measure performance for Neon ingest steps
|
||||
- COPY
|
||||
- CREATE INDEX
|
||||
- CREATE CONSTRAINT
|
||||
- VACUUM ANALYZE
|
||||
- create foreign keys
|
||||
|
||||
Use pgcopydb to copy data from the source database to the destination database.
|
||||
"""
|
||||
# Set up environment and create filter file
|
||||
setup_environment()
|
||||
|
||||
# Get backpressure time before ingest
|
||||
backpressure_time_before = get_backpressure_time(os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR"))
|
||||
|
||||
# Build and run the pgcopydb command
|
||||
command = build_pgcopydb_command(pgcopydb_filter_file, test_output_dir)
|
||||
try:
|
||||
run_command_and_log_output(command, log_file_path)
|
||||
except subprocess.CalledProcessError as e:
|
||||
pytest.fail(f"pgcopydb command failed with error: {e}")
|
||||
|
||||
# Get backpressure time after ingest and calculate the difference
|
||||
backpressure_time_after = get_backpressure_time(os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR"))
|
||||
backpressure_time_diff = backpressure_time_after - backpressure_time_before
|
||||
|
||||
# Parse log file and report metrics, including backpressure time difference
|
||||
parse_log_and_report_metrics(zenbenchmark, log_file_path, backpressure_time_diff)
|
||||
Reference in New Issue
Block a user