TPC-C like periodic benchmark using benchbase (#12665)

## Problem

We don't have a well-documented, periodic benchmark for a TPC-C-like OLTP
workload.

## Summary of changes

# Benchbase TPC-C-like Performance Results

Runs TPC-C-like benchmarks on Neon databases using
[Benchbase](https://github.com/cmu-db/benchbase).
Docker images are built
[here](https://github.com/neondatabase-labs/benchbase-docker-images).

We run the benchmarks at different scale factors aligned with the compute
sizes we offer to customers.
For each scale factor we determine a max rate (see the throughput in the
warmup phase) and then run the benchmark at a target rate of approx. 70 %
of that max rate.
The warehouse count determines the working set size, which is chosen to
fit the LFC size of the respective pricing tier.
With this setup we typically see LFC hit rates above 70 % and good,
consistent (non-flaky) latencies.
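
For reference, a minimal sketch (mirroring the constants in
`generate_workload_size.py`, included below) of how the terminal count, the
target rate, and the approximate data size follow from the warehouse count and
the measured max rate:

```python
import math

# Constants mirrored from generate_workload_size.py (included below)
OPTIMAL_RATE_FACTOR = 0.7      # benchmark runs at ~70 % of the measured max rate
BASE_TERMINALS = 130
TERMINALS_PER_WAREHOUSE = 0.2

def sizing(warehouses: int, max_rate: int) -> dict:
    return {
        "terminals": math.ceil(BASE_TERMINALS + warehouses * TERMINALS_PER_WAREHOUSE),
        "target_tps": math.ceil(max_rate * OPTIMAL_RATE_FACTOR),
        "approx_data_gb": warehouses / 10,  # rule of thumb: 10 warehouses ~ 1 GB
    }

# e.g. the business-plan variant (1000 warehouses, measured max rate 2900 TPS)
print(sizing(1000, 2900))  # {'terminals': 330, 'target_tps': 2030, 'approx_data_gb': 100.0}
```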

## Expected performance as of first testing this

| Tier       | CU      | Warehouses (data size) | Terminals | Max TPS | LFC size | Working set size | LFC hit rate | Median latency | p95 latency |
|------------|---------|------------------------|-----------|---------|----------|------------------|--------------|----------------|-------------|
| free       | 0.25-2  | 50 (≈5 GB)             | 150       | 800     | 5 GB     | 6.3 GB           | 95 %         | 170 ms         | 600 ms      |
| serverless | 2-8     | 500 (≈50 GB)           | 230       | 2000    | 26 GB    | ?? GB            | 91 %         | 50 ms          | 200 ms      |
| business   | 2-16    | 1000 (≈100 GB)         | 330       | 2900    | 51 GB    | 50 GB            | 72 %         | 40 ms          | 180 ms      |

Each run:
- first loads the database (not shown in the dashboard),
- then runs a 20-minute warmup phase at an unlimited target rate (max rate)
to warm up the database and the LFC (highest throughput, but flaky latencies).
The warmup phase can be used to determine the max rate and to adjust it in
the GitHub workflow in case Neon gets faster in the future,
- then runs the benchmark at a target rate of approx. 70 % of the max rate
for 1 hour (expecting consistent latencies and throughput),
- and finally repeats the warmup and runs a ramp phase that lowers and raises
the target TPS in 5-minute intervals (see the sketch below).
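
A minimal sketch of the schedule the ramp run generates, using the ramp
factors from `generate_workload_size.py` (included below):

```python
import math

# Mirrored from generate_workload_size.py (included below)
RAMP_STEP_TIME_SECONDS = 300  # 5-minute intervals
RAMP_RATE_FACTORS = [1.5, 1.1, 0.9, 0.7, 0.6, 0.4, 0.6, 0.7, 0.9, 1.1]

def ramp_schedule(max_rate: int) -> list:
    """Return (duration_seconds, target_tps) pairs for the ramp-down/up run."""
    return [(RAMP_STEP_TIME_SECONDS, math.ceil(max_rate * f)) for f in RAMP_RATE_FACTORS]

# e.g. the free-tier variant (measured max rate 800 TPS)
for seconds, tps in ramp_schedule(800):
    print(f"{seconds} s at {tps} TPS")
```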

## Important notes on implementation
- we eventually want to publish the process for reproducing these
benchmarks (a sketch follows below)
- thus we want to minimize the dependencies necessary to run the
benchmark; the only things needed are
   - docker
   - the docker images referenced above for benchbase
   - python >= 3.9 to run the config generation steps and create the diagrams
- to keep the dependency chain really small we deliberately do NOT use our
Python fixture test infrastructure - so please don't add a review comment
"should reuse fixture xy"
- we also upload all generator Python scripts, generated bash shell
scripts and configs, as well as raw results, to an S3 bucket that we want
to publish once this benchmark is reviewed and approved
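
To make the reproduction path concrete, here is a hypothetical sketch of a
local run mirroring the workflow steps. It assumes Docker, Python >= 3.9 and
`generate_workload_size.py` in the working directory; hostname and password
are placeholders:

```python
#!/usr/bin/env python3
"""Hypothetical local reproduction sketch mirroring the GitHub workflow steps; not a supported CLI."""
import subprocess

WAREHOUSES = 50                        # free-tier-sized run
MAX_RATE = 800                         # measured max TPS for this size (see table above)
HOSTNAME = "<neon-endpoint-hostname>"  # placeholder
PASSWORD = "<database-password>"       # placeholder

# 1. Generate BenchBase configs and shell scripts (the generator writes ../configs and ../scripts)
subprocess.run(
    ["python3", "generate_workload_size.py",
     "--warehouses", str(WAREHOUSES), "--max-rate", str(MAX_RATE),
     "--hostname", HOSTNAME, "--password", PASSWORD,
     "--runner-arch", "X64"],  # "ARM64" on ARM hosts
    check=True,
)
subprocess.run(["mv", "../configs", "./configs"], check=True)
subprocess.run(["mv", "../scripts", "./scripts"], check=True)

# 2. Load the warehouses, then run the warmup + 70 % benchmark (both wrap the BenchBase Docker image)
subprocess.run(["bash", f"./scripts/load_{WAREHOUSES}_warehouses.sh"], check=True)
subprocess.run(["bash", f"./scripts/execute_{WAREHOUSES}_warehouses_opt_rate.sh"], check=True)
```

The ramp-down/up run can be started the same way via the generated
`execute_{warehouses}_warehouses_ramp_up.sh` script.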
Commit f391186aa7 (parent 94b41b531b) by Peter Bendel, 2025-07-24 18:26:54 +02:00, committed by GitHub.
4 changed files with 1466 additions and 0 deletions.

.github/workflows/benchbase_tpcc.yml (new file)

@@ -0,0 +1,384 @@
name: TPC-C like benchmark using benchbase
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 6 * * *' # run once a day at 6 AM UTC
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow globally because we do not want to be too noisy in the production environment
group: benchbase-tpcc-workflow
cancel-in-progress: false
permissions:
contents: read
jobs:
benchbase-tpcc:
strategy:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- warehouses: 50 # defines number of warehouses and is used to compute number of terminals
max_rate: 800 # measured max TPS at scale factor based on experiments. Adjust if performance is better/worse
min_cu: 0.25 # simulate free tier plan (0.25 -2 CU)
max_cu: 2
- warehouses: 500 # serverless plan (2-8 CU)
max_rate: 2000
min_cu: 2
max_cu: 8
- warehouses: 1000 # business plan (2-16 CU)
max_rate: 2900
min_cu: 2
max_cu: 16
max-parallel: 1 # we want to run each workload size sequentially to avoid noisy neighbors
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
PG_CONFIG: /tmp/neon/pg_install/v17/bin/pg_config
PSQL: /tmp/neon/pg_install/v17/bin/psql
PG_17_LIB_PATH: /tmp/neon/pg_install/v17/lib
POSTGRES_VERSION: 17
runs-on: [ self-hosted, us-east-2, x64 ]
timeout-minutes: 1440
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials # necessary to download artefacts
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours is currently max associated with IAM role
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Create Neon Project
id: create-neon-project-tpcc
uses: ./.github/actions/neon-project-create
with:
region_id: aws-us-east-2
postgres_version: ${{ env.POSTGRES_VERSION }}
compute_units: '[${{ matrix.min_cu }}, ${{ matrix.max_cu }}]'
api_key: ${{ secrets.NEON_PRODUCTION_API_KEY_4_BENCHMARKS }}
api_host: console.neon.tech # production (!)
- name: Initialize Neon project
env:
BENCHMARK_TPCC_CONNSTR: ${{ steps.create-neon-project-tpcc.outputs.dsn }}
PROJECT_ID: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
run: |
echo "Initializing Neon project with project_id: ${PROJECT_ID}"
export LD_LIBRARY_PATH=${PG_17_LIB_PATH}
# Retry logic for psql connection with 1 minute sleep between attempts
for attempt in {1..3}; do
echo "Attempt ${attempt}/3: Creating extensions in Neon project"
if ${PSQL} "${BENCHMARK_TPCC_CONNSTR}" -c "CREATE EXTENSION IF NOT EXISTS neon; CREATE EXTENSION IF NOT EXISTS neon_utils;"; then
echo "Successfully created extensions"
break
else
echo "Failed to create extensions on attempt ${attempt}"
if [ ${attempt} -lt 3 ]; then
echo "Waiting 60 seconds before retry..."
sleep 60
else
echo "All attempts failed, exiting"
exit 1
fi
fi
done
echo "BENCHMARK_TPCC_CONNSTR=${BENCHMARK_TPCC_CONNSTR}" >> $GITHUB_ENV
- name: Generate BenchBase workload configuration
env:
WAREHOUSES: ${{ matrix.warehouses }}
MAX_RATE: ${{ matrix.max_rate }}
run: |
echo "Generating BenchBase configs for warehouses: ${WAREHOUSES}, max_rate: ${MAX_RATE}"
# Extract hostname and password from connection string
# Format: postgresql://username:password@hostname/database?params (no port for Neon)
HOSTNAME=$(echo "${BENCHMARK_TPCC_CONNSTR}" | sed -n 's|.*://[^:]*:[^@]*@\([^/]*\)/.*|\1|p')
PASSWORD=$(echo "${BENCHMARK_TPCC_CONNSTR}" | sed -n 's|.*://[^:]*:\([^@]*\)@.*|\1|p')
echo "Extracted hostname: ${HOSTNAME}"
# Use runner temp (NVMe) as working directory
cd "${RUNNER_TEMP}"
# Copy the generator script
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/generate_workload_size.py" .
# Generate configs and scripts
python3 generate_workload_size.py \
--warehouses ${WAREHOUSES} \
--max-rate ${MAX_RATE} \
--hostname ${HOSTNAME} \
--password ${PASSWORD} \
--runner-arch ${{ runner.arch }}
# Fix path mismatch: move generated configs and scripts to expected locations
mv ../configs ./configs
mv ../scripts ./scripts
- name: Prepare database (load data)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Loading ${WAREHOUSES} warehouses into database..."
# Run the loader script and capture output to log file while preserving stdout/stderr
./scripts/load_${WAREHOUSES}_warehouses.sh 2>&1 | tee "load_${WAREHOUSES}_warehouses.log"
echo "Database loading completed"
- name: Run TPC-C benchmark (warmup phase, then benchmark at 70% of configured max TPS)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Running TPC-C benchmark with ${WAREHOUSES} warehouses..."
# Run the optimal rate benchmark
./scripts/execute_${WAREHOUSES}_warehouses_opt_rate.sh
echo "Benchmark execution completed"
- name: Run TPC-C benchmark (warmup phase, then ramp down TPS and up again in 5 minute intervals)
env:
WAREHOUSES: ${{ matrix.warehouses }}
run: |
cd "${RUNNER_TEMP}"
echo "Running TPC-C ramp-down-up with ${WAREHOUSES} warehouses..."
# Run the ramp-down/up benchmark
./scripts/execute_${WAREHOUSES}_warehouses_ramp_up.sh
echo "Benchmark execution completed"
- name: Process results (upload to test results database and generate diagrams)
env:
WAREHOUSES: ${{ matrix.warehouses }}
MIN_CU: ${{ matrix.min_cu }}
MAX_CU: ${{ matrix.max_cu }}
PROJECT_ID: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
REVISION: ${{ github.sha }}
PERF_DB_CONNSTR: ${{ secrets.PERF_TEST_RESULT_CONNSTR }}
run: |
cd "${RUNNER_TEMP}"
echo "Creating temporary Python environment for results processing..."
# Create temporary virtual environment
python3 -m venv temp_results_env
source temp_results_env/bin/activate
# Install required packages in virtual environment
pip install matplotlib pandas psycopg2-binary
echo "Copying results processing scripts..."
# Copy both processing scripts
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/generate_diagrams.py" .
cp "${GITHUB_WORKSPACE}/test_runner/performance/benchbase_tpc_c_helpers/upload_results_to_perf_test_results.py" .
echo "Processing load phase metrics..."
# Find and process load log
LOAD_LOG=$(find . -name "load_${WAREHOUSES}_warehouses.log" -type f | head -1)
if [ -n "$LOAD_LOG" ]; then
echo "Processing load metrics from: $LOAD_LOG"
python upload_results_to_perf_test_results.py \
--load-log "$LOAD_LOG" \
--run-type "load" \
--warehouses "${WAREHOUSES}" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Load log file not found: load_${WAREHOUSES}_warehouses.log"
fi
echo "Processing warmup results for optimal rate..."
# Find and process warmup results
WARMUP_CSV=$(find results_warmup -name "*.results.csv" -type f | head -1)
WARMUP_JSON=$(find results_warmup -name "*.summary.json" -type f | head -1)
if [ -n "$WARMUP_CSV" ] && [ -n "$WARMUP_JSON" ]; then
echo "Generating warmup diagram from: $WARMUP_CSV"
python generate_diagrams.py \
--input-csv "$WARMUP_CSV" \
--output-svg "warmup_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "Warmup at max TPS"
echo "Uploading warmup metrics from: $WARMUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$WARMUP_JSON" \
--results-csv "$WARMUP_CSV" \
--run-type "warmup" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing warmup results files (CSV: $WARMUP_CSV, JSON: $WARMUP_JSON)"
fi
echo "Processing optimal rate results..."
# Find and process optimal rate results
OPTRATE_CSV=$(find results_opt_rate -name "*.results.csv" -type f | head -1)
OPTRATE_JSON=$(find results_opt_rate -name "*.summary.json" -type f | head -1)
if [ -n "$OPTRATE_CSV" ] && [ -n "$OPTRATE_JSON" ]; then
echo "Generating optimal rate diagram from: $OPTRATE_CSV"
python generate_diagrams.py \
--input-csv "$OPTRATE_CSV" \
--output-svg "benchmark_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "70% of max TPS"
echo "Uploading optimal rate metrics from: $OPTRATE_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$OPTRATE_JSON" \
--results-csv "$OPTRATE_CSV" \
--run-type "opt-rate" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing optimal rate results files (CSV: $OPTRATE_CSV, JSON: $OPTRATE_JSON)"
fi
echo "Processing warmup 2 results for ramp down/up phase..."
# Find and process warmup results
WARMUP_CSV=$(find results_warmup -name "*.results.csv" -type f | tail -1)
WARMUP_JSON=$(find results_warmup -name "*.summary.json" -type f | tail -1)
if [ -n "$WARMUP_CSV" ] && [ -n "$WARMUP_JSON" ]; then
echo "Generating warmup diagram from: $WARMUP_CSV"
python generate_diagrams.py \
--input-csv "$WARMUP_CSV" \
--output-svg "warmup_2_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "Warmup at max TPS"
echo "Uploading warmup metrics from: $WARMUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$WARMUP_JSON" \
--results-csv "$WARMUP_CSV" \
--run-type "warmup" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing warmup results files (CSV: $WARMUP_CSV, JSON: $WARMUP_JSON)"
fi
echo "Processing ramp results..."
# Find and process ramp results
RAMPUP_CSV=$(find results_ramp_up -name "*.results.csv" -type f | head -1)
RAMPUP_JSON=$(find results_ramp_up -name "*.summary.json" -type f | head -1)
if [ -n "$RAMPUP_CSV" ] && [ -n "$RAMPUP_JSON" ]; then
echo "Generating ramp diagram from: $RAMPUP_CSV"
python generate_diagrams.py \
--input-csv "$RAMPUP_CSV" \
--output-svg "ramp_${WAREHOUSES}_warehouses_performance.svg" \
--title-suffix "ramp TPS down and up in 5 minute intervals"
echo "Uploading ramp metrics from: $RAMPUP_JSON"
python upload_results_to_perf_test_results.py \
--summary-json "$RAMPUP_JSON" \
--results-csv "$RAMPUP_CSV" \
--run-type "ramp-up" \
--min-cu "${MIN_CU}" \
--max-cu "${MAX_CU}" \
--project-id "${PROJECT_ID}" \
--revision "${REVISION}" \
--connection-string "${PERF_DB_CONNSTR}"
else
echo "Warning: Missing ramp results files (CSV: $RAMPUP_CSV, JSON: $RAMPUP_JSON)"
fi
# Deactivate and clean up virtual environment
deactivate
rm -rf temp_results_env
rm upload_results_to_perf_test_results.py
echo "Results processing completed and environment cleaned up"
- name: Set date for upload
id: set-date
run: echo "date=$(date +%Y-%m-%d)" >> $GITHUB_OUTPUT
- name: Configure AWS credentials # necessary to upload results
uses: aws-actions/configure-aws-credentials@e3dd6a429d7300a6a4c196c26e071d42e0343502 # v4.0.2
with:
aws-region: us-east-2
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 900 # 900 is minimum value
- name: Upload benchmark results to S3
env:
S3_BUCKET: neon-public-benchmark-results
S3_PREFIX: benchbase-tpc-c/${{ steps.set-date.outputs.date }}/${{ github.run_id }}/${{ matrix.warehouses }}-warehouses
run: |
echo "Redacting passwords from configuration files before upload..."
# Mask all passwords in XML config files
find "${RUNNER_TEMP}/configs" -name "*.xml" -type f -exec sed -i 's|<password>[^<]*</password>|<password>redacted</password>|g' {} \;
echo "Uploading benchmark results to s3://${S3_BUCKET}/${S3_PREFIX}/"
# Upload the entire benchmark directory recursively
aws s3 cp --only-show-errors --recursive "${RUNNER_TEMP}" s3://${S3_BUCKET}/${S3_PREFIX}/
echo "Upload completed"
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project-tpcc.outputs.project_id }}
api_key: ${{ secrets.NEON_PRODUCTION_API_KEY_4_BENCHMARKS }}
api_host: console.neon.tech # production (!)

test_runner/performance/benchbase_tpc_c_helpers/generate_diagrams.py (new file)

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""
Generate TPS and latency charts from BenchBase TPC-C results CSV files.
This script reads a CSV file containing BenchBase results and generates two charts:
1. TPS (requests per second) over time
2. P95 and P99 latencies over time
Both charts are combined in a single SVG file.
"""
import argparse
import sys
from pathlib import Path
import matplotlib.pyplot as plt # type: ignore[import-not-found]
import pandas as pd # type: ignore[import-untyped]
def load_results_csv(csv_file_path):
"""Load BenchBase results CSV file into a pandas DataFrame."""
try:
df = pd.read_csv(csv_file_path)
# Validate required columns exist
required_columns = [
"Time (seconds)",
"Throughput (requests/second)",
"95th Percentile Latency (millisecond)",
"99th Percentile Latency (millisecond)",
]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"Error: Missing required columns: {missing_columns}")
sys.exit(1)
return df
except FileNotFoundError:
print(f"Error: CSV file not found: {csv_file_path}")
sys.exit(1)
except pd.errors.EmptyDataError:
print(f"Error: CSV file is empty: {csv_file_path}")
sys.exit(1)
except Exception as e:
print(f"Error reading CSV file: {e}")
sys.exit(1)
def generate_charts(df, input_filename, output_svg_path, title_suffix=None):
"""Generate combined TPS and latency charts and save as SVG."""
# Get the filename without extension for chart titles
file_label = Path(input_filename).stem
# Build title ending with optional suffix
if title_suffix:
title_ending = f"{title_suffix} - {file_label}"
else:
title_ending = file_label
# Create figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
# Chart 1: Time vs TPS
ax1.plot(
df["Time (seconds)"],
df["Throughput (requests/second)"],
linewidth=1,
color="blue",
alpha=0.7,
)
ax1.set_xlabel("Time (seconds)")
ax1.set_ylabel("TPS (Requests Per Second)")
ax1.set_title(f"Benchbase TPC-C Like Throughput (TPS) - {title_ending}")
ax1.grid(True, alpha=0.3)
ax1.set_xlim(0, df["Time (seconds)"].max())
# Chart 2: Time vs P95 and P99 Latencies
ax2.plot(
df["Time (seconds)"],
df["95th Percentile Latency (millisecond)"],
linewidth=1,
color="orange",
alpha=0.7,
label="Latency P95",
)
ax2.plot(
df["Time (seconds)"],
df["99th Percentile Latency (millisecond)"],
linewidth=1,
color="red",
alpha=0.7,
label="Latency P99",
)
ax2.set_xlabel("Time (seconds)")
ax2.set_ylabel("Latency (ms)")
ax2.set_title(f"Benchbase TPC-C Like Latency - {title_ending}")
ax2.grid(True, alpha=0.3)
ax2.set_xlim(0, df["Time (seconds)"].max())
ax2.legend()
plt.tight_layout()
# Save as SVG
try:
plt.savefig(output_svg_path, format="svg", dpi=300, bbox_inches="tight")
print(f"Charts saved to: {output_svg_path}")
except Exception as e:
print(f"Error saving SVG file: {e}")
sys.exit(1)
def main():
"""Main function to parse arguments and generate charts."""
parser = argparse.ArgumentParser(
description="Generate TPS and latency charts from BenchBase TPC-C results CSV"
)
parser.add_argument(
"--input-csv", type=str, required=True, help="Path to the input CSV results file"
)
parser.add_argument(
"--output-svg", type=str, required=True, help="Path for the output SVG chart file"
)
parser.add_argument(
"--title-suffix",
type=str,
required=False,
help="Optional suffix to add to chart titles (e.g., 'Warmup', 'Benchmark Phase')",
)
args = parser.parse_args()
# Validate input file exists
if not Path(args.input_csv).exists():
print(f"Error: Input CSV file does not exist: {args.input_csv}")
sys.exit(1)
# Create output directory if it doesn't exist
output_path = Path(args.output_svg)
output_path.parent.mkdir(parents=True, exist_ok=True)
# Load data and generate charts
df = load_results_csv(args.input_csv)
generate_charts(df, args.input_csv, args.output_svg, args.title_suffix)
print(f"Successfully generated charts from {len(df)} data points")
if __name__ == "__main__":
main()

test_runner/performance/benchbase_tpc_c_helpers/generate_workload_size.py (new file)

@@ -0,0 +1,339 @@
import argparse
import html
import math
import os
import sys
from pathlib import Path
CONFIGS_DIR = Path("../configs")
SCRIPTS_DIR = Path("../scripts")
# Constants
## TODO increase times after testing
WARMUP_TIME_SECONDS = 1200 # 20 minutes
BENCHMARK_TIME_SECONDS = 3600 # 1 hour
RAMP_STEP_TIME_SECONDS = 300 # 5 minutes
BASE_TERMINALS = 130
TERMINALS_PER_WAREHOUSE = 0.2
OPTIMAL_RATE_FACTOR = 0.7 # 70% of max rate
BATCH_SIZE = 1000
LOADER_THREADS = 4
TRANSACTION_WEIGHTS = "45,43,4,4,4" # NewOrder, Payment, OrderStatus, Delivery, StockLevel
# Ramp-up rate multipliers
RAMP_RATE_FACTORS = [1.5, 1.1, 0.9, 0.7, 0.6, 0.4, 0.6, 0.7, 0.9, 1.1]
# Templates for XML configs
WARMUP_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
<work>
<time>{warmup_time}</time>
<weights>{transaction_weights}</weights>
<rate>unlimited</rate>
<arrival>POISSON</arrival>
<distribution>ZIPFIAN</distribution>
</work>
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
MAX_RATE_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
<work>
<time>{benchmark_time}</time>
<weights>{transaction_weights}</weights>
<rate>unlimited</rate>
<arrival>POISSON</arrival>
<distribution>ZIPFIAN</distribution>
</work>
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
OPT_RATE_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
<work>
<time>{benchmark_time}</time>
<rate>{opt_rate}</rate>
<weights>{transaction_weights}</weights>
<arrival>POISSON</arrival>
<distribution>ZIPFIAN</distribution>
</work>
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
RAMP_UP_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>0</loaderThreads>
<terminals>{terminals}</terminals>
<works>
{works}
</works>
<transactiontypes>
<transactiontype><name>NewOrder</name></transactiontype>
<transactiontype><name>Payment</name></transactiontype>
<transactiontype><name>OrderStatus</name></transactiontype>
<transactiontype><name>Delivery</name></transactiontype>
<transactiontype><name>StockLevel</name></transactiontype>
</transactiontypes>
</parameters>
"""
WORK_TEMPLATE = f""" <work>\n <time>{RAMP_STEP_TIME_SECONDS}</time>\n <rate>{{rate}}</rate>\n <weights>{TRANSACTION_WEIGHTS}</weights>\n <arrival>POISSON</arrival>\n <distribution>ZIPFIAN</distribution>\n </work>\n"""
# Templates for shell scripts
EXECUTE_SCRIPT = """# Create results directories
mkdir -p results_warmup
mkdir -p results_{suffix}
chmod 777 results_warmup results_{suffix}
# Run warmup phase
docker run --network=host --rm \
-v $(pwd)/configs:/configs \
-v $(pwd)/results_warmup:/results \
{docker_image}\
-b tpcc \
-c /configs/execute_{warehouses}_warehouses_warmup.xml \
-d /results \
--create=false --load=false --execute=true
# Run benchmark phase
docker run --network=host --rm \
-v $(pwd)/configs:/configs \
-v $(pwd)/results_{suffix}:/results \
{docker_image}\
-b tpcc \
-c /configs/execute_{warehouses}_warehouses_{suffix}.xml \
-d /results \
--create=false --load=false --execute=true\n"""
LOAD_XML = """<?xml version="1.0"?>
<parameters>
<type>POSTGRES</type>
<driver>org.postgresql.Driver</driver>
<url>jdbc:postgresql://{hostname}/neondb?sslmode=require&amp;ApplicationName=tpcc&amp;reWriteBatchedInserts=true</url>
<username>neondb_owner</username>
<password>{password}</password>
<reconnectOnConnectionFailure>true</reconnectOnConnectionFailure>
<isolation>TRANSACTION_READ_COMMITTED</isolation>
<batchsize>{batch_size}</batchsize>
<scalefactor>{warehouses}</scalefactor>
<loaderThreads>{loader_threads}</loaderThreads>
</parameters>
"""
LOAD_SCRIPT = """# Create results directory for loading
mkdir -p results_load
chmod 777 results_load
docker run --network=host --rm \
-v $(pwd)/configs:/configs \
-v $(pwd)/results_load:/results \
{docker_image}\
-b tpcc \
-c /configs/load_{warehouses}_warehouses.xml \
-d /results \
--create=true --load=true --execute=false\n"""
def write_file(path, content):
path.parent.mkdir(parents=True, exist_ok=True)
try:
with open(path, "w") as f:
f.write(content)
except OSError as e:
print(f"Error writing {path}: {e}")
sys.exit(1)
# If it's a shell script, set executable permission
if str(path).endswith(".sh"):
os.chmod(path, 0o755)
def escape_xml_password(password):
"""Escape XML special characters in password."""
return html.escape(password, quote=True)
def get_docker_arch_tag(runner_arch):
"""Map GitHub Actions runner.arch to Docker image architecture tag."""
arch_mapping = {"X64": "amd64", "ARM64": "arm64"}
return arch_mapping.get(runner_arch, "amd64") # Default to amd64
def main():
parser = argparse.ArgumentParser(description="Generate BenchBase workload configs and scripts.")
parser.add_argument("--warehouses", type=int, required=True, help="Number of warehouses")
parser.add_argument("--max-rate", type=int, required=True, help="Max rate (TPS)")
parser.add_argument("--hostname", type=str, required=True, help="Database hostname")
parser.add_argument("--password", type=str, required=True, help="Database password")
parser.add_argument(
"--runner-arch", type=str, required=True, help="GitHub Actions runner architecture"
)
args = parser.parse_args()
warehouses = args.warehouses
max_rate = args.max_rate
hostname = args.hostname
password = args.password
runner_arch = args.runner_arch
# Escape password for safe XML insertion
escaped_password = escape_xml_password(password)
# Get the appropriate Docker architecture tag
docker_arch = get_docker_arch_tag(runner_arch)
docker_image = f"ghcr.io/neondatabase-labs/benchbase-postgres:latest-{docker_arch}"
opt_rate = math.ceil(max_rate * OPTIMAL_RATE_FACTOR)
# Calculate terminals: base terminal count plus 20% of the warehouse count, rounded up
terminals = math.ceil(BASE_TERMINALS + warehouses * TERMINALS_PER_WAREHOUSE)
ramp_rates = [math.ceil(max_rate * factor) for factor in RAMP_RATE_FACTORS]
# Write configs
write_file(
CONFIGS_DIR / f"execute_{warehouses}_warehouses_warmup.xml",
WARMUP_XML.format(
warehouses=warehouses,
hostname=hostname,
password=escaped_password,
terminals=terminals,
batch_size=BATCH_SIZE,
warmup_time=WARMUP_TIME_SECONDS,
transaction_weights=TRANSACTION_WEIGHTS,
),
)
write_file(
CONFIGS_DIR / f"execute_{warehouses}_warehouses_max_rate.xml",
MAX_RATE_XML.format(
warehouses=warehouses,
hostname=hostname,
password=escaped_password,
terminals=terminals,
batch_size=BATCH_SIZE,
benchmark_time=BENCHMARK_TIME_SECONDS,
transaction_weights=TRANSACTION_WEIGHTS,
),
)
write_file(
CONFIGS_DIR / f"execute_{warehouses}_warehouses_opt_rate.xml",
OPT_RATE_XML.format(
warehouses=warehouses,
opt_rate=opt_rate,
hostname=hostname,
password=escaped_password,
terminals=terminals,
batch_size=BATCH_SIZE,
benchmark_time=BENCHMARK_TIME_SECONDS,
transaction_weights=TRANSACTION_WEIGHTS,
),
)
ramp_works = "".join([WORK_TEMPLATE.format(rate=rate) for rate in ramp_rates])
write_file(
CONFIGS_DIR / f"execute_{warehouses}_warehouses_ramp_up.xml",
RAMP_UP_XML.format(
warehouses=warehouses,
works=ramp_works,
hostname=hostname,
password=escaped_password,
terminals=terminals,
batch_size=BATCH_SIZE,
),
)
# Loader config
write_file(
CONFIGS_DIR / f"load_{warehouses}_warehouses.xml",
LOAD_XML.format(
warehouses=warehouses,
hostname=hostname,
password=escaped_password,
batch_size=BATCH_SIZE,
loader_threads=LOADER_THREADS,
),
)
# Write scripts
for suffix in ["max_rate", "opt_rate", "ramp_up"]:
script = EXECUTE_SCRIPT.format(
warehouses=warehouses, suffix=suffix, docker_image=docker_image
)
write_file(SCRIPTS_DIR / f"execute_{warehouses}_warehouses_{suffix}.sh", script)
# Loader script
write_file(
SCRIPTS_DIR / f"load_{warehouses}_warehouses.sh",
LOAD_SCRIPT.format(warehouses=warehouses, docker_image=docker_image),
)
print(f"Generated configs and scripts for {warehouses} warehouses and max rate {max_rate}.")
if __name__ == "__main__":
main()

test_runner/performance/benchbase_tpc_c_helpers/upload_results_to_perf_test_results.py (new file)

@@ -0,0 +1,591 @@
#!/usr/bin/env python3
# ruff: noqa
# We exclude this file from ruff because the GitHub runner uses Python 3.9, while ruff
# runs with the newer Python 3.12 and suggests changes incompatible with Python 3.9.
"""
Upload BenchBase TPC-C results from summary.json and results.csv files to perf_test_results database.
This script extracts metrics from BenchBase *.summary.json and *.results.csv files and uploads them
to a PostgreSQL database table for performance tracking and analysis.
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
import pandas as pd # type: ignore[import-untyped]
import psycopg2
def load_summary_json(json_file_path):
"""Load summary.json file and return parsed data."""
try:
with open(json_file_path) as f:
return json.load(f)
except FileNotFoundError:
print(f"Error: Summary JSON file not found: {json_file_path}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in file {json_file_path}: {e}")
sys.exit(1)
except Exception as e:
print(f"Error loading JSON file {json_file_path}: {e}")
sys.exit(1)
def get_metric_info(metric_name):
"""Get metric unit and report type for a given metric name."""
metrics_config = {
"Throughput": {"unit": "req/s", "report_type": "higher_is_better"},
"Goodput": {"unit": "req/s", "report_type": "higher_is_better"},
"Measured Requests": {"unit": "requests", "report_type": "higher_is_better"},
"95th Percentile Latency": {"unit": "µs", "report_type": "lower_is_better"},
"Maximum Latency": {"unit": "µs", "report_type": "lower_is_better"},
"Median Latency": {"unit": "µs", "report_type": "lower_is_better"},
"Minimum Latency": {"unit": "µs", "report_type": "lower_is_better"},
"25th Percentile Latency": {"unit": "µs", "report_type": "lower_is_better"},
"90th Percentile Latency": {"unit": "µs", "report_type": "lower_is_better"},
"99th Percentile Latency": {"unit": "µs", "report_type": "lower_is_better"},
"75th Percentile Latency": {"unit": "µs", "report_type": "lower_is_better"},
"Average Latency": {"unit": "µs", "report_type": "lower_is_better"},
}
return metrics_config.get(metric_name, {"unit": "", "report_type": "higher_is_better"})
def extract_metrics(summary_data):
"""Extract relevant metrics from summary JSON data."""
metrics = []
# Direct top-level metrics
direct_metrics = {
"Throughput (requests/second)": "Throughput",
"Goodput (requests/second)": "Goodput",
"Measured Requests": "Measured Requests",
}
for json_key, clean_name in direct_metrics.items():
if json_key in summary_data:
metrics.append((clean_name, summary_data[json_key]))
# Latency metrics from nested "Latency Distribution" object
if "Latency Distribution" in summary_data:
latency_data = summary_data["Latency Distribution"]
latency_metrics = {
"95th Percentile Latency (microseconds)": "95th Percentile Latency",
"Maximum Latency (microseconds)": "Maximum Latency",
"Median Latency (microseconds)": "Median Latency",
"Minimum Latency (microseconds)": "Minimum Latency",
"25th Percentile Latency (microseconds)": "25th Percentile Latency",
"90th Percentile Latency (microseconds)": "90th Percentile Latency",
"99th Percentile Latency (microseconds)": "99th Percentile Latency",
"75th Percentile Latency (microseconds)": "75th Percentile Latency",
"Average Latency (microseconds)": "Average Latency",
}
for json_key, clean_name in latency_metrics.items():
if json_key in latency_data:
metrics.append((clean_name, latency_data[json_key]))
return metrics
def build_labels(summary_data, project_id):
"""Build labels JSON object from summary data and project info."""
labels = {}
# Extract required label keys from summary data
label_keys = [
"DBMS Type",
"DBMS Version",
"Benchmark Type",
"Final State",
"isolation",
"scalefactor",
"terminals",
]
for key in label_keys:
if key in summary_data:
labels[key] = summary_data[key]
# Add project_id from workflow
labels["project_id"] = project_id
return labels
def build_suit_name(scalefactor, terminals, run_type, min_cu, max_cu):
"""Build the suit name according to specification."""
return f"benchbase-tpc-c-{scalefactor}-{terminals}-{run_type}-{min_cu}-{max_cu}"
def convert_timestamp_to_utc(timestamp_ms):
"""Convert millisecond timestamp to PostgreSQL-compatible UTC timestamp."""
try:
dt = datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc)
return dt.isoformat()
except (ValueError, TypeError) as e:
print(f"Warning: Could not convert timestamp {timestamp_ms}: {e}")
return datetime.now(timezone.utc).isoformat()
def insert_metrics(conn, metrics_data):
"""Insert metrics data into the perf_test_results table."""
insert_query = """
INSERT INTO perf_test_results
(suit, revision, platform, metric_name, metric_value, metric_unit,
metric_report_type, recorded_at_timestamp, labels)
VALUES (%(suit)s, %(revision)s, %(platform)s, %(metric_name)s, %(metric_value)s,
%(metric_unit)s, %(metric_report_type)s, %(recorded_at_timestamp)s, %(labels)s)
"""
try:
with conn.cursor() as cursor:
cursor.executemany(insert_query, metrics_data)
conn.commit()
print(f"Successfully inserted {len(metrics_data)} metrics into perf_test_results")
# Log some sample data for verification
if metrics_data:
print(
f"Sample metric: {metrics_data[0]['metric_name']} = {metrics_data[0]['metric_value']} {metrics_data[0]['metric_unit']}"
)
except Exception as e:
print(f"Error inserting metrics into database: {e}")
sys.exit(1)
def create_benchbase_results_details_table(conn):
"""Create benchbase_results_details table if it doesn't exist."""
create_table_query = """
CREATE TABLE IF NOT EXISTS benchbase_results_details (
id BIGSERIAL PRIMARY KEY,
suit TEXT,
revision CHAR(40),
platform TEXT,
recorded_at_timestamp TIMESTAMP WITH TIME ZONE,
requests_per_second NUMERIC,
average_latency_ms NUMERIC,
minimum_latency_ms NUMERIC,
p25_latency_ms NUMERIC,
median_latency_ms NUMERIC,
p75_latency_ms NUMERIC,
p90_latency_ms NUMERIC,
p95_latency_ms NUMERIC,
p99_latency_ms NUMERIC,
maximum_latency_ms NUMERIC
);
CREATE INDEX IF NOT EXISTS benchbase_results_details_recorded_at_timestamp_idx
ON benchbase_results_details USING BRIN (recorded_at_timestamp);
CREATE INDEX IF NOT EXISTS benchbase_results_details_suit_idx
ON benchbase_results_details USING BTREE (suit text_pattern_ops);
"""
try:
with conn.cursor() as cursor:
cursor.execute(create_table_query)
conn.commit()
print("Successfully created/verified benchbase_results_details table")
except Exception as e:
print(f"Error creating benchbase_results_details table: {e}")
sys.exit(1)
def process_csv_results(csv_file_path, start_timestamp_ms, suit, revision, platform):
"""Process CSV results and return data for database insertion."""
try:
# Read CSV file
df = pd.read_csv(csv_file_path)
# Validate required columns exist
required_columns = [
"Time (seconds)",
"Throughput (requests/second)",
"Average Latency (millisecond)",
"Minimum Latency (millisecond)",
"25th Percentile Latency (millisecond)",
"Median Latency (millisecond)",
"75th Percentile Latency (millisecond)",
"90th Percentile Latency (millisecond)",
"95th Percentile Latency (millisecond)",
"99th Percentile Latency (millisecond)",
"Maximum Latency (millisecond)",
]
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
print(f"Error: Missing required columns in CSV: {missing_columns}")
return []
csv_data = []
for _, row in df.iterrows():
# Calculate timestamp: start_timestamp_ms + (time_seconds * 1000)
time_seconds = row["Time (seconds)"]
row_timestamp_ms = start_timestamp_ms + (time_seconds * 1000)
# Convert to UTC timestamp
row_timestamp = datetime.fromtimestamp(
row_timestamp_ms / 1000.0, tz=timezone.utc
).isoformat()
csv_row = {
"suit": suit,
"revision": revision,
"platform": platform,
"recorded_at_timestamp": row_timestamp,
"requests_per_second": float(row["Throughput (requests/second)"]),
"average_latency_ms": float(row["Average Latency (millisecond)"]),
"minimum_latency_ms": float(row["Minimum Latency (millisecond)"]),
"p25_latency_ms": float(row["25th Percentile Latency (millisecond)"]),
"median_latency_ms": float(row["Median Latency (millisecond)"]),
"p75_latency_ms": float(row["75th Percentile Latency (millisecond)"]),
"p90_latency_ms": float(row["90th Percentile Latency (millisecond)"]),
"p95_latency_ms": float(row["95th Percentile Latency (millisecond)"]),
"p99_latency_ms": float(row["99th Percentile Latency (millisecond)"]),
"maximum_latency_ms": float(row["Maximum Latency (millisecond)"]),
}
csv_data.append(csv_row)
print(f"Processed {len(csv_data)} rows from CSV file")
return csv_data
except FileNotFoundError:
print(f"Error: CSV file not found: {csv_file_path}")
return []
except Exception as e:
print(f"Error processing CSV file {csv_file_path}: {e}")
return []
def insert_csv_results(conn, csv_data):
"""Insert CSV results into benchbase_results_details table."""
if not csv_data:
print("No CSV data to insert")
return
insert_query = """
INSERT INTO benchbase_results_details
(suit, revision, platform, recorded_at_timestamp, requests_per_second,
average_latency_ms, minimum_latency_ms, p25_latency_ms, median_latency_ms,
p75_latency_ms, p90_latency_ms, p95_latency_ms, p99_latency_ms, maximum_latency_ms)
VALUES (%(suit)s, %(revision)s, %(platform)s, %(recorded_at_timestamp)s, %(requests_per_second)s,
%(average_latency_ms)s, %(minimum_latency_ms)s, %(p25_latency_ms)s, %(median_latency_ms)s,
%(p75_latency_ms)s, %(p90_latency_ms)s, %(p95_latency_ms)s, %(p99_latency_ms)s, %(maximum_latency_ms)s)
"""
try:
with conn.cursor() as cursor:
cursor.executemany(insert_query, csv_data)
conn.commit()
print(
f"Successfully inserted {len(csv_data)} detailed results into benchbase_results_details"
)
# Log some sample data for verification
sample = csv_data[0]
print(
f"Sample detail: {sample['requests_per_second']} req/s at {sample['recorded_at_timestamp']}"
)
except Exception as e:
print(f"Error inserting CSV results into database: {e}")
sys.exit(1)
def parse_load_log(log_file_path, scalefactor):
"""Parse load log file and extract load metrics."""
try:
with open(log_file_path) as f:
log_content = f.read()
# Regex patterns to match the timestamp lines
loading_pattern = r"\[INFO \] (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}.*Loading data into TPCC database"
finished_pattern = r"\[INFO \] (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}.*Finished loading data into TPCC database"
loading_match = re.search(loading_pattern, log_content)
finished_match = re.search(finished_pattern, log_content)
if not loading_match or not finished_match:
print(f"Warning: Could not find loading timestamps in log file {log_file_path}")
return None
# Parse timestamps
loading_time = datetime.strptime(loading_match.group(1), "%Y-%m-%d %H:%M:%S")
finished_time = datetime.strptime(finished_match.group(1), "%Y-%m-%d %H:%M:%S")
# Calculate duration in seconds
duration_seconds = (finished_time - loading_time).total_seconds()
# Calculate load throughput in MB/s: 10 warehouses correspond to approx. 1 GB of data
load_throughput = (scalefactor * 1024 / 10.0) / duration_seconds
# Convert end time to UTC timestamp for database
finished_time_utc = finished_time.replace(tzinfo=timezone.utc).isoformat()
print(f"Load metrics: Duration={duration_seconds}s, Throughput={load_throughput:.2f} MB/s")
return {
"duration_seconds": duration_seconds,
"throughput_mb_per_sec": load_throughput,
"end_timestamp": finished_time_utc,
}
except FileNotFoundError:
print(f"Warning: Load log file not found: {log_file_path}")
return None
except Exception as e:
print(f"Error parsing load log file {log_file_path}: {e}")
return None
def insert_load_metrics(conn, load_metrics, suit, revision, platform, labels_json):
"""Insert load metrics into perf_test_results table."""
if not load_metrics:
print("No load metrics to insert")
return
load_metrics_data = [
{
"suit": suit,
"revision": revision,
"platform": platform,
"metric_name": "load_duration_seconds",
"metric_value": load_metrics["duration_seconds"],
"metric_unit": "seconds",
"metric_report_type": "lower_is_better",
"recorded_at_timestamp": load_metrics["end_timestamp"],
"labels": labels_json,
},
{
"suit": suit,
"revision": revision,
"platform": platform,
"metric_name": "load_throughput",
"metric_value": load_metrics["throughput_mb_per_sec"],
"metric_unit": "MB/second",
"metric_report_type": "higher_is_better",
"recorded_at_timestamp": load_metrics["end_timestamp"],
"labels": labels_json,
},
]
insert_query = """
INSERT INTO perf_test_results
(suit, revision, platform, metric_name, metric_value, metric_unit,
metric_report_type, recorded_at_timestamp, labels)
VALUES (%(suit)s, %(revision)s, %(platform)s, %(metric_name)s, %(metric_value)s,
%(metric_unit)s, %(metric_report_type)s, %(recorded_at_timestamp)s, %(labels)s)
"""
try:
with conn.cursor() as cursor:
cursor.executemany(insert_query, load_metrics_data)
conn.commit()
print(f"Successfully inserted {len(load_metrics_data)} load metrics into perf_test_results")
except Exception as e:
print(f"Error inserting load metrics into database: {e}")
sys.exit(1)
def main():
"""Main function to parse arguments and upload results."""
parser = argparse.ArgumentParser(
description="Upload BenchBase TPC-C results to perf_test_results database"
)
parser.add_argument(
"--summary-json", type=str, required=False, help="Path to the summary.json file"
)
parser.add_argument(
"--run-type",
type=str,
required=True,
choices=["warmup", "opt-rate", "ramp-up", "load"],
help="Type of benchmark run",
)
parser.add_argument("--min-cu", type=float, required=True, help="Minimum compute units")
parser.add_argument("--max-cu", type=float, required=True, help="Maximum compute units")
parser.add_argument("--project-id", type=str, required=True, help="Neon project ID")
parser.add_argument(
"--revision", type=str, required=True, help="Git commit hash (40 characters)"
)
parser.add_argument(
"--connection-string", type=str, required=True, help="PostgreSQL connection string"
)
parser.add_argument(
"--results-csv",
type=str,
required=False,
help="Path to the results.csv file for detailed metrics upload",
)
parser.add_argument(
"--load-log",
type=str,
required=False,
help="Path to the load log file for load phase metrics",
)
parser.add_argument(
"--warehouses",
type=int,
required=False,
help="Number of warehouses (scalefactor) for load metrics calculation",
)
args = parser.parse_args()
# Validate inputs
if args.summary_json and not Path(args.summary_json).exists():
print(f"Error: Summary JSON file does not exist: {args.summary_json}")
sys.exit(1)
if not args.summary_json and not args.load_log:
print("Error: Either summary JSON or load log file must be provided")
sys.exit(1)
if len(args.revision) != 40:
print(f"Warning: Revision should be 40 characters, got {len(args.revision)}")
# Load and process summary data if provided
summary_data = None
metrics = []
if args.summary_json:
summary_data = load_summary_json(args.summary_json)
metrics = extract_metrics(summary_data)
if not metrics:
print("Warning: No metrics found in summary JSON")
# Build common data for all metrics
if summary_data:
scalefactor = summary_data.get("scalefactor", "unknown")
terminals = summary_data.get("terminals", "unknown")
labels = build_labels(summary_data, args.project_id)
else:
# For load-only processing, use warehouses argument as scalefactor
scalefactor = args.warehouses if args.warehouses else "unknown"
terminals = "unknown"
labels = {"project_id": args.project_id}
suit = build_suit_name(scalefactor, terminals, args.run_type, args.min_cu, args.max_cu)
platform = f"prod-us-east-2-{args.project_id}"
# Convert timestamp - only needed for summary metrics and CSV processing
current_timestamp_ms = None
start_timestamp_ms = None
recorded_at = None
if summary_data:
current_timestamp_ms = summary_data.get("Current Timestamp (milliseconds)")
start_timestamp_ms = summary_data.get("Start timestamp (milliseconds)")
if current_timestamp_ms:
recorded_at = convert_timestamp_to_utc(current_timestamp_ms)
else:
print("Warning: No timestamp found in JSON, using current time")
recorded_at = datetime.now(timezone.utc).isoformat()
if not start_timestamp_ms:
print("Warning: No start timestamp found in JSON, CSV upload may be incorrect")
start_timestamp_ms = (
current_timestamp_ms or datetime.now(timezone.utc).timestamp() * 1000
)
# Print Grafana dashboard link for cross-service endpoint debugging
if start_timestamp_ms and current_timestamp_ms:
grafana_url = (
f"https://neonprod.grafana.net/d/cdya0okb81zwga/cross-service-endpoint-debugging"
f"?orgId=1&from={int(start_timestamp_ms)}&to={int(current_timestamp_ms)}"
f"&timezone=utc&var-env=prod&var-input_project_id={args.project_id}"
)
print(f'Cross service endpoint dashboard for "{args.run_type}" phase: {grafana_url}')
# Prepare metrics data for database insertion (only if we have summary metrics)
metrics_data = []
if metrics and recorded_at:
for metric_name, metric_value in metrics:
metric_info = get_metric_info(metric_name)
row = {
"suit": suit,
"revision": args.revision,
"platform": platform,
"metric_name": metric_name,
"metric_value": float(metric_value), # Ensure numeric type
"metric_unit": metric_info["unit"],
"metric_report_type": metric_info["report_type"],
"recorded_at_timestamp": recorded_at,
"labels": json.dumps(labels), # Convert to JSON string for JSONB column
}
metrics_data.append(row)
print(f"Prepared {len(metrics_data)} summary metrics for upload to database")
print(f"Suit: {suit}")
print(f"Platform: {platform}")
# Connect to database and insert metrics
try:
conn = psycopg2.connect(args.connection_string)
# Insert summary metrics into perf_test_results (if any)
if metrics_data:
insert_metrics(conn, metrics_data)
else:
print("No summary metrics to upload")
# Process and insert detailed CSV results if provided
if args.results_csv:
print(f"Processing detailed CSV results from: {args.results_csv}")
# Create table if it doesn't exist
create_benchbase_results_details_table(conn)
# Process CSV data
csv_data = process_csv_results(
args.results_csv, start_timestamp_ms, suit, args.revision, platform
)
# Insert CSV data
if csv_data:
insert_csv_results(conn, csv_data)
else:
print("No CSV data to upload")
else:
print("No CSV file provided, skipping detailed results upload")
# Process and insert load metrics if provided
if args.load_log:
print(f"Processing load metrics from: {args.load_log}")
# Parse load log and extract metrics
load_metrics = parse_load_log(args.load_log, scalefactor)
# Insert load metrics
if load_metrics:
insert_load_metrics(
conn, load_metrics, suit, args.revision, platform, json.dumps(labels)
)
else:
print("No load metrics to upload")
else:
print("No load log file provided, skipping load metrics upload")
conn.close()
print("Database upload completed successfully")
except psycopg2.Error as e:
print(f"Database connection/query error: {e}")
sys.exit(1)
except Exception as e:
print(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()