Files
neon/test_runner/performance/test_perf_ingest_using_pgcopydb.py
Alexander Bayandin 30a7dd630c ruff: enable TC — flake8-type-checking (#11368)
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes
- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking):
https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
2025-03-30 18:58:33 +00:00

268 lines
9.5 KiB
Python

import os
import re
import subprocess
import sys
import textwrap
from pathlib import Path
from typing import cast
from urllib.parse import urlparse
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.utils import humantime_to_ms
def setup_environment():
    """Check that every environment variable the pgcopydb benchmark relies on is present.

    Each of the following must be set to a non-empty value:

    - PG_CONFIG: e.g. /tmp/neon/pg_install/v16/bin/pg_config
    - PSQL: e.g. /tmp/neon/pg_install/v16/bin/psql
    - PG_16_LIB_PATH: e.g. /tmp/neon/pg_install/v16/lib
    - PGCOPYDB: e.g. /pgcopydb/bin/pgcopydb
    - PGCOPYDB_LIB_PATH: e.g. /pgcopydb/lib
    - BENCHMARK_INGEST_SOURCE_CONNSTR
    - BENCHMARK_INGEST_TARGET_CONNSTR
    - PERF_TEST_RESULT_CONNSTR
    - TARGET_PROJECT_TYPE

    Raises:
        OSError: naming the first variable, in the order checked below, that is
            unset or empty.
    """
    mandatory_names = (
        "PGCOPYDB",
        "PGCOPYDB_LIB_PATH",
        "PG_CONFIG",
        "PSQL",
        "PG_16_LIB_PATH",
        "BENCHMARK_INGEST_SOURCE_CONNSTR",
        "BENCHMARK_INGEST_TARGET_CONNSTR",
        "PERF_TEST_RESULT_CONNSTR",
        "TARGET_PROJECT_TYPE",
    )
    # An empty string is treated the same as an unset variable.
    first_missing = next((name for name in mandatory_names if not os.getenv(name)), None)
    if first_missing is not None:
        raise OSError(f"Required environment variable '{first_missing}' is not set.")
def build_pgcopydb_command(pgcopydb_filter_file: Path, test_output_dir: Path):
    """Assemble the argument list for a ``pgcopydb clone`` invocation.

    Args:
        pgcopydb_filter_file: Path passed to ``--filters``.
        test_output_dir: Path passed to ``--dir``.

    Returns:
        A list of strings whose first element is the executable named by the
        PGCOPYDB environment variable.

    Raises:
        OSError: if PGCOPYDB is unset or empty.
    """
    binary = os.getenv("PGCOPYDB")
    if not binary:
        raise OSError("PGCOPYDB environment variable is not set.")

    argv = [binary, "clone", "--dir", str(test_output_dir)]
    # Steps and properties that are skipped / not carried over.
    argv += ["--skip-vacuum", "--no-owner", "--no-acl", "--skip-db-properties"]
    # Job counts for the individual phases.
    argv += ["--table-jobs", "4", "--index-jobs", "4", "--restore-jobs", "4"]
    argv += ["--split-tables-larger-than", "10GB"]
    argv += ["--skip-extensions", "--use-copy-binary"]
    argv += ["--filters", str(pgcopydb_filter_file)]
    return argv
@pytest.fixture()  # must be function scoped because test_output_dir is function scoped
def pgcopydb_filter_file(test_output_dir: Path) -> Path:
    """Write ``pgcopydb_filter.txt`` into the test output directory and return its path.

    The file consists of one ``[include-only-table]`` section followed by the
    tables listed below, one per line.
    """
    destination = test_output_dir / "pgcopydb_filter.txt"
    # dedent strips the common leading whitespace, so the file content starts
    # at column 0 and ends with a single trailing newline.
    destination.write_text(
        textwrap.dedent("""\
            [include-only-table]
            public.events
            public.emails
            public.email_transmissions
            public.payments
            public.editions
            public.edition_modules
            public.sp_content
            public.email_broadcasts
            public.user_collections
            public.devices
            public.user_accounts
            public.lessons
            public.lesson_users
            public.payment_methods
            public.orders
            public.course_emails
            public.modules
            public.users
            public.module_users
            public.courses
            public.payment_gateway_keys
            public.accounts
            public.roles
            public.payment_gateways
            public.management
            public.event_names
            """)
    )
    return destination
