Fixture for fast_import binary is working

Gleb Novikov
2025-01-02 18:06:12 +00:00
parent ebe26e218b
commit e291fb7edc
3 changed files with 205 additions and 1 deletion


@@ -55,6 +55,8 @@ struct Args {
    pg_bin_dir: Utf8PathBuf,
    #[clap(long)]
    pg_lib_dir: Utf8PathBuf,
    #[clap(long)]
    pg_port: Option<u16>, // port to run postgres on, 5432 is default
}
#[serde_with::serde_as]
@@ -91,6 +93,12 @@ pub(crate) async fn main() -> anyhow::Result<()> {
    let working_directory = args.working_directory;
    let pg_bin_dir = args.pg_bin_dir;
    let pg_lib_dir = args.pg_lib_dir;
    let pg_port = args.pg_port.unwrap_or_else(|| {
        info!("pg_port not specified, using default 5432");
        5432
    });
    // Initialize AWS clients only if s3_prefix is specified
    let (aws_config, kms_client) = if args.s3_prefix.is_some() {
@@ -191,6 +199,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
    let mut postgres_proc = tokio::process::Command::new(pgbin)
        .arg("-D")
        .arg(&pgdata_dir)
        .args(["-p", &format!("{pg_port}")])
        .args(["-c", "wal_level=minimal"])
        .args(["-c", "shared_buffers=10GB"])
        .args(["-c", "max_wal_senders=0"])
@@ -226,7 +235,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
    // Create neondb database in the running postgres
    let restore_pg_connstring =
        format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
    loop {
        match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
            Ok((client, connection)) => {
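On the test side, the same readiness check amounts to retrying until the postgres that fast_import starts on pg_port accepts connections. A minimal sketch, assuming psycopg2 is available in the test environment and that the superuser is cloud_admin (neither is stated in this hunk):

import time

import psycopg2  # assumed test dependency


def wait_for_import_postgres(pg_port: int, timeout_s: float = 60.0):
    """Retry until the postgres spawned by fast_import accepts connections."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # Mirrors restore_pg_connstring above: localhost, the configured port,
            # dbname=postgres; the superuser name is an assumption.
            return psycopg2.connect(
                host="localhost", port=pg_port, user="cloud_admin", dbname="postgres"
            )
        except psycopg2.OperationalError:
            if time.monotonic() > deadline:
                raise
            time.sleep(0.5)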


@@ -0,0 +1,45 @@
import subprocess
from pathlib import Path

from fixtures.neon_cli import AbstractNeonCli


class FastImport(AbstractNeonCli):
    COMMAND = "fast_import"

    def __init__(self, extra_env: dict[str, str] | None, binpath: Path, pg_dir: Path):
        if extra_env is None:
            env_vars = {}
        else:
            env_vars = extra_env.copy()

        if not (binpath / self.COMMAND).exists():
            raise Exception(f"{self.COMMAND} binary not found at '{binpath}'")
        super().__init__(env_vars, binpath)

        self.pg_bin = pg_dir / "bin"
        if not (self.pg_bin / "postgres").exists():
            raise Exception(f"postgres binary was not found at '{self.pg_bin}'")
        self.pg_lib = pg_dir / "lib"

    def run(
        self,
        workdir: Path,
        pg_port: int,
        source_connection_string: str | None = None,
        s3prefix: str | None = None,
        interactive: bool = False,
    ) -> subprocess.CompletedProcess[str]:
        args = [
            f"--pg-bin-dir={self.pg_bin}",
            f"--pg-lib-dir={self.pg_lib}",
            f"--pg-port={pg_port}",
            f"--working-directory={workdir}",
        ]
        if source_connection_string is not None:
            args.append(f"--source-connection-string={source_connection_string}")
        if s3prefix is not None:
            args.append(f"--s3-prefix={s3prefix}")
        if interactive:
            args.append("--interactive")
        return self.raw_cli(args)
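A sketch of how this wrapper could be exposed to tests as a pytest fixture; the fixture function itself is not part of this commit, and neon_binpath / pg_distrib_dir are the existing fixtures the test below already relies on:

from pathlib import Path

import pytest

from fixtures.fast_import import FastImport


@pytest.fixture
def fast_import(neon_binpath: Path, pg_distrib_dir: Path) -> FastImport:
    # Same construction the test below uses: v16 postgres distribution, no extra env.
    return FastImport(None, neon_binpath, pg_distrib_dir / "v16")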


@@ -0,0 +1,150 @@
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path

from fixtures.fast_import import FastImport
from fixtures.log_helper import log
#
# Smoke test for the fast_import fixture: run the binary end to end and capture its logs.
#
def test_fast_import(
    port_distributor, test_output_dir: Path, neon_binpath: Path, pg_distrib_dir: Path
):
    # TODO:
    # 1. fast_import fixture
    # 2. maybe run vanilla postgres and insert some data into it
    # 3. run fast_import in interactive mode in the background
    #    - wait for "interactive mode" message
    # 4. run some query against the imported data (in running postgres inside fast_import, port is known)
    #    (a possible shape of steps 3-4 is sketched after this file's diff)
    # Maybe test with pageserver?
    # 1. run whole neon env
    # 2. create timeline with some s3 path???
    # 3. run fast_import with s3 prefix
    # 4. ??? mock http where pageserver will report progress
    # 5. run compute on this timeline and check if data is there

    fast_import = FastImport(
        None,
        neon_binpath,
        pg_distrib_dir / "v16",
    )

    workdir = Path(tempfile.mkdtemp())
    cmd = fast_import.run(
        workdir,
        port_distributor.get_port(),
        "postgresql://gleb:O9UM4tPHEafC@ep-autumn-rain-a3smlr75.eu-central-1.aws.neon.tech/neondb?sslmode=require",
        None,
    )

    # dump stdout & stderr into test log dir
    with open(test_output_dir / "fast_import.stdout", "w") as f:
        f.write(cmd.stdout)
    with open(test_output_dir / "fast_import.stderr", "w") as f:
        f.write(cmd.stderr)
    log.info("Wrote logs to %s", test_output_dir)

    # clean workdir
    shutil.rmtree(workdir)
# env = neon_env_builder.init_start()
# pageserver_http = env.pageserver.http_client()
# # Override defaults: 4M checkpoint_distance, disable background compaction and gc.
# tenant, _ = env.create_tenant(
# conf={
# "checkpoint_distance": "4194304",
# "gc_period": "0s",
# "compaction_period": "0s",
# }
# )
#
# failpoint = "flush-frozen-pausable"
#
# pageserver_http.configure_failpoints((failpoint, "sleep(10000)"))
#
# endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
# branch0_cur = endpoint_branch0.connect().cursor()
# branch0_timeline = TimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id"))
# log.info(f"b0 timeline {branch0_timeline}")
#
# # Create table, and insert 100k rows.
# branch0_lsn = query_scalar(branch0_cur, "SELECT pg_current_wal_insert_lsn()")
# log.info(f"b0 at lsn {branch0_lsn}")
#
# branch0_cur.execute("CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)")
# branch0_cur.execute(
# """
# INSERT INTO foo
# SELECT '00112233445566778899AABBCCDDEEFF' || ':branch0:' || g
# FROM generate_series(1, 100000) g
# """
# )
# lsn_100 = query_scalar(branch0_cur, "SELECT pg_current_wal_insert_lsn()")
# log.info(f"LSN after 100k rows: {lsn_100}")
#
# # Create branch1.
# env.create_branch(
# "branch1", ancestor_branch_name="main", ancestor_start_lsn=lsn_100, tenant_id=tenant
# )
# endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
#
# branch1_cur = endpoint_branch1.connect().cursor()
# branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
# log.info(f"b1 timeline {branch1_timeline}")
#
# branch1_lsn = query_scalar(branch1_cur, "SELECT pg_current_wal_insert_lsn()")
# log.info(f"b1 at lsn {branch1_lsn}")
#
# # Insert 100k rows.
# branch1_cur.execute(
# """
# INSERT INTO foo
# SELECT '00112233445566778899AABBCCDDEEFF' || ':branch1:' || g
# FROM generate_series(1, 100000) g
# """
# )
# lsn_200 = query_scalar(branch1_cur, "SELECT pg_current_wal_insert_lsn()")
# log.info(f"LSN after 200k rows: {lsn_200}")
#
# # Create branch2.
# env.create_branch(
# "branch2", ancestor_branch_name="branch1", ancestor_start_lsn=lsn_200, tenant_id=tenant
# )
# endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
# branch2_cur = endpoint_branch2.connect().cursor()
#
# branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
# log.info(f"b2 timeline {branch2_timeline}")
#
# branch2_lsn = query_scalar(branch2_cur, "SELECT pg_current_wal_insert_lsn()")
# log.info(f"b2 at lsn {branch2_lsn}")
#
# # Insert 100k rows.
# branch2_cur.execute(
# """
# INSERT INTO foo
# SELECT '00112233445566778899AABBCCDDEEFF' || ':branch2:' || g
# FROM generate_series(1, 100000) g
# """
# )
# lsn_300 = query_scalar(branch2_cur, "SELECT pg_current_wal_insert_lsn()")
# log.info(f"LSN after 300k rows: {lsn_300}")
#
# # Run compaction on branch1.
# compact = f"compact {tenant} {branch1_timeline}"
# log.info(compact)
# pageserver_http.timeline_compact(tenant, branch1_timeline)
#
# assert query_scalar(branch0_cur, "SELECT count(*) FROM foo") == 100000
#
# assert query_scalar(branch1_cur, "SELECT count(*) FROM foo") == 200000
#
# assert query_scalar(branch2_cur, "SELECT count(*) FROM foo") == 300000
#
# pageserver_http.configure_failpoints((failpoint, "off"))
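For TODO items 3-4 in the test above: FastImport.run() blocks until the binary exits, so an interactive-mode test would likely drive the binary with subprocess.Popen directly, wait for the readiness message, and then query the embedded postgres on the known port. A sketch only; the exact log message, the cloud_admin superuser, and the psycopg2 dependency are assumptions rather than facts from this commit:

import subprocess
from pathlib import Path

import psycopg2  # assumed test dependency


def run_fast_import_interactive(
    binpath: Path, pg_bin: Path, pg_lib: Path, workdir: Path, pg_port: int, source: str
) -> None:
    # Start fast_import in the background; --interactive keeps postgres running
    # after the import has finished.
    proc = subprocess.Popen(
        [
            str(binpath / "fast_import"),
            f"--pg-bin-dir={pg_bin}",
            f"--pg-lib-dir={pg_lib}",
            f"--pg-port={pg_port}",
            f"--working-directory={workdir}",
            f"--source-connection-string={source}",
            "--interactive",
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    try:
        assert proc.stdout is not None
        # Wait for the (assumed) log line announcing interactive mode.
        for line in proc.stdout:
            if "interactive mode" in line.lower():
                break
        # Query the imported data; superuser name and the probe query are placeholders.
        with psycopg2.connect(
            host="localhost", port=pg_port, user="cloud_admin", dbname="neondb"
        ) as conn, conn.cursor() as cur:
            cur.execute("SELECT 1")
            assert cur.fetchone() == (1,)
    finally:
        proc.terminate()
        proc.wait()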