mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-24 14:49:59 +00:00
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes

- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking): https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
268 lines
9.5 KiB
Python
268 lines
9.5 KiB
Python
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import textwrap
|
|
from pathlib import Path
|
|
from typing import cast
|
|
from urllib.parse import urlparse
|
|
|
|
import pytest
|
|
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
|
from fixtures.utils import humantime_to_ms
|
|
|
|
|
|
def setup_environment():
    """Verify that every environment variable needed for a pgcopydb run is present.

    Each of the following variables must be set to a non-empty value:
    - PG_CONFIG: e.g. /tmp/neon/pg_install/v16/bin/pg_config
    - PSQL: e.g. /tmp/neon/pg_install/v16/bin/psql
    - PG_16_LIB_PATH: e.g. /tmp/neon/pg_install/v16/lib
    - PGCOPYDB: e.g. /pgcopydb/bin/pgcopydb
    - PGCOPYDB_LIB_PATH: e.g. /pgcopydb/lib
    - BENCHMARK_INGEST_SOURCE_CONNSTR
    - BENCHMARK_INGEST_TARGET_CONNSTR
    - PERF_TEST_RESULT_CONNSTR
    - TARGET_PROJECT_TYPE

    Raises:
        OSError: naming the first variable (in the order checked below) that is
            unset or empty.
    """
    # The order matters only for which variable is reported first.
    expected_names = (
        "PGCOPYDB",
        "PGCOPYDB_LIB_PATH",
        "PG_CONFIG",
        "PSQL",
        "PG_16_LIB_PATH",
        "BENCHMARK_INGEST_SOURCE_CONNSTR",
        "BENCHMARK_INGEST_TARGET_CONNSTR",
        "PERF_TEST_RESULT_CONNSTR",
        "TARGET_PROJECT_TYPE",
    )

    # An unset variable and an empty one are both treated as missing.
    first_missing = next((name for name in expected_names if not os.getenv(name)), None)
    if first_missing is not None:
        raise OSError(f"Required environment variable '{first_missing}' is not set.")
|
|
|
|
|
|
def build_pgcopydb_command(pgcopydb_filter_file: Path, test_output_dir: Path):
    """Assemble the argument list for a ``pgcopydb clone`` invocation.

    Args:
        pgcopydb_filter_file: Path passed to ``--filters``.
        test_output_dir: Directory passed to ``--dir``.

    Returns:
        A list of strings whose first element is the executable named by the
        PGCOPYDB environment variable.

    Raises:
        OSError: if PGCOPYDB is unset or empty.
    """
    executable = os.getenv("PGCOPYDB")
    if not executable:
        raise OSError("PGCOPYDB environment variable is not set.")

    # The table, index and restore phases all use the same job count.
    jobs = "4"

    argv = [executable, "clone", "--dir", str(test_output_dir)]
    argv += ["--skip-vacuum", "--no-owner", "--no-acl", "--skip-db-properties"]
    argv += ["--table-jobs", jobs, "--index-jobs", jobs, "--restore-jobs", jobs]
    argv += ["--split-tables-larger-than", "10GB"]
    argv += ["--skip-extensions", "--use-copy-binary"]
    argv += ["--filters", str(pgcopydb_filter_file)]
    return argv
|
|
|
|
|
|
@pytest.fixture()  # must be function scoped because test_output_dir is function scoped
def pgcopydb_filter_file(test_output_dir: Path) -> Path:
    """Write ``pgcopydb_filter.txt`` into the test output directory and return its path.

    The file holds an ``[include-only-table]`` section header followed by one
    schema-qualified table name per line, each line terminated by a newline.
    """
    included_tables = (
        "public.events",
        "public.emails",
        "public.email_transmissions",
        "public.payments",
        "public.editions",
        "public.edition_modules",
        "public.sp_content",
        "public.email_broadcasts",
        "public.user_collections",
        "public.devices",
        "public.user_accounts",
        "public.lessons",
        "public.lesson_users",
        "public.payment_methods",
        "public.orders",
        "public.course_emails",
        "public.modules",
        "public.users",
        "public.module_users",
        "public.courses",
        "public.payment_gateway_keys",
        "public.accounts",
        "public.roles",
        "public.payment_gateways",
        "public.management",
        "public.event_names",
    )

    # Section header first, then the tables; every line ends with "\n".
    file_lines = ["[include-only-table]", *included_tables]
    destination = test_output_dir / "pgcopydb_filter.txt"
    destination.write_text("".join(f"{entry}\n" for entry in file_lines))
    return destination