def get_backpressure_time(connstr):
    """Query the database at ``connstr`` for its backpressure throttling time.

    Runs ``select backpressure_throttling_time()/1000000;`` through the psql
    binary named by the PSQL environment variable and returns the printed value
    as a float (the division by 1,000,000 is intended to yield seconds).

    Raises:
        OSError: if PSQL is not set.
        subprocess.CalledProcessError: if psql exits with a non-zero status.
    """
    psql_binary = os.getenv("PSQL")
    if psql_binary is None:
        raise OSError("The PSQL environment variable is not set.")

    # -t makes psql print tuples only, so stdout carries just the number.
    argv = [psql_binary, connstr, "-t", "-c", "select backpressure_throttling_time()/1000000;"]
    completed = subprocess.run(argv, capture_output=True, text=True, check=True)
    return float(completed.stdout.strip())
def run_command_and_log_output(command, log_file_path: Path):
    """
    Run a command, streaming its combined stdout/stderr to stdout and to a log file.

    The child inherits the current environment, with LD_LIBRARY_PATH,
    PGCOPYDB_SOURCE_PGURI, PGCOPYDB_TARGET_PGURI and PGOPTIONS overridden from
    the values below.

    Args:
        command (list): The command to execute.
        log_file_path (Path): Path object for the log file where output is written.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    # Environment variables pgcopydb needs on top of the inherited environment
    custom_env_vars = {
        "LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
        "PGCOPYDB_SOURCE_PGURI": cast("str", os.getenv("BENCHMARK_INGEST_SOURCE_CONNSTR")),
        "PGCOPYDB_TARGET_PGURI": cast("str", os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")),
        "PGOPTIONS": "-c idle_in_transaction_session_timeout=0 -c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
    }
    # Combine the current environment with custom variables
    env = os.environ.copy()
    env.update(custom_env_vars)

    # Using Popen as a context manager guarantees the pipe is closed and the
    # child is waited for even if printing or writing the log raises midway.
    with (
        log_file_path.open("w") as log_file,
        subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env
        ) as process,
    ):
        assert process.stdout is not None, "process.stdout should not be None"
        # Stream output line by line to both the console and the log file
        for line in process.stdout:
            print(line, end="", flush=True)
            log_file.write(line)

    # Leaving the with-block waited for the process, so returncode is populated.
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command)
def parse_log_and_report_metrics(
    zenbenchmark: NeonBenchmarker, log_file_path: Path, backpressure_time_diff: float
):
    """Parse the pgcopydb log file for timing metrics and record them via ``zenbenchmark``.

    Every log line matching one of the summary patterns below contributes the
    first duration found on that line, converted to seconds. If a metric appears
    on several lines, the last occurrence wins. ``backpressure_time`` is always
    reported from ``backpressure_time_diff``.

    Args:
        zenbenchmark: Benchmark recorder; ``record`` is called once per metric.
        log_file_path: Log file previously written from pgcopydb's output.
        backpressure_time_diff: Value recorded under ``backpressure_time``.
    """
    metrics = {"backpressure_time": backpressure_time_diff}

    # Patterns identifying the summary lines that carry each metric
    metric_patterns = {
        "COPY_INDEX_CONSTRAINTS_VACUUM": re.compile(
            r"COPY, INDEX, CONSTRAINTS, VACUUM \(wall clock\).*"
        ),
        "COPY_CUMULATIVE": re.compile(r"COPY \(cumulative\).*"),
        "CREATE_INDEX_CUMULATIVE": re.compile(r"CREATE INDEX \(cumulative\).*"),
        "CONSTRAINTS_CUMULATIVE": re.compile(r"CONSTRAINTS \(cumulative\).*"),
        "FINALIZE_SCHEMA": re.compile(r"Finalize Schema.*"),
        "TOTAL_DURATION": re.compile(r"Total Wall Clock Duration.*"),
    }
    # Loop-invariant patterns, compiled once instead of once per matching line
    duration_pattern = re.compile(r"\d+h\d+m|\d+m\d+s|\d+s|\d+ms|\d+\.\d+s")
    unit_chunk_pattern = re.compile(r"\d+[a-zA-Z]+")

    with log_file_path.open("r") as log_file:
        for line in log_file:
            for metric_name, pattern in metric_patterns.items():
                if not pattern.search(line):
                    continue
                duration_match = duration_pattern.search(line)
                if duration_match is None:
                    continue
                duration_str = duration_match.group(0)
                if "." in duration_str:
                    # Fractional seconds, e.g. "1.5s". Splitting this into
                    # digit+unit chunks would keep only "5s" and silently drop
                    # the integer part, so convert it directly.
                    duration_seconds = float(duration_str[:-1])
                else:
                    # e.g. "1h23m" -> "1h 23m", the space-separated form passed
                    # to humantime_to_ms
                    rust_like_humantime = " ".join(unit_chunk_pattern.findall(duration_str))
                    duration_seconds = humantime_to_ms(rust_like_humantime) / 1000.0
                metrics[metric_name] = duration_seconds

    endpoint_id = {"endpoint_id": get_endpoint_id()}
    for metric_name, duration_seconds in metrics.items():
        zenbenchmark.record(
            metric_name, duration_seconds, "s", MetricReport.LOWER_IS_BETTER, endpoint_id
        )
def get_endpoint_id():
    """Return the first dot-separated label of the host in BENCHMARK_INGEST_TARGET_CONNSTR.

    Raises:
        OSError: if BENCHMARK_INGEST_TARGET_CONNSTR is not set.
        ValueError: if no hostname can be parsed from the connection string.
    """
    target_uri = os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")
    if target_uri is None:
        raise OSError("BENCHMARK_INGEST_TARGET_CONNSTR environment variable is not set.")

    host = urlparse(target_uri).hostname
    if host is None:
        raise ValueError("Unable to parse hostname from BENCHMARK_INGEST_TARGET_CONNSTR")

    # Everything before the first dot; the whole host when it has no dot.
    first_label, _, _ = host.partition(".")
    return first_label
@pytest.fixture()  # must be function scoped because test_output_dir is function scoped
def log_file_path(test_output_dir):
    """Return the log file path inside ``test_output_dir`` for the current project type.

    The file name is the value of TARGET_PROJECT_TYPE with its suffix set to
    ``.log`` (``with_suffix`` replaces an existing suffix if the value has one).

    Raises:
        OSError: if TARGET_PROJECT_TYPE is unset or empty.
    """
    project_type = os.getenv("TARGET_PROJECT_TYPE")
    if not project_type:
        raise OSError("Required environment variable 'TARGET_PROJECT_TYPE' is not set.")
    base_path = test_output_dir / project_type
    return base_path.with_suffix(".log")
@pytest.mark.remote_cluster
def test_ingest_performance_using_pgcopydb(
    zenbenchmark: NeonBenchmarker,
    log_file_path: Path,
    pgcopydb_filter_file: Path,
    test_output_dir: Path,
):
    """
    Simulate project migration from another PostgreSQL provider to Neon.

    Measure performance for Neon ingest steps
    - COPY
    - CREATE INDEX
    - CREATE CONSTRAINT
    - VACUUM ANALYZE
    - create foreign keys

    pgcopydb copies the data from the source database to the destination
    database; its output is logged, then parsed for timings which are reported
    together with the change in backpressure throttling time on the target.
    """
    # Fail early if any required environment variable is missing
    setup_environment()
    target_connstr = os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")

    # Baseline backpressure reading taken before the ingest starts
    throttled_before = get_backpressure_time(target_connstr)

    pgcopydb_cmd = build_pgcopydb_command(pgcopydb_filter_file, test_output_dir)
    try:
        run_command_and_log_output(pgcopydb_cmd, log_file_path)
    except subprocess.CalledProcessError as e:
        pytest.fail(f"pgcopydb command failed with error: {e}")

    # Second reading after the ingest; the delta is attributed to this run
    throttled_after = get_backpressure_time(target_connstr)
    parse_log_and_report_metrics(
        zenbenchmark, log_file_path, throttled_after - throttled_before
    )