mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-03 11:32:56 +00:00
python: more linting (#4734)
Ruff has "B" class of lints, including B018 which will nag on useless expressions, related to #4719. Enable such lints and fix the existing issues. Most notably: - https://beta.ruff.rs/docs/rules/mutable-argument-default/ - https://beta.ruff.rs/docs/rules/assert-false/ --------- Co-authored-by: Alexander Bayandin <alexander@neon.tech>
This commit is contained in:
@@ -79,6 +79,7 @@ module = [
|
||||
ignore_missing_imports = true
|
||||
|
||||
[tool.ruff]
|
||||
target-version = "py39"
|
||||
extend-exclude = ["vendor/"]
|
||||
ignore = ["E501"]
|
||||
select = [
|
||||
@@ -86,4 +87,5 @@ select = [
|
||||
"F", # Pyflakes
|
||||
"I", # isort
|
||||
"W", # pycodestyle
|
||||
"B", # bugbear
|
||||
]
|
||||
|
||||
@@ -214,8 +214,7 @@ class VanillaPostgres(PgProtocol):
|
||||
assert not self.running
|
||||
self.running = True
|
||||
|
||||
if log_path is None:
|
||||
log_path = os.path.join(self.pgdatadir, "pg.log")
|
||||
log_path = log_path or os.path.join(self.pgdatadir, "pg.log")
|
||||
|
||||
self.pg_bin.run_capture(
|
||||
["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
|
||||
@@ -396,7 +395,7 @@ def reconstruct_paths(log_dir, pg_bin, base_tar, port: int):
|
||||
|
||||
query = "select relname, pg_relation_filepath(oid) from pg_class"
|
||||
result = vanilla_pg.safe_psql(query, user="cloud_admin", dbname=database)
|
||||
for relname, filepath in result:
|
||||
for _relname, filepath in result:
|
||||
if filepath is not None:
|
||||
if database == "template0copy":
|
||||
# Add all template0copy paths to template0
|
||||
|
||||
@@ -5,7 +5,6 @@ import json
|
||||
import os
|
||||
import re
|
||||
import timeit
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
@@ -18,6 +17,7 @@ from _pytest.config import Config
|
||||
from _pytest.config.argparsing import Parser
|
||||
from _pytest.terminal import TerminalReporter
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonPageserver
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
@@ -385,7 +385,7 @@ class NeonBenchmarker:
|
||||
path = f"{repo_dir}/tenants/{tenant_id}/timelines/{timeline_id}"
|
||||
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
@@ -492,7 +492,7 @@ def pytest_terminal_summary(
|
||||
return
|
||||
|
||||
if not result:
|
||||
warnings.warn("no results to store (no passed test suites)")
|
||||
log.warning("no results to store (no passed test suites)")
|
||||
return
|
||||
|
||||
get_out_path(Path(out_dir), revision=revision).write_text(
|
||||
|
||||
@@ -213,7 +213,7 @@ def worker_base_port(worker_seq_no: int) -> int:
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
@@ -1231,7 +1231,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
stderr: {res.stderr}
|
||||
"""
|
||||
log.info(msg)
|
||||
raise Exception(msg) from subprocess.CalledProcessError(
|
||||
raise RuntimeError(msg) from subprocess.CalledProcessError(
|
||||
res.returncode, res.args, res.stdout, res.stderr
|
||||
)
|
||||
return res
|
||||
@@ -1255,10 +1255,8 @@ class NeonCli(AbstractNeonCli):
|
||||
"""
|
||||
Creates a new tenant, returns its id and its initial timeline's id.
|
||||
"""
|
||||
if tenant_id is None:
|
||||
tenant_id = TenantId.generate()
|
||||
if timeline_id is None:
|
||||
timeline_id = TimelineId.generate()
|
||||
tenant_id = tenant_id or TenantId.generate()
|
||||
timeline_id = timeline_id or TimelineId.generate()
|
||||
|
||||
args = [
|
||||
"tenant",
|
||||
@@ -1885,8 +1883,7 @@ class VanillaPostgres(PgProtocol):
|
||||
assert not self.running
|
||||
self.running = True
|
||||
|
||||
if log_path is None:
|
||||
log_path = os.path.join(self.pgdatadir, "pg.log")
|
||||
log_path = log_path or os.path.join(self.pgdatadir, "pg.log")
|
||||
|
||||
self.pg_bin.run_capture(
|
||||
["pg_ctl", "-w", "-D", str(self.pgdatadir), "-l", log_path, "start"]
|
||||
@@ -2346,8 +2343,7 @@ class Endpoint(PgProtocol):
|
||||
if not config_lines:
|
||||
config_lines = []
|
||||
|
||||
if endpoint_id is None:
|
||||
endpoint_id = self.env.generate_endpoint_id()
|
||||
endpoint_id = endpoint_id or self.env.generate_endpoint_id()
|
||||
self.endpoint_id = endpoint_id
|
||||
self.branch_name = branch_name
|
||||
|
||||
@@ -2363,8 +2359,7 @@ class Endpoint(PgProtocol):
|
||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
|
||||
|
||||
if config_lines is None:
|
||||
config_lines = []
|
||||
config_lines = config_lines or []
|
||||
|
||||
# set small 'max_replication_write_lag' to enable backpressure
|
||||
# and make tests more stable.
|
||||
@@ -2560,8 +2555,7 @@ class EndpointFactory:
|
||||
http_port=self.env.port_distributor.get_port(),
|
||||
)
|
||||
|
||||
if endpoint_id is None:
|
||||
endpoint_id = self.env.generate_endpoint_id()
|
||||
endpoint_id = endpoint_id or self.env.generate_endpoint_id()
|
||||
|
||||
self.num_instances += 1
|
||||
self.endpoints.append(ep)
|
||||
@@ -2641,7 +2635,7 @@ class Safekeeper:
|
||||
if elapsed > 3:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for wal acceptor start: {e}"
|
||||
)
|
||||
) from e
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
break # success
|
||||
@@ -2721,7 +2715,8 @@ class SafekeeperHttpClient(requests.Session):
|
||||
def check_status(self):
|
||||
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
|
||||
|
||||
def debug_dump(self, params: Dict[str, str] = {}) -> Dict[str, Any]:
|
||||
def debug_dump(self, params: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
|
||||
params = params or {}
|
||||
res = self.get(f"http://localhost:{self.port}/v1/debug_dump", params=params)
|
||||
res.raise_for_status()
|
||||
res_json = res.json()
|
||||
@@ -2861,7 +2856,7 @@ class NeonBroker:
|
||||
if elapsed > 5:
|
||||
raise RuntimeError(
|
||||
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
|
||||
)
|
||||
) from e
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
break # success
|
||||
@@ -2977,7 +2972,7 @@ def should_skip_file(filename: str) -> bool:
|
||||
#
|
||||
def list_files_to_compare(pgdata_dir: Path) -> List[str]:
|
||||
pgdata_files = []
|
||||
for root, _file, filenames in os.walk(pgdata_dir):
|
||||
for root, _dirs, filenames in os.walk(pgdata_dir):
|
||||
for filename in filenames:
|
||||
rel_dir = os.path.relpath(root, pgdata_dir)
|
||||
# Skip some dirs and files we don't want to compare
|
||||
|
||||
@@ -193,8 +193,7 @@ class PageserverHttpClient(requests.Session):
|
||||
body = "null"
|
||||
else:
|
||||
# null-config is prohibited by the API
|
||||
if config is None:
|
||||
config = {}
|
||||
config = config or {}
|
||||
body = json.dumps({"config": config})
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach",
|
||||
|
||||
@@ -95,7 +95,7 @@ def query_scalar(cur: cursor, query: str) -> Any:
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
totalbytes = 0
|
||||
for root, dirs, files in os.walk(path):
|
||||
for root, _dirs, files in os.walk(path):
|
||||
for name in files:
|
||||
try:
|
||||
totalbytes += os.path.getsize(os.path.join(root, name))
|
||||
|
||||
@@ -47,7 +47,7 @@ def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchma
|
||||
# without modifying the earlier parts of the table.
|
||||
for step in range(n_steps):
|
||||
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
|
||||
for i in range(n_update_iters):
|
||||
for _ in range(n_update_iters):
|
||||
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
|
||||
cur.execute("vacuum t")
|
||||
|
||||
|
||||
@@ -33,6 +33,6 @@ def test_hot_table(env: PgCompare):
|
||||
|
||||
# Read the table
|
||||
with env.record_duration("read"):
|
||||
for i in range(num_reads):
|
||||
for _ in range(num_reads):
|
||||
cur.execute("select * from t;")
|
||||
cur.fetchall()
|
||||
|
||||
@@ -28,7 +28,7 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute("create table t(x integer)")
|
||||
for i in range(n_iters):
|
||||
for _ in range(n_iters):
|
||||
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from fixtures.neon_fixtures import PgProtocol
|
||||
|
||||
|
||||
async def repeat_bytes(buf, repetitions: int):
|
||||
for i in range(repetitions):
|
||||
for _ in range(repetitions):
|
||||
yield buf
|
||||
|
||||
|
||||
|
||||
@@ -77,8 +77,8 @@ def test_random_writes(neon_with_baseline: PgCompare):
|
||||
|
||||
# Update random keys
|
||||
with env.record_duration("run"):
|
||||
for it in range(n_iterations):
|
||||
for i in range(n_writes):
|
||||
for _ in range(n_iterations):
|
||||
for _ in range(n_writes):
|
||||
key = random.randint(1, n_rows)
|
||||
cur.execute(f"update Big set count=count+1 where pk={key}")
|
||||
env.flush()
|
||||
|
||||
@@ -61,5 +61,5 @@ def test_seqscans(env: PgCompare, scale: int, rows: int, iters: int, workers: in
|
||||
cur.execute(f"set max_parallel_workers_per_gather = {workers}")
|
||||
|
||||
with env.record_duration("run"):
|
||||
for i in range(iters):
|
||||
for _ in range(iters):
|
||||
cur.execute("select count(*) from t;")
|
||||
|
||||
@@ -19,7 +19,7 @@ async def run(**kwargs) -> asyncpg.Record:
|
||||
|
||||
if __name__ == "__main__":
|
||||
kwargs = {
|
||||
k.lstrip("NEON_").lower(): v
|
||||
k.removeprefix("NEON_").lower(): v
|
||||
for k in ("NEON_HOST", "NEON_DATABASE", "NEON_USER", "NEON_PASSWORD")
|
||||
if (v := os.environ.get(k, None)) is not None
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import pg8000.dbapi
|
||||
|
||||
if __name__ == "__main__":
|
||||
kwargs = {
|
||||
k.lstrip("NEON_").lower(): v
|
||||
k.removeprefix("NEON_").lower(): v
|
||||
for k in ("NEON_HOST", "NEON_DATABASE", "NEON_USER", "NEON_PASSWORD")
|
||||
if (v := os.environ.get(k, None)) is not None
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from contextlib import closing
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
@@ -106,7 +107,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
if expect_success:
|
||||
op()
|
||||
else:
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(psycopg2.Error):
|
||||
op()
|
||||
|
||||
def check_pageserver(expect_success: bool, **conn_kwargs):
|
||||
|
||||
@@ -141,13 +141,13 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
log.info("stopping check thread")
|
||||
check_stop_event.set()
|
||||
check_thread.join()
|
||||
assert (
|
||||
False
|
||||
), f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
|
||||
raise AssertionError(
|
||||
f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
|
||||
) from e
|
||||
else:
|
||||
assert (
|
||||
False
|
||||
), f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
raise AssertionError(
|
||||
f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
) from e
|
||||
|
||||
log.info(f"inserted {rows_inserted} rows")
|
||||
|
||||
@@ -157,9 +157,9 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
check_thread.join()
|
||||
log.info("check thread stopped")
|
||||
else:
|
||||
assert (
|
||||
False
|
||||
), "WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
raise AssertionError(
|
||||
"WAL lag overflowed configured threshold. That means backpressure doesn't work."
|
||||
)
|
||||
|
||||
|
||||
# TODO test_backpressure_disk_consistent_lsn_lag. Play with pageserver's checkpoint settings
|
||||
|
||||
@@ -26,7 +26,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
|
||||
|
||||
for n in range(4):
|
||||
for _ in range(4):
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
|
||||
@@ -12,7 +12,7 @@ def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonPro
|
||||
parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
|
||||
parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())
|
||||
|
||||
for component, metrics in parsed_metrics.items():
|
||||
for _component, metrics in parsed_metrics.items():
|
||||
sample = metrics.query_one("libmetrics_build_info")
|
||||
|
||||
assert "revision" in sample.labels
|
||||
|
||||
@@ -54,7 +54,7 @@ async def gc(env: NeonEnv, timeline: TimelineId):
|
||||
# At the same time, run UPDATEs and GC
|
||||
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
|
||||
workers = []
|
||||
for worker_id in range(num_connections):
|
||||
for _ in range(num_connections):
|
||||
workers.append(asyncio.create_task(update_table(endpoint)))
|
||||
workers.append(asyncio.create_task(gc(env, timeline)))
|
||||
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
|
||||
|
||||
@@ -38,7 +40,7 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
|
||||
|
||||
for _ in range(5):
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(subprocess.SubprocessError):
|
||||
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
@@ -135,11 +135,11 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
# Importing empty file fails
|
||||
empty_file = os.path.join(test_output_dir, "empty_file")
|
||||
with open(empty_file, "w") as _:
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(RuntimeError):
|
||||
import_tar(empty_file, empty_file)
|
||||
|
||||
# Importing corrupt backup fails
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(RuntimeError):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
# A tar with trailing garbage is currently accepted. It prints a warnings
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonPageserver
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
|
||||
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/2703")
|
||||
@@ -77,7 +78,7 @@ def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
|
||||
pageserver_http.configure_failpoints(("delta-layer-writer-fail-before-finish", "return"))
|
||||
# Note: we cannot test whether the exception is exactly 'delta-layer-writer-fail-before-finish'
|
||||
# since our code does it in loop, we cannot get this exact error for our request.
|
||||
with pytest.raises(Exception):
|
||||
with pytest.raises(PageserverApiException):
|
||||
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
new_temp_layer_files = list(
|
||||
|
||||
@@ -30,7 +30,7 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
# Lock entries using parallel connections in a round-robin fashion.
|
||||
nclients = 20
|
||||
connections = []
|
||||
for i in range(nclients):
|
||||
for _ in range(nclients):
|
||||
# Do not turn on autocommit. We want to hold the key-share locks.
|
||||
conn = endpoint.connect(autocommit=False)
|
||||
connections.append(conn)
|
||||
|
||||
@@ -58,12 +58,12 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Make a lot of updates on a single row, generating a lot of WAL. Trigger
|
||||
# garbage collections so that the page server will remove old page versions.
|
||||
for i in range(10):
|
||||
for _ in range(10):
|
||||
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
|
||||
gc_result = pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
|
||||
print_gc_result(gc_result)
|
||||
|
||||
for j in range(100):
|
||||
for _ in range(100):
|
||||
cur.execute("UPDATE foo SET val = val + 1 WHERE id = 1;")
|
||||
|
||||
# All (or at least most of) the updates should've been on the same page, so
|
||||
|
||||
@@ -25,7 +25,7 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
|
||||
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
|
||||
thread.start()
|
||||
|
||||
for i in range(n_restarts):
|
||||
for _ in range(n_restarts):
|
||||
# Stop the pageserver gracefully and restart it.
|
||||
time.sleep(1)
|
||||
env.pageserver.stop()
|
||||
|
||||
@@ -6,7 +6,7 @@ from fixtures.neon_fixtures import Endpoint, NeonEnv
|
||||
|
||||
|
||||
async def repeat_bytes(buf, repetitions: int):
|
||||
for i in range(repetitions):
|
||||
for _ in range(repetitions):
|
||||
yield buf
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import json
|
||||
import subprocess
|
||||
from typing import Any, List
|
||||
from typing import Any, List, Optional
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
@@ -179,7 +179,8 @@ def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
def test_sql_over_http(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
def q(sql: str, params: List[Any] = []) -> Any:
|
||||
def q(sql: str, params: Optional[List[Any]] = None) -> Any:
|
||||
params = params or []
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
@@ -229,7 +230,8 @@ def test_sql_over_http(static_proxy: NeonProxy):
|
||||
def test_sql_over_http_output_options(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http2 with login password 'http2' superuser")
|
||||
|
||||
def q(sql: str, raw_text: bool, array_mode: bool, params: List[Any] = []) -> Any:
|
||||
def q(sql: str, raw_text: bool, array_mode: bool, params: Optional[List[Any]] = None) -> Any:
|
||||
params = params or []
|
||||
connstr = (
|
||||
f"postgresql://http2:http2@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
)
|
||||
|
||||
@@ -133,7 +133,7 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
log.info("Validation page inspect won't allow reading pages of dropped relations")
|
||||
try:
|
||||
c.execute("select * from page_header(get_raw_page('foo', 'main', 0));")
|
||||
assert False, "query should have failed"
|
||||
raise AssertionError("query should have failed")
|
||||
except UndefinedTable as e:
|
||||
log.info("Caught an expected failure: {}".format(e))
|
||||
|
||||
@@ -157,7 +157,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('Unknown', 'main', 0, '0/0'))"
|
||||
)
|
||||
assert False, "query should have failed"
|
||||
raise AssertionError("query should have failed")
|
||||
except UndefinedTable as e:
|
||||
log.info("Caught an expected failure: {}".format(e))
|
||||
|
||||
@@ -169,7 +169,7 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
c.execute(
|
||||
"select lsn, lower, upper from page_header(get_raw_page_at_lsn('foo', 'main', 0, '0/0'))"
|
||||
)
|
||||
assert False, "query should have failed"
|
||||
raise AssertionError("query should have failed")
|
||||
except IoError as e:
|
||||
log.info("Caught an expected failure: {}".format(e))
|
||||
|
||||
|
||||
@@ -8,10 +8,10 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
def test_fixture_restart(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
for i in range(3):
|
||||
for _ in range(3):
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
for i in range(3):
|
||||
for _ in range(3):
|
||||
env.safekeepers[0].stop()
|
||||
env.safekeepers[0].start()
|
||||
|
||||
@@ -167,7 +167,7 @@ async def reattach_while_busy(
|
||||
env: NeonEnv, endpoint: Endpoint, pageserver_http: PageserverHttpClient, tenant_id: TenantId
|
||||
):
|
||||
workers = []
|
||||
for worker_id in range(num_connections):
|
||||
for _ in range(num_connections):
|
||||
pg_conn = await endpoint.connect_async()
|
||||
workers.append(asyncio.create_task(update_table(pg_conn)))
|
||||
|
||||
|
||||
@@ -80,7 +80,7 @@ def new_pageserver_service(
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
pageserver_process.kill()
|
||||
raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}")
|
||||
raise Exception(f"Failed to start pageserver as {cmd}, reason: {e}") from e
|
||||
|
||||
log.info("new pageserver started")
|
||||
try:
|
||||
|
||||
@@ -94,7 +94,7 @@ def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Rem
|
||||
|
||||
# Wait for the remote storage uploads to finish
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
for tenant, endpoint in tenants_endpoints:
|
||||
for _tenant, endpoint in tenants_endpoints:
|
||||
res = endpoint.safe_psql_many(
|
||||
["SHOW neon.tenant_id", "SHOW neon.timeline_id", "SELECT pg_current_wal_flush_lsn()"]
|
||||
)
|
||||
|
||||
@@ -144,7 +144,7 @@ def test_delete_timeline_post_rm_failure(
|
||||
ps_http.configure_failpoints((failpoint_name, "return"))
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, env.initial_timeline)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=env.initial_timeline,
|
||||
@@ -152,7 +152,8 @@ def test_delete_timeline_post_rm_failure(
|
||||
iterations=2, # effectively try immediately and retry once in one second
|
||||
)
|
||||
|
||||
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
# FIXME: #4719
|
||||
# timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
|
||||
at_failpoint_log_message = f".*{env.initial_timeline}.*at failpoint {failpoint_name}.*"
|
||||
env.pageserver.allowed_errors.append(at_failpoint_log_message)
|
||||
@@ -326,7 +327,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
)
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=leaf_timeline_id,
|
||||
@@ -334,7 +335,8 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
|
||||
iterations=2, # effectively try immediately and retry once in one second
|
||||
)
|
||||
|
||||
timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
# FIXME: #4719
|
||||
# timeline_info["state"]["Broken"]["reason"] == "failpoint: timeline-delete-after-rm"
|
||||
|
||||
assert leaf_timeline_path.exists(), "the failpoint didn't work"
|
||||
|
||||
|
||||
@@ -189,7 +189,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# If we get here, the timeline size limit failed
|
||||
log.error("Query unexpectedly succeeded")
|
||||
assert False
|
||||
raise AssertionError()
|
||||
|
||||
except psycopg2.errors.DiskFull as err:
|
||||
log.info(f"Query expectedly failed with: {err}")
|
||||
@@ -284,9 +284,9 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
# give it some time to settle in the state where it waits for size computation task
|
||||
time.sleep(5)
|
||||
if not delete_timeline_success.empty():
|
||||
assert (
|
||||
False
|
||||
), f"test is broken, the {deletion_method} should be stuck waiting for size computation task, got result {delete_timeline_success.get()}"
|
||||
raise AssertionError(
|
||||
f"test is broken, the {deletion_method} should be stuck waiting for size computation task, got result {delete_timeline_success.get()}"
|
||||
)
|
||||
|
||||
log.info(
|
||||
"resume the size calculation. The failpoint checks that the timeline directory still exists."
|
||||
|
||||
@@ -32,7 +32,7 @@ def test_truncate(neon_env_builder: NeonEnvBuilder, zenbenchmark):
|
||||
cur.execute("create table t1(x integer)")
|
||||
cur.execute(f"insert into t1 values (generate_series(1,{n_records}))")
|
||||
cur.execute("vacuum t1")
|
||||
for i in range(n_iter):
|
||||
for _ in range(n_iter):
|
||||
cur.execute(f"delete from t1 where x>{n_records//2}")
|
||||
cur.execute("vacuum t1")
|
||||
time.sleep(1) # let pageserver a chance to create image layers
|
||||
|
||||
@@ -46,8 +46,9 @@ def wait_lsn_force_checkpoint(
|
||||
timeline_id: TimelineId,
|
||||
endpoint: Endpoint,
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options={},
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
pageserver_conn_options = pageserver_conn_options or {}
|
||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
|
||||
|
||||
@@ -944,7 +945,7 @@ class SafekeeperEnv:
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
safekeeper_process.kill()
|
||||
raise Exception(f"Failed to start safekepeer as {cmd}, reason: {e}")
|
||||
raise Exception(f"Failed to start safekepeer as {cmd}, reason: {e}") from e
|
||||
|
||||
def get_safekeeper_connstrs(self):
|
||||
assert self.safekeepers is not None, "safekeepers are not initialized"
|
||||
@@ -1137,7 +1138,7 @@ def test_wal_deleted_after_broadcast(neon_env_builder: NeonEnvBuilder):
|
||||
collect_stats(endpoint, cur)
|
||||
|
||||
# generate WAL to simulate normal workload
|
||||
for i in range(5):
|
||||
for _ in range(5):
|
||||
generate_wal(cur)
|
||||
collect_stats(endpoint, cur)
|
||||
|
||||
|
||||
@@ -392,7 +392,7 @@ async def run_concurrent_computes(
|
||||
break
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
assert False, "Timed out while waiting for another query by computes[0]"
|
||||
raise AssertionError("Timed out while waiting for another query by computes[0]")
|
||||
computes[0].stopped = True
|
||||
|
||||
await asyncio.gather(background_tasks[0])
|
||||
@@ -545,7 +545,7 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
|
||||
# invalid, to make them unavailable to the endpoint. We use
|
||||
# ports 10, 11 and 12 to simulate unavailable safekeepers.
|
||||
config = toml.load(test_output_dir / "repo" / "config")
|
||||
for i, (sk, active) in enumerate(zip(env.safekeepers, active_sk)):
|
||||
for i, (_sk, active) in enumerate(zip(env.safekeepers, active_sk)):
|
||||
if active:
|
||||
config["safekeepers"][i]["pg_port"] = env.safekeepers[i].port.pg
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user