|
|
|
|
|
|
def get_backpressure_time(connstr):
    """Return the backpressure throttling time, in seconds, reported via psql.

    Runs ``select backpressure_throttling_time()/1000000;`` against ``connstr``
    using the psql binary named by the PSQL environment variable and parses the
    stripped stdout as a float.

    Raises:
        OSError: if PSQL is not set.
        subprocess.CalledProcessError: if psql exits with a non-zero status.
    """
    psql_binary = os.getenv("PSQL")
    if psql_binary is None:
        raise OSError("The PSQL environment variable is not set.")

    # "-t" is passed so that stdout can be parsed directly as a number.
    argv = [psql_binary, connstr, "-t", "-c", "select backpressure_throttling_time()/1000000;"]
    completed = subprocess.run(argv, capture_output=True, text=True, check=True)
    return float(completed.stdout.strip())
|
|
|
|
|
|
def run_command_and_log_output(command, log_file_path: Path):
    """
    Runs a command and logs output to both a file and GitHub Actions console.

    The child inherits the current environment plus the pgcopydb-specific
    variables set below; its stderr is merged into stdout so both end up in the
    same stream.

    Args:
        command (list): The command to execute.
        log_file_path (Path): Path object for the log file where output is written.

    Raises:
        OSError: if BENCHMARK_INGEST_SOURCE_CONNSTR or
            BENCHMARK_INGEST_TARGET_CONNSTR is not set.
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    # Fail fast with a clear message instead of handing None to Popen, which
    # would reject the environment with an opaque TypeError.
    connection_uris = {}
    for env_name, pgcopydb_name in (
        ("BENCHMARK_INGEST_SOURCE_CONNSTR", "PGCOPYDB_SOURCE_PGURI"),
        ("BENCHMARK_INGEST_TARGET_CONNSTR", "PGCOPYDB_TARGET_PGURI"),
    ):
        value = os.getenv(env_name)
        if value is None:
            raise OSError(f"Required environment variable '{env_name}' is not set.")
        connection_uris[pgcopydb_name] = value

    # Combine the current environment with the variables pgcopydb needs.
    env = os.environ.copy()
    env.update(
        {
            # NOTE: an unset library path variable is rendered as the literal
            # text "None" here; setup_environment() is expected to have run first.
            "LD_LIBRARY_PATH": f"{os.getenv('PGCOPYDB_LIB_PATH')}:{os.getenv('PG_16_LIB_PATH')}",
            **connection_uris,
            "PGOPTIONS": "-c idle_in_transaction_session_timeout=0 -c maintenance_work_mem=8388608 -c max_parallel_maintenance_workers=7",
        }
    )

    with log_file_path.open("w") as log_file:
        # Using Popen as a context manager closes the pipe and waits for the
        # child on every exit path, not only the successful one.
        with subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, env=env
        ) as process:
            try:
                assert process.stdout is not None, "process.stdout should not be None"

                # Stream output to both log file and console
                for line in process.stdout:
                    print(line, end="", flush=True)  # Stream to GitHub Actions log
                    log_file.write(line)  # Write to log file
            except BaseException:
                # Do not leave the child running (or block waiting on it) if
                # streaming fails or the run is interrupted.
                process.kill()
                raise

    # Leaving the Popen context has already waited for the process.
    if process.returncode != 0:
        raise subprocess.CalledProcessError(process.returncode, command)
|
|
|
|
|
|
def _pgcopydb_duration_to_seconds(duration_str):
    """Convert a duration token such as ``2h35m``, ``250ms`` or ``1.5s`` to seconds."""
    if "." in duration_str:
        # Fractional seconds ("1.5s"). Splitting this into number+unit parts
        # would keep only the digits after the dot ("5s"), so parse it directly.
        return float(duration_str[:-1])

    # Turn e.g. "2h35m" into "2h 35m", the space-separated form handed to
    # humantime_to_ms, whose millisecond result is converted to seconds.
    parts = re.findall(r"\d+[a-zA-Z]+", duration_str)
    return humantime_to_ms(" ".join(parts)) / 1000.0


def parse_log_and_report_metrics(
    zenbenchmark: NeonBenchmarker, log_file_path: Path, backpressure_time_diff: float
):
    """Parses the pgcopydb log file for performance metrics and reports them to the database.

    Every log line that matches one of the step patterns below and contains a
    duration token contributes one metric; if a step matches on several lines
    the last one wins. ``backpressure_time`` is always reported.

    Args:
        zenbenchmark: Benchmarker whose ``record`` method receives each metric.
        log_file_path: Log file written during the pgcopydb run.
        backpressure_time_diff: Value reported as ``backpressure_time``.
    """
    metrics = {"backpressure_time": backpressure_time_diff}

    # Define regex patterns to capture metrics
    metric_patterns = {
        "COPY_INDEX_CONSTRAINTS_VACUUM": re.compile(
            r"COPY, INDEX, CONSTRAINTS, VACUUM \(wall clock\).*"
        ),
        "COPY_CUMULATIVE": re.compile(r"COPY \(cumulative\).*"),
        "CREATE_INDEX_CUMULATIVE": re.compile(r"CREATE INDEX \(cumulative\).*"),
        "CONSTRAINTS_CUMULATIVE": re.compile(r"CONSTRAINTS \(cumulative\).*"),
        "FINALIZE_SCHEMA": re.compile(r"Finalize Schema.*"),
        "TOTAL_DURATION": re.compile(r"Total Wall Clock Duration.*"),
    }
    # Compiled once here rather than on every matching line.
    duration_pattern = re.compile(r"\d+h\d+m|\d+m\d+s|\d+s|\d+ms|\d+\.\d+s")

    # Parse log file
    with log_file_path.open("r") as log_file:
        for line in log_file:
            for metric_name, pattern in metric_patterns.items():
                if not pattern.search(line):
                    continue
                # Extract duration and convert it to seconds
                duration_match = duration_pattern.search(line)
                if duration_match:
                    metrics[metric_name] = _pgcopydb_duration_to_seconds(duration_match.group(0))

    # Every metric is tagged with the endpoint the ingest targeted.
    endpoint_id = {"endpoint_id": get_endpoint_id()}
    for metric_name, duration_seconds in metrics.items():
        zenbenchmark.record(
            metric_name, duration_seconds, "s", MetricReport.LOWER_IS_BETTER, endpoint_id
        )
|
|
|
|
|
|
def get_endpoint_id():
    """Return the first dot-separated label of the host in BENCHMARK_INGEST_TARGET_CONNSTR.

    Raises:
        OSError: if BENCHMARK_INGEST_TARGET_CONNSTR is not set.
        ValueError: if no hostname can be parsed from the connection string.
    """
    target_uri = os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")
    if target_uri is None:
        raise OSError("BENCHMARK_INGEST_TARGET_CONNSTR environment variable is not set.")

    host = urlparse(target_uri).hostname
    if host is None:
        raise ValueError("Unable to parse hostname from BENCHMARK_INGEST_TARGET_CONNSTR")

    # Everything before the first dot; the whole hostname when it has no dot.
    first_label, _, _ = host.partition(".")
    return first_label
|
|
|
|
|
|
@pytest.fixture()  # must be function scoped because test_output_dir is function scoped
def log_file_path(test_output_dir):
    """Return the log file path inside the test output directory.

    The file is named after the TARGET_PROJECT_TYPE environment variable, with
    its suffix set to ``.log``.

    Raises:
        OSError: if TARGET_PROJECT_TYPE is unset or empty.
    """
    project_type = os.getenv("TARGET_PROJECT_TYPE")
    if not project_type:
        raise OSError("Required environment variable 'TARGET_PROJECT_TYPE' is not set.")

    log_path = test_output_dir / project_type
    return log_path.with_suffix(".log")
|
|
|
|
|
|
@pytest.mark.remote_cluster
def test_ingest_performance_using_pgcopydb(
    zenbenchmark: NeonBenchmarker,
    log_file_path: Path,
    pgcopydb_filter_file: Path,
    test_output_dir: Path,
):
    """
    Simulate project migration from another PostgreSQL provider to Neon.

    Measure performance for Neon ingest steps
    - COPY
    - CREATE INDEX
    - CREATE CONSTRAINT
    - VACUUM ANALYZE
    - create foreign keys

    Use pgcopydb to copy data from the source database to the destination database.
    The backpressure throttling time accumulated on the target during the run is
    reported alongside the durations parsed from the pgcopydb log.
    """
    # Abort early if any required environment variable is missing.
    setup_environment()
    target_connstr = os.getenv("BENCHMARK_INGEST_TARGET_CONNSTR")

    # Baseline reading taken before any data is copied.
    throttled_before = get_backpressure_time(target_connstr)

    # Run pgcopydb; a non-zero exit is reported as a test failure.
    pgcopydb_argv = build_pgcopydb_command(pgcopydb_filter_file, test_output_dir)
    try:
        run_command_and_log_output(pgcopydb_argv, log_file_path)
    except subprocess.CalledProcessError as e:
        pytest.fail(f"pgcopydb command failed with error: {e}")

    # Second reading after the ingest; only the increase is reported.
    throttled_after = get_backpressure_time(target_connstr)
    parse_log_and_report_metrics(zenbenchmark, log_file_path, throttled_after - throttled_before)
|