mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 17:10:38 +00:00
Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/rm-file-if-fail
This commit is contained in:
@@ -5,7 +5,6 @@ import json
|
||||
import os
|
||||
import re
|
||||
import timeit
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
@@ -18,6 +17,7 @@ from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
from _pytest.terminal import TerminalReporter
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonPageserver
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
@@ -385,7 +385,7 @@ class NeonBenchmarker:
|
||||
path = f"{repo_dir}/tenants/{tenant_id}/timelines/{timeline_id}"
|
||||
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
@@ -492,7 +492,7 @@ def pytest_terminal_summary(
|
||||
return
|
||||
|
||||
if not result:
|
||||
warnings.warn("no results to store (no passed test suites)")
|
||||
log.warning("no results to store (no passed test suites)")
|
||||
return
|
||||
|
||||
get_out_path(Path(out_dir), revision=revision).write_text(
|
||||
|
||||
@@ -213,7 +213,7 @@ def worker_base_port(worker_seq_no: int) -> int:
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
@@ -459,6 +459,7 @@ class AuthKeys:
|
||||
def generate_safekeeper_token(self) -> str:
|
||||
return self.generate_token(scope="safekeeperdata")
|
||||
|
||||
# generate token giving access to only one tenant
|
||||
def generate_tenant_token(self, tenant_id: TenantId) -> str:
|
||||
return self.generate_token(scope="tenant", tenant_id=str(tenant_id))
|
||||
|
||||
@@ -965,6 +966,7 @@ class NeonEnv:
|
||||
for i in range(1, config.num_safekeepers + 1):
|
||||
port = SafekeeperPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
pg_tenant_only=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
id = config.safekeepers_id_start + i # assign ids sequentially
|
||||
@@ -973,6 +975,7 @@ class NeonEnv:
|
||||
[[safekeepers]]
|
||||
id = {id}
|
||||
pg_port = {port.pg}
|
||||
pg_tenant_only_port = {port.pg_tenant_only}
|
||||
http_port = {port.http}
|
||||
sync = {'true' if config.safekeepers_enable_fsync else 'false'}"""
|
||||
)
|
||||
@@ -1231,7 +1234,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
stderr: {res.stderr}
|
||||
"""
|
||||
log.info(msg)
|
||||
raise Exception(msg) from subprocess.CalledProcessError(
|
||||
raise RuntimeError(msg) from subprocess.CalledProcessError(
|
||||
res.returncode, res.args, res.stdout, res.stderr
|
||||
)
|
||||
return res
|
||||
@@ -1255,10 +1258,8 @@ class NeonCli(AbstractNeonCli):
|
||||
"""
|
||||
Creates a new tenant, returns its id and its initial timeline's id.
|
||||
"""
|
||||
if tenant_id is None:
|
||||
tenant_id = TenantId.generate()
|
||||
if timeline_id is None:
|
||||
timeline_id = TimelineId.generate()
|
||||
tenant_id = tenant_id or TenantId.generate()
|
||||
timeline_id = timeline_id or TimelineId.generate()
|
||||
|
||||
args = [
|
||||
"tenant",
|
||||
@@ -1885,8 +1886,7 @@ class VanillaPostgres(PgProtocol):
|
||||
assert not self.running
|
||||
self.running = True
|
||||
|
||||
if log_path is None:
|
||||
log_path = os.path.join(self.pgdatadir, "pg.log")
|
||||
log_path = log_path or os.path.join(self.pgdatadir, "pg.log")
|
||||
|
||||
self.pg_bin.run_capture(
|
||||
["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
|
||||
@@ -2346,8 +2346,7 @@ class Endpoint(PgProtocol):
|
||||
if not config_lines:
|
||||
config_lines = []
|
||||
|
||||
if endpoint_id is None:
|
||||
endpoint_id = self.env.generate_endpoint_id()
|
||||
endpoint_id = endpoint_id or self.env.generate_endpoint_id()
|
||||
self.endpoint_id = endpoint_id
|
||||
self.branch_name = branch_name
|
||||
|
||||
@@ -2363,8 +2362,7 @@ class Endpoint(PgProtocol):
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
|
||||
if config_lines is None:
|
||||
config_lines = []
|
||||
config_lines = config_lines or []
|
||||
|
||||
# set small 'max_replication_write_lag' to enable backpressure
|
||||
# and make tests more stable.
|
||||
@@ -2560,8 +2558,7 @@ class EndpointFactory:
|
||||
http_port=self.env.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if endpoint_id is None:
|
||||
endpoint_id = self.env.generate_endpoint_id()
|
||||
endpoint_id = endpoint_id or self.env.generate_endpoint_id()
|
||||
|
||||
self.num_instances += 1
|
||||
self.endpoints.append(ep)
|
||||
@@ -2614,6 +2611,7 @@ class EndpointFactory:
|
||||
@dataclass
|
||||
class SafekeeperPort:
|
||||
pg: int
|
||||
pg_tenant_only: int
|
||||
http: int
|
||||
|
||||
|
||||
@@ -2641,7 +2639,7 @@ class Safekeeper:
|
||||
if elapsed > 3:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for wal acceptor start: {e}"
|
||||
)
|
||||
) from e
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
break # success
|
||||
@@ -2721,7 +2719,8 @@ class SafekeeperHttpClient(requests.Session):
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def debug_dump(self, params: Dict[str, str] = {}) -> Dict[str, Any]:
|
||||
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
|
||||
params = params or {}
|
||||
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
@@ -2861,7 +2860,7 @@ class NeonBroker:
|
||||
if elapsed > 5:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
|
||||
)
|
||||
) from e
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
break # success
|
||||
@@ -2977,7 +2976,7 @@ def should_skip_file(filename: str) -> bool:
|
||||
#
|
||||
def list_files_to_compare(pgdata_dir: Path) -> List[str]:
|
||||
pgdata_files = []
|
||||
for root, _file, filenames in os.walk(pgdata_dir):
|
||||
for root, _dirs, filenames in os.walk(pgdata_dir):
|
||||
for filename in filenames:
|
||||
rel_dir = os.path.relpath(root, pgdata_dir)
|
||||
# Skip some dirs and files we don't want to compare
|
||||
@@ -3109,3 +3108,18 @@ def last_flush_lsn_upload(
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
return last_flush_lsn
|
||||
|
||||
|
||||
def parse_project_git_version_output(s: str) -> str:
    """
    Parses the git commit hash out of the --version output supported at least by neon_local.

    The information is generated by utils::project_git_version!

    The hash is looked up after a `git:` or `git-env:` marker and must be 8 to 40
    hexadecimal digits; an optional `-suffix` directly after it (for example
    `-modified`) is accepted but not returned.

    Raises ValueError, quoting the input, when no such marker/hash pair is found.
    """
    # Imported locally so the helper does not rely on the module's import list.
    import re

    res = re.search(r"git(-env)?:([0-9a-fA-F]{8,40})(-\S+)?", s)
    if res is None:
        raise ValueError(f"unable to parse --version output: '{s}'")

    # Group 2 is the hash. The pattern demands at least 8 hex digits there, so it
    # can never be empty once the search has matched.
    return res.group(2)
|
||||
|
||||
@@ -193,8 +193,7 @@ class PageserverHttpClient(requests.Session):
|
||||
body = "null"
|
||||
else:
|
||||
# null-config is prohibited by the API
|
||||
if config is None:
|
||||
config = {}
|
||||
config = config or {}
|
||||
body = json.dumps({"config": config})
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
|
||||
|
||||
@@ -95,7 +95,7 @@ def query_scalar(cur: cursor, query: str) -> Any:
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
try:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
@@ -47,7 +47,7 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
|
||||
# without modifying the earlier parts of the table.
|
||||
for step in range(n_steps):
|
||||
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
|
||||
for i in range(n_update_iters):
|
||||
for _ in range(n_update_iters):
|
||||
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
|
||||
cur.execute("vacuum t")
|
||||
|
||||
|
||||
@@ -33,6 +33,6 @@ def test_hot_table(env: PgCompare):
|
||||
|
||||
# Read the table
|
||||
with env.record_duration("read"):
|
||||
for i in range(num_reads):
|
||||
for _ in range(num_reads):
|
||||
cur.execute("select * from t;")
|
||||
cur.fetchall()
|
||||
|
||||
@@ -28,7 +28,7 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute("create table t(x integer)")
|
||||
for i in range(n_iters):
|
||||
for _ in range(n_iters):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from fixtures.neon_fixtures import PgProtocol
|
||||
|
||||
|
||||
async def repeat_bytes(buf, repetitions: int):
    """Async generator that yields the same `buf` object `repetitions` times.

    Nothing is yielded when `repetitions` is zero or negative.
    """
    # The loop index is never used, so it is discarded with `_`.
    for _ in range(repetitions):
        yield buf
|
||||
|
||||
|
||||
|
||||
@@ -77,8 +77,8 @@ def test_random_writes(neon_with_baseline: PgCompare):
|
||||
|
||||
# Update random keys
|
||||
with env.record_duration("run"):
|
||||
for it in range(n_iterations):
|
||||
for i in range(n_writes):
|
||||
for _ in range(n_iterations):
|
||||
for _ in range(n_writes):
|
||||
key = random.randint(1, n_rows)
|
||||
cur.execute(f"update Big set count=count+1 where pk={key}")
|
||||
env.flush()
|
||||
|
||||
@@ -61,5 +61,5 @@ def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: in
|
||||
cur.execute(f"set max_parallel_workers_per_gather = {workers}")
|
||||
|
||||
with env.record_duration("run"):
|
||||
for i in range(iters):
|
||||
for _ in range(iters):
|
||||
cur.execute("select count(*) from t;")
|
||||
|
||||
@@ -43,7 +43,17 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
if endpoint:
|
||||
endpoint.start()
|
||||
else:
|
||||
endpoint = env.endpoints.create_start("test_startup")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_startup",
|
||||
# Shared buffers need to be allocated during startup, so they
|
||||
# impact startup time. This is the default value we use for
|
||||
# 1CPU pods (maybe different for VMs).
|
||||
#
|
||||
# TODO extensions also contribute to shared memory allocation,
|
||||
# and this test doesn't include all default extensions we
|
||||
# load.
|
||||
config_lines=["shared_buffers=262144"],
|
||||
)
|
||||
endpoint.safe_psql("select 1;")
|
||||
|
||||
# Get metrics
|
||||
@@ -60,6 +70,11 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
|
||||
value = metrics[key]
|
||||
zenbenchmark.record(name, value, "ms", report=MetricReport.LOWER_IS_BETTER)
|
||||
|
||||
# Check basebackup size makes sense
|
||||
basebackup_bytes = metrics["basebackup_bytes"]
|
||||
if i > 0:
|
||||
assert basebackup_bytes < 100 * 1024
|
||||
|
||||
# Stop so we can restart
|
||||
endpoint.stop()
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ async def run(**kwargs) -> asyncpg.Record:
|
||||
|
||||
if __name__ == "__main__":
|
||||
kwargs = {
|
||||
k.lstrip("NEON_").lower(): v
|
||||
k.removeprefix("NEON_").lower(): v
|
||||
for k in ("NEON_HOST", "NEON_DATABASE", "NEON_USER", "NEON_PASSWORD")
|
||||
if (v := os.environ.get(k, None)) is not None
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import pg8000.dbapi
|
||||
|
||||
if __name__ == "__main__":
|
||||
kwargs = {
|
||||
k.lstrip("NEON_").lower(): v
|
||||
k.removeprefix("NEON_").lower(): v
|
||||
for k in ("NEON_HOST", "NEON_DATABASE", "NEON_USER", "NEON_PASSWORD")
|
||||
if (v := os.environ.get(k, None)) is not None
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
}
|
||||
)
|
||||
|
||||
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
|
||||
pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)"))
|
||||
|
||||
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
branch0_cur = endpoint_branch0.connect().cursor()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
@@ -106,7 +107,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
if expect_success:
|
||||
op()
|
||||
else:
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(psycopg2.Error):
|
||||
op()
|
||||
|
||||
def check_pageserver(expect_success: bool, **conn_kwargs):
|
||||
|
||||
@@ -141,13 +141,13 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
log.info("stopping check thread")
|
||||
check_stop_event.set()
|
||||
check_thread.join()
|
||||
assert (
|
||||
False
|
||||
), f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
|
||||
raise AssertionError(
|
||||
f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
|
||||
) from e
|
||||
else:
|
||||
assert (
|
||||
False
|
||||
), f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
raise AssertionError(
|
||||
f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
) from e
|
||||
|
||||
log.info(f"inserted {rows_inserted} rows")
|
||||
|
||||
@@ -157,9 +157,9 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
check_thread.join()
|
||||
log.info("check thread stopped")
|
||||
else:
|
||||
assert (
|
||||
False
|
||||
), "WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
raise AssertionError(
|
||||
"WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
)
|
||||
|
||||
|
||||
# TODO test_backpressure_disk_consistent_lsn_lag. Play with pageserver's checkpoint settings
|
||||
|
||||
@@ -26,7 +26,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
|
||||
|
||||
for n in range(4):
|
||||
for _ in range(4):
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
@@ -12,7 +12,7 @@ def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonPro
|
||||
parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
|
||||
parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())
|
||||
|
||||
for component, metrics in parsed_metrics.items():
|
||||
for _component, metrics in parsed_metrics.items():
|
||||
sample = metrics.query_one("libmetrics_build_info")
|
||||
|
||||
assert "revision" in sample.labels
|
||||
|
||||
@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PortDistributor,
|
||||
parse_project_git_version_output,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import (
|
||||
@@ -72,9 +73,9 @@ def test_create_snapshot(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
pg_bin.run(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
|
||||
pg_bin.run(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
|
||||
pg_bin.run(
|
||||
pg_bin.run_capture(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
|
||||
pg_bin.run_capture(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
|
||||
)
|
||||
|
||||
@@ -352,7 +353,7 @@ def prepare_snapshot(
|
||||
# get git SHA of neon binary
|
||||
def get_neon_version(neon_binpath: Path) -> str:
    """
    Return the git commit hash reported by `neon_local --version`.

    Runs the `neon_local` binary located in `neon_binpath`, decodes its output as
    UTF-8 and extracts the hash with parse_project_git_version_output.
    """
    out = subprocess.check_output([neon_binpath / "neon_local", "--version"]).decode("utf-8")
    # Use the shared parser instead of splitting on "git:" by hand, so that the
    # "git-env:" form and trailing suffixes are handled in one place.
    return parse_project_git_version_output(out)
|
||||
|
||||
|
||||
def check_neon_works(
|
||||
@@ -404,7 +405,9 @@ def check_neon_works(
|
||||
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
|
||||
|
||||
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
|
||||
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"]
|
||||
)
|
||||
initial_dump_differs = dump_differs(
|
||||
repo_dir.parent / "dump.sql",
|
||||
test_output_dir / "dump.sql",
|
||||
@@ -424,7 +427,7 @@ def check_neon_works(
|
||||
shutil.rmtree(repo_dir / "local_fs_remote_storage")
|
||||
timeline_delete_wait_completed(pageserver_http, tenant_id, timeline_id)
|
||||
pageserver_http.timeline_create(pg_version, tenant_id, timeline_id)
|
||||
pg_bin.run(
|
||||
pg_bin.run_capture(
|
||||
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
|
||||
)
|
||||
# The assert itself deferred to the end of the test
|
||||
@@ -436,7 +439,7 @@ def check_neon_works(
|
||||
)
|
||||
|
||||
# Check that we can interact with the data
|
||||
pg_bin.run(["pgbench", "--time=10", "--progress=2", connstr])
|
||||
pg_bin.run_capture(["pgbench", "--time=10", "--progress=2", connstr])
|
||||
|
||||
assert not dump_from_wal_differs, "dump from WAL differs"
|
||||
assert not initial_dump_differs, "initial dump differs"
|
||||
|
||||
36
test_runner/regress/test_duplicate_layers.py
Normal file
36
test_runner/regress/test_duplicate_layers.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
|
||||
|
||||
# Test duplicate layer detection
|
||||
#
|
||||
# This test sets fail point at the end of first compaction phase:
|
||||
# after flushing new L1 layers but before deletion of L0 layers
|
||||
# it should cause generation of duplicate L1 layer by compaction after restart.
|
||||
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    Run pgbench against a tenant while the `compact-level0-phase1-return-same`
    failpoint is enabled.

    The tenant is created with small checkpoint/compaction sizes and a short
    compaction period. After an initial pgbench load the test checks that the
    pageserver log mentions the failpoint, then runs a longer pgbench workload.
    """
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()

    # Use aggressive compaction and checkpoint settings
    tenant_id, _ = env.neon_cli.create_tenant(
        conf={
            "checkpoint_distance": f"{1024 ** 2}",
            "compaction_target_size": f"{1024 ** 2}",
            "compaction_period": "5 s",
            "compaction_threshold": "3",
        }
    )

    pageserver_http.configure_failpoints(("compact-level0-phase1-return-same", "return"))

    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
    connstr = endpoint.connstr(options="-csynchronous_commit=off")
    pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])

    time.sleep(10)  # give compaction time to run
    assert env.pageserver.log_contains("compact-level0-phase1-return-same")

    pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
|
||||
@@ -54,7 +54,7 @@ async def gc(env: NeonEnv, timeline: TimelineId):
|
||||
# At the same time, run UPDATEs and GC
|
||||
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
|
||||
workers = []
|
||||
for worker_id in range(num_connections):
|
||||
for _ in range(num_connections):
|
||||
workers.append(asyncio.create_task(update_table(endpoint)))
|
||||
workers.append(asyncio.create_task(gc(env, timeline)))
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
|
||||
@@ -38,7 +40,7 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
|
||||
|
||||
for _ in range(5):
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(subprocess.SubprocessError):
|
||||
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
@@ -135,11 +135,11 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
# Importing empty file fails
|
||||
empty_file = os.path.join(test_output_dir, "empty_file")
|
||||
with open(empty_file, "w") as _:
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(RuntimeError):
|
||||
import_tar(empty_file, empty_file)
|
||||
|
||||
# Importing corrupt backup fails
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(RuntimeError):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
# A tar with trailing garbage is currently accepted. It prints a warning
|
||||
@@ -149,12 +149,6 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
".*WARN.*ignored .* unexpected bytes after the tar archive.*"
|
||||
)
|
||||
|
||||
# NOTE: delete can easily come before upload operations are completed
|
||||
# https://github.com/neondatabase/neon/issues/4326
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*files not bound to index_file.json, proceeding with their deletion.*"
|
||||
)
|
||||
|
||||
timeline_delete_wait_completed(client, tenant, timeline)
|
||||
|
||||
# Importing correct backup works
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonPageserver
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2703")
|
||||
@@ -77,7 +78,7 @@ def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
|
||||
pageserver_http.configure_failpoints(("delta-layer-writer-fail-before-finish", "return"))
|
||||
# Note: we cannot test whether the exception is exactly 'delta-layer-writer-fail-before-finish'
|
||||
# since our code does it in loop, we cannot get this exact error for our request.
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(PageserverApiException):
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
new_temp_layer_files = list(
|
||||
|
||||
@@ -8,6 +8,10 @@ from fixtures.utils import query_scalar
|
||||
# Now this test is very minimalistic -
|
||||
# it only checks next_multixact_id field in restored pg_control,
|
||||
# since we don't have functions to check multixact internals.
|
||||
# We do check that the datadir contents exported from the
|
||||
# pageserver match what the running PostgreSQL produced. This
|
||||
# is enough to verify that the WAL records are handled correctly
|
||||
# in the pageserver.
|
||||
#
|
||||
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
@@ -18,8 +22,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE t1(i int primary key);
|
||||
INSERT INTO t1 select * from generate_series(1, 100);
|
||||
CREATE TABLE t1(i int primary key, n_updated int);
|
||||
INSERT INTO t1 select g, 0 from generate_series(1, 50) g;
|
||||
"""
|
||||
)
|
||||
|
||||
@@ -29,21 +33,28 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
|
||||
# Lock entries using parallel connections in a round-robin fashion.
|
||||
nclients = 20
|
||||
update_every = 97
|
||||
connections = []
|
||||
for i in range(nclients):
|
||||
for _ in range(nclients):
|
||||
# Do not turn on autocommit. We want to hold the key-share locks.
|
||||
conn = endpoint.connect(autocommit=False)
|
||||
connections.append(conn)
|
||||
|
||||
# On each iteration, we commit the previous transaction on a connection,
|
||||
# and issue antoher select. Each SELECT generates a new multixact that
|
||||
# and issue another select. Each SELECT generates a new multixact that
|
||||
# includes the new XID, and the XIDs of all the other parallel transactions.
|
||||
# This generates enough traffic on both multixact offsets and members SLRUs
|
||||
# to cross page boundaries.
|
||||
for i in range(5000):
|
||||
for i in range(20000):
|
||||
conn = connections[i % nclients]
|
||||
conn.commit()
|
||||
conn.cursor().execute("select * from t1 for key share")
|
||||
|
||||
# Perform some non-key UPDATEs too, to exercise different multixact
|
||||
# member statuses.
|
||||
if i % update_every == 0:
|
||||
conn.cursor().execute(f"update t1 set n_updated = n_updated + 1 where i = {i % 50}")
|
||||
else:
|
||||
conn.cursor().execute("select * from t1 for key share")
|
||||
|
||||
# We have multixacts now. We can close the connections.
|
||||
for c in connections:
|
||||
|
||||
@@ -1,12 +1,18 @@
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import cast
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_BRANCH_NAME,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
parse_project_git_version_output,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pg_version import PgVersion, skip_on_postgres
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
@@ -131,3 +137,66 @@ def test_cli_start_stop(neon_env_builder: NeonEnvBuilder):
|
||||
# Default stop
|
||||
res = env.neon_cli.raw_cli(["stop"])
|
||||
res.check_returncode()
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
    os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"
)
def test_parse_project_git_version_output_positive():
    """
    Checks that parse_project_git_version_output returns the bare commit hash for
    each supported `--version` flavour (`git:` / `git-env:`, with or without a
    `-modified` suffix).
    """
    commit = "b6f77b5816cf1dba12a3bc8747941182ce220846"

    positive = [
        # most likely when developing locally
        f"Neon CLI git:{commit}-modified",
        # when developing locally
        f"Neon CLI git:{commit}",
        # this is not produced in practice, but the impl supports it
        f"Neon CLI git-env:{commit}-modified",
        # most likely from CI or docker build
        f"Neon CLI git-env:{commit}",
    ]

    for example in positive:
        # The message names the offending example when the loop fails part-way.
        assert parse_project_git_version_output(example) == commit, f"example: {example!r}"
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
    os.environ.get("BUILD_TYPE") == "debug", reason="unit test for test support, either build works"
)
def test_parse_project_git_version_output_local_docker():
    """
    Makes sure the tests don't accept the default version in Dockerfile one gets without providing
    a commit lookalike in --build-arg GIT_VERSION=XXX
    """
    # Named `version_output` rather than `input` to avoid shadowing the builtin.
    version_output = "Neon CLI git-env:local"

    with pytest.raises(ValueError) as excinfo:
        parse_project_git_version_output(version_output)

    # `excinfo` is pytest's ExceptionInfo wrapper; the raised exception itself is
    # `excinfo.value`, and that is whose message should quote the input.
    assert version_output in str(excinfo.value)
|
||||
|
||||
|
||||
@skip_on_postgres(PgVersion.V14, reason="does not use postgres")
@pytest.mark.skipif(
    os.environ.get("BUILD_TYPE") == "debug", reason="cli api sanity, either build works"
)
def test_binaries_version_parses(neon_binpath: Path):
    """
    Ensures that we can parse the actual outputs of --version from a set of binaries.

    The list is not meant to be exhaustive, and compute_ctl has a different way for example.
    """

    binaries = [
        "neon_local",
        "pageserver",
        "safekeeper",
        "proxy",
        "pg_sni_router",
        "storage_broker",
    ]
    # `binary` rather than `bin`, which would shadow the builtin of that name.
    for binary in binaries:
        out = subprocess.check_output([neon_binpath / binary, "--version"]).decode("utf-8")
        # Only parseability is checked; the parser raises ValueError otherwise.
        parse_project_git_version_output(out)
|
||||
|
||||
@@ -58,12 +58,12 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
|
||||
# garbage collections so that the page server will remove old page versions.
|
||||
for i in range(10):
|
||||
for _ in range(10):
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
|
||||
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
print_gc_result(gc_result)
|
||||
|
||||
for j in range(100):
|
||||
for _ in range(100):
|
||||
cur.execute("UPDATE foo SET val = val + 1 WHERE id = 1;")
|
||||
|
||||
# All (or at least most of) the updates should've been on the same page, so
|
||||
|
||||
@@ -25,7 +25,7 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
thread.start()
|
||||
|
||||
for i in range(n_restarts):
|
||||
for _ in range(n_restarts):
|
||||
# Stop the pageserver gracefully and restart it.
|
||||
time.sleep(1)
|
||||
env.pageserver.stop()
|
||||
|
||||
@@ -6,7 +6,7 @@ from fixtures.neon_fixtures import Endpoint, NeonEnv
|
||||
|
||||
|
||||
async def repeat_bytes(buf, repetitions: int):
    """Async generator that yields the same `buf` object `repetitions` times.

    Nothing is yielded when `repetitions` is zero or negative.
    """
    # The loop index is never used, so it is discarded with `_`.
    for _ in range(repetitions):
        yield buf
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import json
|
||||
import subprocess
|
||||
from typing import Any, List
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
@@ -179,7 +179,8 @@ def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
def test_sql_over_http(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
def q(sql: str, params: List[Any] = []) -> Any:
|
||||
def q(sql: str, params: Optional[List[Any]] = None) -> Any:
|
||||
params = params or []
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
@@ -229,7 +230,8 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
def test_sql_over_http_output_options(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http2 with login password 'http2' superuser")
|
||||
|
||||
def q(sql: str, raw_text: bool, array_mode: bool, params: List[Any] = []) -> Any:
|
||||
def q(sql: str, raw_text: bool, array_mode: bool, params: Optional[List[Any]] = None) -> Any:
|
||||
params = params or []
|
||||
connstr = (
|
||||
f"postgresql://http2:http2@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
)
|
||||
|
||||
@@ -133,7 +133,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info("Validation page inspect won't allow reading pages of dropped relations")
|
||||
try:
|
||||
c.execute("select * from page_header(get_raw_page('foo', 'main', 0));")
|
||||
assert False, "query should have failed"
|
||||
raise AssertionError("query should have failed")
|
||||
except UndefinedTable as e:
|
||||
log.info("Caught an expected failure: {}".format(e))
|
||||
|
||||
@@ -157,7 +157,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0'))"
|
||||
)
|
||||
assert False, "query should have failed"
|
||||
raise AssertionError("query should have failed")
|
||||
except UndefinedTable as e:
|
||||
log.info("Caught an expected failure: {}".format(e))
|
||||
|
||||
@@ -169,7 +169,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0'))"
|
||||
)
|
||||
assert False, "query should have failed"
|
||||
raise AssertionError("query should have failed")
|
||||
except IoError as e:
|
||||
log.info("Caught an expected failure: {}".format(e))
|
||||
|
||||
|
||||
@@ -38,8 +38,8 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
# Configure failpoints
|
||||
pageserver_http.configure_failpoints(
|
||||
[
|
||||
("flush-frozen-before-sync", "sleep(2000)"),
|
||||
("checkpoint-after-sync", "exit"),
|
||||
("flush-frozen-pausable", "sleep(2000)"),
|
||||
("flush-frozen-exit", "exit"),
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -598,9 +598,6 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
".* ERROR .*Error processing HTTP request: InternalServerError\\(timeline is Stopping"
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*files not bound to index_file.json, proceeding with their deletion.*"
|
||||
)
|
||||
timeline_delete_wait_completed(client, tenant_id, timeline_id)
|
||||
|
||||
assert not timeline_path.exists()
|
||||
@@ -777,6 +774,95 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
create_thread.join()
|
||||
|
||||
|
||||
# Regression test for a race condition where files are compacted before the upload,
|
||||
# resulting in the upload complaining about the file not being found
|
||||
# https://github.com/neondatabase/neon/issues/4526
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_compaction_delete_before_upload(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
remote_storage_kind: RemoteStorageKind,
|
||||
):
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
test_name="test_compaction_delete_before_upload",
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# create tenant with config that will deterministically allow
|
||||
# compaction and disable gc
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# Set a small compaction threshold
|
||||
"compaction_threshold": "3",
|
||||
# Disable GC
|
||||
"gc_period": "0s",
|
||||
# disable PITR
|
||||
"pitr_interval": "0s",
|
||||
}
|
||||
)
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
# Build two tables with some data inside
|
||||
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
endpoint.safe_psql("CREATE TABLE bar AS SELECT x FROM generate_series(1, 10000) g(x)")
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
# Now make the flushing hang and update one small piece of data
|
||||
client.configure_failpoints(("flush-frozen-pausable", "pause"))
|
||||
|
||||
endpoint.safe_psql("UPDATE foo SET x = 0 WHERE x = 1")
|
||||
|
||||
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
|
||||
|
||||
q: queue.Queue[Optional[PageserverApiException]] = queue.Queue()
|
||||
barrier = threading.Barrier(2)
|
||||
|
||||
def checkpoint_in_background():
|
||||
barrier.wait()
|
||||
try:
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
q.put(None)
|
||||
except PageserverApiException as e:
|
||||
q.put(e)
|
||||
|
||||
create_thread = threading.Thread(target=checkpoint_in_background)
|
||||
create_thread.start()
|
||||
|
||||
try:
|
||||
barrier.wait()
|
||||
|
||||
time.sleep(4)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
client.configure_failpoints(("flush-frozen-pausable", "off"))
|
||||
|
||||
conflict = q.get()
|
||||
|
||||
assert conflict is None
|
||||
finally:
|
||||
create_thread.join()
|
||||
|
||||
# Add a delay for the uploads to run into the file-not-found condition
|
||||
time.sleep(4)
|
||||
|
||||
# Ensure that this actually terminates
|
||||
wait_upload_queue_empty(client, tenant_id, timeline_id)
|
||||
|
||||
# For now we are hitting this message.
|
||||
# Maybe in the future the underlying race condition will be fixed,
|
||||
# but until then, ensure that this message is hit instead.
|
||||
assert env.pageserver.log_contains(
|
||||
"File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more."
|
||||
)
|
||||
|
||||
|
||||
def wait_upload_queue_empty(
|
||||
client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
|
||||
):
|
||||
|
||||
@@ -8,10 +8,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
def test_fixture_restart(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
for i in range(3):
|
||||
for _ in range(3):
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
for i in range(3):
|
||||
for _ in range(3):
|
||||
env.safekeepers[0].stop()
|
||||
env.safekeepers[0].start()
|
||||
|
||||
@@ -167,7 +167,7 @@ async def reattach_while_busy(
|
||||
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
|
||||
):
|
||||
workers = []
|
||||
for worker_id in range(num_connections):
|
||||
for _ in range(num_connections):
|
||||
pg_conn = await endpoint.connect_async()
|
||||
workers.append(asyncio.create_task(update_table(pg_conn)))
|
||||
|
||||
@@ -791,7 +791,7 @@ def test_ignore_while_attaching(
|
||||
pageserver_http.tenant_attach(tenant_id)
|
||||
# Run ignore on the task, thereby cancelling the attach.
|
||||
# XXX This should take priority over attach, i.e., it should cancel the attach task.
|
||||
# But neither the failpoint, nor the proper storage_sync download functions,
|
||||
# But neither the failpoint, nor the proper remote_timeline_client download functions,
|
||||
# are sensitive to task_mgr::shutdown.
|
||||
# This problem is tracked in https://github.com/neondatabase/neon/issues/2996 .
|
||||
# So, for now, effectively, this ignore here will block until attach task completes.
|
||||
|
||||
@@ -80,7 +80,7 @@ def new_pageserver_service(
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
pageserver_process.kill()
|
||||
raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}")
|
||||
raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}") from e
|
||||
|
||||
log.info("new pageserver started")
|
||||
try:
|
||||
|
||||
@@ -94,7 +94,7 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
|
||||
|
||||
# Wait for the remote storage uploads to finish
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
for tenant, endpoint in tenants_endpoints:
|
||||
for _tenant, endpoint in tenants_endpoints:
|
||||
res = endpoint.safe_psql_many(
|
||||
["SHOW neon.tenant_id", "SHOW neon.timeline_id", "SELECT pg_current_wal_flush_lsn()"]
|
||||
)
|
||||
|
||||
@@ -144,7 +144,7 @@ def test_delete_timeline_post_rm_failure(
|
||||
ps_http.configure_failpoints((failpoint_name, "return"))
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=env.initial_timeline,
|
||||
@@ -152,7 +152,8 @@ def test_delete_timeline_post_rm_failure(
|
||||
iterations=2, # effectively try immediately and retry once in one second
|
||||
)
|
||||
|
||||
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
# FIXME: #4719
|
||||
# timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
|
||||
at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*"
|
||||
env.pageserver.allowed_errors.append(at_failpoint_log_message)
|
||||
@@ -326,7 +327,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
)
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
@@ -334,7 +335,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
iterations=2, # effectively try immediately and retry once in one second
|
||||
)
|
||||
|
||||
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
# FIXME: #4719
|
||||
# timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
|
||||
assert leaf_timeline_path.exists(), "the failpoint didn't work"
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# If we get here, the timeline size limit failed
|
||||
log.error("Query unexpectedly succeeded")
|
||||
assert False
|
||||
raise AssertionError()
|
||||
|
||||
except psycopg2.errors.DiskFull as err:
|
||||
log.info(f"Query expectedly failed with: {err}")
|
||||
@@ -284,9 +284,9 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
# give it some time to settle in the state where it waits for size computation task
|
||||
time.sleep(5)
|
||||
if not delete_timeline_success.empty():
|
||||
assert (
|
||||
False
|
||||
), f"test is broken, the {deletion_method} should be stuck waiting for size computation task, got result {delete_timeline_success.get()}"
|
||||
raise AssertionError(
|
||||
f"test is broken, the {deletion_method} should be stuck waiting for size computation task, got result {delete_timeline_success.get()}"
|
||||
)
|
||||
|
||||
log.info(
|
||||
"resume the size calculation. The failpoint checks that the timeline directory still exists."
|
||||
|
||||
@@ -32,7 +32,7 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
cur.execute("create table t1(x integer)")
|
||||
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")
|
||||
cur.execute("vacuum t1")
|
||||
for i in range(n_iter):
|
||||
for _ in range(n_iter):
|
||||
cur.execute(f"delete from t1 where x>{n_records//2}")
|
||||
cur.execute("vacuum t1")
|
||||
time.sleep(1) # let pageserver a chance to create image layers
|
||||
|
||||
@@ -13,6 +13,7 @@ from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -46,8 +47,9 @@ def wait_lsn_force_checkpoint(
|
||||
timeline_id: TimelineId,
|
||||
endpoint: Endpoint,
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options={},
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
pageserver_conn_options = pageserver_conn_options or {}
|
||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
|
||||
|
||||
@@ -865,6 +867,41 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
assert debug_dump_1["config"]["id"] == env.safekeepers[0].id
|
||||
|
||||
|
||||
# Test auth on WAL service (postgres protocol) ports.
|
||||
def test_sk_auth(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.neon_cli.create_branch("test_sk_auth")
|
||||
endpoint = env.endpoints.create_start("test_sk_auth")
|
||||
|
||||
sk = env.safekeepers[0]
|
||||
|
||||
# learn neon timeline from compute
|
||||
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
|
||||
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
|
||||
|
||||
tenant_token = env.auth_keys.generate_tenant_token(tenant_id)
|
||||
full_token = env.auth_keys.generate_safekeeper_token()
|
||||
|
||||
conn_opts = {
|
||||
"host": "127.0.0.1",
|
||||
"options": f"-c timeline_id={timeline_id} tenant_id={tenant_id}",
|
||||
}
|
||||
connector = PgProtocol(**conn_opts)
|
||||
# no password, should fail
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg)
|
||||
# giving password, should be ok with either token on main pg port
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=tenant_token)
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg, password=full_token)
|
||||
# on tenant only port tenant only token should work
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=tenant_token)
|
||||
# but full token should fail
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
connector.safe_psql("IDENTIFY_SYSTEM", port=sk.port.pg_tenant_only, password=full_token)
|
||||
|
||||
|
||||
class SafekeeperEnv:
|
||||
def __init__(
|
||||
self,
|
||||
@@ -911,6 +948,7 @@ class SafekeeperEnv:
|
||||
def start_safekeeper(self, i):
|
||||
port = SafekeeperPort(
|
||||
pg=self.port_distributor.get_port(),
|
||||
pg_tenant_only=self.port_distributor.get_port(),
|
||||
http=self.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
@@ -944,7 +982,7 @@ class SafekeeperEnv:
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
safekeeper_process.kill()
|
||||
raise Exception(f"Failed to start safekepeer as {cmd}, reason: {e}")
|
||||
raise Exception(f"Failed to start safekepeer as {cmd}, reason: {e}") from e
|
||||
|
||||
def get_safekeeper_connstrs(self):
|
||||
assert self.safekeepers is not None, "safekeepers are not initialized"
|
||||
@@ -1137,7 +1175,7 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder):
|
||||
collect_stats(endpoint, cur)
|
||||
|
||||
# generate WAL to simulate normal workload
|
||||
for i in range(5):
|
||||
for _ in range(5):
|
||||
generate_wal(cur)
|
||||
collect_stats(endpoint, cur)
|
||||
|
||||
|
||||
@@ -392,7 +392,7 @@ async def run_concurrent_computes(
|
||||
break
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
assert False, "Timed out while waiting for another query by computes[0]"
|
||||
raise AssertionError("Timed out while waiting for another query by computes[0]")
|
||||
computes[0].stopped = True
|
||||
|
||||
await asyncio.gather(background_tasks[0])
|
||||
@@ -545,7 +545,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
|
||||
# invalid, to make them unavailable to the endpoint. We use
|
||||
# ports 10, 11 and 12 to simulate unavailable safekeepers.
|
||||
config = toml.load(test_output_dir / "repo" / "config")
|
||||
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)):
|
||||
for i, (_sk, active) in enumerate(zip(env.safekeepers, active_sk)):
|
||||
if active:
|
||||
config["safekeepers"][i]["pg_port"] = env.safekeepers[i].port.pg
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user