mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-03 14:20:36 +00:00
Compare commits
7 Commits
conrad/rem
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b9a12a3a5 | ||
|
|
52eafba2cc | ||
|
|
4cfedee06d | ||
|
|
80ae39311e | ||
|
|
144cac71d8 | ||
|
|
4f2d8894ac | ||
|
|
1755fb4ef3 |
@@ -14,7 +14,7 @@ from contextlib import closing
|
||||
from dataclasses import dataclass, field
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, List
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
@@ -735,7 +735,7 @@ class ProposerPostgres(PgProtocol):
|
||||
"""Path to postgresql.conf"""
|
||||
return os.path.join(self.pgdata_dir, "postgresql.conf")
|
||||
|
||||
def create_dir_config(self, safekeepers: str):
|
||||
def create_dir_config(self, safekeepers: str, additional_config_options: Optional[dict[str,str]] = None):
|
||||
"""Create dir and config for running --sync-safekeepers"""
|
||||
|
||||
Path(self.pg_data_dir_path()).mkdir(exist_ok=True)
|
||||
@@ -750,6 +750,9 @@ class ProposerPostgres(PgProtocol):
|
||||
f"listen_addresses = '{self.listen_addr}'\n",
|
||||
f"port = '{self.port}'\n",
|
||||
]
|
||||
if additional_config_options:
|
||||
for key, value in additional_config_options.items():
|
||||
cfg.append(f"{key} = '{value}'\n")
|
||||
|
||||
f.writelines(cfg)
|
||||
|
||||
@@ -1446,6 +1449,7 @@ class SafekeeperEnv:
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
num_safekeepers: int = 1,
|
||||
pg_conf_options: Optional[dict[str, str]] = None,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.port_distributor = port_distributor
|
||||
@@ -1457,6 +1461,7 @@ class SafekeeperEnv:
|
||||
self.postgres: Optional[ProposerPostgres] = None
|
||||
self.tenant_id: Optional[TenantId] = None
|
||||
self.timeline_id: Optional[TimelineId] = None
|
||||
self.pg_conf_options = pg_conf_options
|
||||
|
||||
def init(self) -> SafekeeperEnv:
|
||||
assert self.postgres is None, "postgres is already initialized"
|
||||
@@ -1499,6 +1504,7 @@ class SafekeeperEnv:
|
||||
str(i),
|
||||
"--broker-endpoint",
|
||||
self.fake_broker_endpoint,
|
||||
# "--no-sync",
|
||||
]
|
||||
log.info(f'Running command "{" ".join(cmd)}"')
|
||||
|
||||
@@ -1533,7 +1539,7 @@ class SafekeeperEnv:
|
||||
self.port_distributor.get_port(),
|
||||
)
|
||||
pg.initdb()
|
||||
pg.create_dir_config(self.get_safekeeper_connstrs())
|
||||
pg.create_dir_config(self.get_safekeeper_connstrs(), self.pg_conf_options)
|
||||
return pg
|
||||
|
||||
def kill_safekeeper(self, sk_dir):
|
||||
@@ -1558,6 +1564,224 @@ class SafekeeperEnv:
|
||||
self.kill_safekeeper(sk_proc.args[6])
|
||||
|
||||
|
||||
def run_pg_restore(pg_bin: PgBin, dump_file: Path, postgres: ProposerPostgres, table_names: List[str]):
|
||||
# env_vars = {
|
||||
# "PGOPTIONS": "-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7",
|
||||
# }
|
||||
|
||||
postgres.connstr
|
||||
|
||||
pg_restore_command: List[str] = [
|
||||
"pg_restore",
|
||||
"-v", # Verbose output
|
||||
"-d", postgres.connstr(options='-c maintenance_work_mem=4388608 -c max_parallel_maintenance_workers=7 -cstatement_timeout=0'), # Target database
|
||||
"--no-owner", # Do not restore ownership
|
||||
"--jobs=4", # Number of parallel jobs
|
||||
str(dump_file), # Dump file
|
||||
]
|
||||
|
||||
# Add table names to the command
|
||||
for table in table_names:
|
||||
pg_restore_command.insert(-2, "-t")
|
||||
pg_restore_command.insert(-2, table)
|
||||
|
||||
pg_bin.run(pg_restore_command)
|
||||
return None
|
||||
|
||||
|
||||
def download_pg_dump(database_name: str) -> Path:
|
||||
dump_file_path = Path(f"/tmp/{database_name}.pg_dump")
|
||||
if not dump_file_path.exists():
|
||||
s3_path = f"s3://neon-github-dev/performance/pgdumps/{database_name}/{database_name}.pg_dump"
|
||||
try:
|
||||
log.info(f"Downloading {s3_path} to {dump_file_path}")
|
||||
subprocess.run(["aws", "s3", "cp", s3_path, str(dump_file_path)], check=True)
|
||||
except subprocess.CalledProcessError:
|
||||
log.error("Failed to download the pg_dump file. Ensure AWS S3 credentials are set in the environment variables.")
|
||||
raise
|
||||
return dump_file_path
|
||||
|
||||
def test_safekeeper_without_pageserver_and_pg_restore(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
# Download the pg_dump file if it doesn't exist
|
||||
database_name = "clickbench" # Replace with your database name
|
||||
dump_file_path = download_pg_dump(database_name)
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before restore: {size_before}")
|
||||
|
||||
start_time = time.time()
|
||||
run_pg_restore(pg_bin, dump_file_path, env.postgres, ["hits"])
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after restore: {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"pg_restore duration: {duration:.2f} seconds")
|
||||
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver_and_pg_restore_tpch(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
# Download the pg_dump file if it doesn't exist
|
||||
database_name = "tpch" # Replace with your database name
|
||||
dump_file_path = download_pg_dump(database_name)
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before restore: {size_before}")
|
||||
|
||||
table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier"]
|
||||
start_time = time.time()
|
||||
run_pg_restore(pg_bin, dump_file_path, env.postgres, table_names)
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after restore: {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"pg_restore duration: {duration:.2f} seconds")
|
||||
log.info(f"Restore rate: {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver_and_waltest(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
pg_bin: PgBin,
|
||||
neon_binpath: Path,
|
||||
):
|
||||
# Create the environment in the test-specific output dir
|
||||
repo_dir = Path(os.path.join(test_output_dir, "repo"))
|
||||
|
||||
pg_conf_options: dict[str, str] = {
|
||||
"shared_buffers": "8GB",
|
||||
}
|
||||
|
||||
env = SafekeeperEnv(
|
||||
repo_dir,
|
||||
port_distributor,
|
||||
pg_bin,
|
||||
neon_binpath,
|
||||
pg_conf_options=pg_conf_options,
|
||||
)
|
||||
|
||||
wal_test = """
|
||||
CREATE OR REPLACE FUNCTION public.wal_bandwidth_test()
|
||||
RETURNS text
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
declare
|
||||
i int;
|
||||
j int;
|
||||
lastlsn pg_lsn;
|
||||
nowlsn pg_lsn;
|
||||
lastts timestamp;
|
||||
nowts timestamp;
|
||||
bandwidth numeric;
|
||||
|
||||
result text := ''; -- Initialize an empty string to capture messages
|
||||
begin
|
||||
for i in 1..100 loop
|
||||
|
||||
lastlsn = pg_current_wal_insert_lsn();
|
||||
lastts = clock_timestamp();
|
||||
|
||||
-- Emit 100 MB of WAL
|
||||
for j in 1..10000 loop
|
||||
perform pg_logical_emit_message(false, '', repeat('x', 10486));
|
||||
end loop;
|
||||
|
||||
nowlsn = pg_current_wal_insert_lsn();
|
||||
nowts = clock_timestamp();
|
||||
|
||||
bandwidth = (nowlsn - lastlsn) / (extract(epoch from nowts) - extract(epoch from lastts));
|
||||
|
||||
-- Capture the message instead of raising a notice
|
||||
result := result || format('bandwidth: %s kB / s%s',
|
||||
lpad(round(bandwidth / 1024)::text, 10),
|
||||
chr(10)); -- Newline for formatting
|
||||
|
||||
end loop;
|
||||
|
||||
return result; -- Return the concatenated string of messages
|
||||
end;
|
||||
$function$
|
||||
;
|
||||
"""
|
||||
|
||||
with env:
|
||||
env.init()
|
||||
assert env.postgres is not None
|
||||
shared_buffers = env.postgres.safe_psql("show shared_buffers")[0][0]
|
||||
log.info(f"shared_buffers: {shared_buffers}")
|
||||
size_before = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size before test: {size_before}")
|
||||
#env.postgres.safe_psql("create extension neon;")
|
||||
env.postgres.safe_psql(wal_test)
|
||||
start_time = time.time()
|
||||
output = env.postgres.safe_psql("""
|
||||
SET statement_timeout = 0;
|
||||
select wal_bandwidth_test();
|
||||
""")[0][0]
|
||||
end_time = time.time()
|
||||
duration = end_time - start_time
|
||||
log.info(output)
|
||||
size_after = env.postgres.safe_psql("select pg_database_size('postgres');")[0][0]
|
||||
log.info(f"Database size after test (irrelevant because no real WAL records, no relations): {size_after}")
|
||||
# Calculate the restore rate in bytes/second
|
||||
restored_size = size_after - size_before
|
||||
restore_rate = restored_size / duration
|
||||
log.info(f"test duration: {duration:.2f} seconds")
|
||||
log.info(f"Average ingest rate(irrelevant because no real WAL records, no relations): {restore_rate:.2f} bytes/second")
|
||||
|
||||
def test_safekeeper_without_pageserver(
|
||||
test_output_dir: str,
|
||||
port_distributor: PortDistributor,
|
||||
|
||||
Reference in New Issue
Block a user