Compare commits

...

7 Commits

Author SHA1 Message Date
BodoBolero
9b9a12a3a5 remove flamegraph added by mistake 2024-10-16 08:04:15 +02:00
BodoBolero
52eafba2cc improved db size visualizer 2024-10-16 08:03:09 +02:00
BodoBolero
4cfedee06d undo --no-sync - it didn't provide big improvement 2024-10-11 17:42:03 +02:00
BodoBolero
80ae39311e more ingest testcases 2024-10-11 17:41:00 +02:00
BodoBolero
144cac71d8 version of wal_test that works with neon local and sk but no ps 2024-10-10 20:03:18 +02:00
BodoBolero
4f2d8894ac first version of wal_test which doesn't seem to work without PS 2024-10-10 17:47:36 +02:00
BodoBolero
1755fb4ef3 test pg_restore throughput postgres with SK but no PS 2024-10-10 12:38:29 +02:00

View File

@@ -14,7 +14,7 @@ from contextlib import closing
from dataclasses import dataclass, field
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, List
import psycopg2
import psycopg2.errors
@@ -735,7 +735,7 @@ class ProposerPostgres(PgProtocol):
"""Path to postgresql.conf"""
return os.path.join(self.pgdata_dir, "postgresql.conf")
def create_dir_config(self, safekeepers: str):
def create_dir_config(self, safekeepers: str, additional_config_options: Optional[dict[str,str]] = None):
"""Create dir and config for running --sync-safekeepers"""
Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
@@ -750,6 +750,9 @@ class ProposerPostgres(PgProtocol):
f"listen_addresses = '{self.listen_addr}'\n",
f"port = '{self.port}'\n",
]
if additional_config_options:
for key, value in additional_config_options.items():
cfg.append(f"{key} = '{value}'\n")
f.writelines(cfg)
@@ -1446,6 +1449,7 @@ class SafekeeperEnv:
pg_bin: PgBin,
neon_binpath: Path,
num_safekeepers: int = 1,
pg_conf_options: Optional[dict[str, str]] = None,
):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
@@ -1457,6 +1461,7 @@ class SafekeeperEnv:
self.postgres: Optional[ProposerPostgres] = None
self.tenant_id: Optional[TenantId] = None
self.timeline_id: Optional[TimelineId] = None
self.pg_conf_options = pg_conf_options
def init(self) -> SafekeeperEnv:
assert self.postgres is None, "postgres is already initialized"
@@ -1499,6 +1504,7 @@ class SafekeeperEnv:
str(i),
"--broker-endpoint",
self.fake_broker_endpoint,
# "--no-sync",
]
log.info(f'Running command "{" ".join(cmd)}"')
@@ -1533,7 +1539,7 @@ class SafekeeperEnv:
self.port_distributor.get_port(),
)
pg.initdb()
pg.create_dir_config(self.get_safekeeper_connstrs())
pg.create_dir_config(self.get_safekeeper_connstrs(), self.pg_conf_options)
return pg
def kill_safekeeper(self, sk_dir):
@@ -1558,6 +1564,224 @@ class SafekeeperEnv:
self.kill_safekeeper(sk_proc.args[6])
def run_pg_restore(pg_bin: PgBin, dump_file: Path, postgres: ProposerPostgres, table_names: List[str]):
# env_vars = {
# "PGOPTIONS": "-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7",
# }
postgres.connstr
pg_restore_command: List[str] = [
"pg_restore",
"-v", # Verbose output
"-d", postgres.connstr(options='-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7 -cstatement_timeout=0'), # Target database
"--no-owner", # Do not restore ownership
"--jobs=4", # Number of parallel jobs
str(dump_file), # Dump file
]
# Add table names to the command
for table in table_names:
pg_restore_command.insert(-2, "-t")
pg_restore_command.insert(-2, table)
pg_bin.run(pg_restore_command)
return None
def download_pg_dump(database_name: str) -> Path:
dump_file_path = Path(f"/tmp/{database_name}.pg_dump")
if not dump_file_path.exists():
s3_path = f"s3://neon-github-dev/performance/pgdumps/{database_name}/{database_name}.pg_dump"
try:
log.info(f"Downloading {s3_path} to {dump_file_path}")
subprocess.run(["aws", "s3", "cp", s3_path, str(dump_file_path)], check=True)
except subprocess.CalledProcessError:
log.error("Failed to download the pg_dump file. Ensure AWS S3 credentials are set in the environment variables.")
raise
return dump_file_path
def test_safekeeper_without_pageserver_and_pg_restore(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
# Download the pg_dump file if it doesn't exist
database_name = "clickbench" # Replace with your database name
dump_file_path = download_pg_dump(database_name)
pg_conf_options: dict[str, str] = {
"shared_buffers": "8GB",
}
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
pg_conf_options=pg_conf_options,
)
with env:
env.init()
assert env.postgres is not None
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
log.info(f"shared_buffers: {shared_buffers}")
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size before restore: {size_before}")
start_time = time.time()
run_pg_restore(pg_bin, dump_file_path, env.postgres, ["hits"])
end_time = time.time()
duration = end_time - start_time
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size after restore: {size_after}")
# Calculate the restore rate in bytes/second
restored_size = size_after - size_before
restore_rate = restored_size / duration
log.info(f"pg_restore duration: {duration:.2f} seconds")
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
def test_safekeeper_without_pageserver_and_pg_restore_tpch(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
# Download the pg_dump file if it doesn't exist
database_name = "tpch" # Replace with your database name
dump_file_path = download_pg_dump(database_name)
pg_conf_options: dict[str, str] = {
"shared_buffers": "8GB",
}
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
pg_conf_options=pg_conf_options,
)
with env:
env.init()
assert env.postgres is not None
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
log.info(f"shared_buffers: {shared_buffers}")
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size before restore: {size_before}")
table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"]
start_time = time.time()
run_pg_restore(pg_bin, dump_file_path, env.postgres, table_names)
end_time = time.time()
duration = end_time - start_time
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size after restore: {size_after}")
# Calculate the restore rate in bytes/second
restored_size = size_after - size_before
restore_rate = restored_size / duration
log.info(f"pg_restore duration: {duration:.2f} seconds")
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
def test_safekeeper_without_pageserver_and_waltest(
test_output_dir: str,
port_distributor: PortDistributor,
pg_bin: PgBin,
neon_binpath: Path,
):
# Create the environment in the test-specific output dir
repo_dir = Path(os.path.join(test_output_dir, "repo"))
pg_conf_options: dict[str, str] = {
"shared_buffers": "8GB",
}
env = SafekeeperEnv(
repo_dir,
port_distributor,
pg_bin,
neon_binpath,
pg_conf_options=pg_conf_options,
)
wal_test = """
CREATE OR REPLACE FUNCTION public.wal_bandwidth_test()
RETURNS text
LANGUAGE plpgsql
AS $function$
declare
i int;
j int;
lastlsn pg_lsn;
nowlsn pg_lsn;
lastts timestamp;
nowts timestamp;
bandwidth numeric;
result text := ''; -- Initialize an empty string to capture messages
begin
for i in 1..100 loop
lastlsn = pg_current_wal_insert_lsn();
lastts = clock_timestamp();
-- Emit 100 MB of WAL
for j in 1..10000 loop
perform pg_logical_emit_message(false, '', repeat('x', 10486));
end loop;
nowlsn = pg_current_wal_insert_lsn();
nowts = clock_timestamp();
bandwidth = (nowlsn - lastlsn) / (extract(epoch from nowts) - extract(epoch from lastts));
-- Capture the message instead of raising a notice
result := result || format('bandwidth: %s kB / s%s',
lpad(round(bandwidth / 1024)::text, 10),
chr(10)); -- Newline for formatting
end loop;
return result; -- Return the concatenated string of messages
end;
$function$
;
"""
with env:
env.init()
assert env.postgres is not None
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
log.info(f"shared_buffers: {shared_buffers}")
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size before test: {size_before}")
#env.postgres.safe_psql("create extension neon;")
env.postgres.safe_psql(wal_test)
start_time = time.time()
output = env.postgres.safe_psql("""
SET statement_timeout = 0;
select wal_bandwidth_test();
""")[0][0]
end_time = time.time()
duration = end_time - start_time
log.info(output)
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
log.info(f"Database size after test (irrelevant because no real WAL records, no relations): {size_after}")
# Calculate the restore rate in bytes/second
restored_size = size_after - size_before
restore_rate = restored_size / duration
log.info(f"test duration: {duration:.2f} seconds")
log.info(f"Average ingest rate(irrelevant because no real WAL records, no relations): {restore_rate:.2f} bytes/second")
def test_safekeeper_without_pageserver(
test_output_dir: str,
port_distributor: PortDistributor,