mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Merge branch 'main' into vlad/get-vectored-read-path
This commit is contained in:
@@ -96,5 +96,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_evictions_total",
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||
# "pageserver_broken_tenants_count" -- used only for broken
|
||||
)
|
||||
|
||||
@@ -23,7 +23,7 @@ from itertools import chain, product
|
||||
from pathlib import Path
|
||||
from types import TracebackType
|
||||
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union, cast
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import quote, urlparse
|
||||
|
||||
import asyncpg
|
||||
import backoff
|
||||
@@ -904,7 +904,7 @@ class NeonEnvBuilder:
|
||||
|
||||
if self.scrub_on_exit:
|
||||
try:
|
||||
S3Scrubber(self.test_output_dir, self).scan_metadata()
|
||||
S3Scrubber(self).scan_metadata()
|
||||
except Exception as e:
|
||||
log.error(f"Error during remote storage scrub: {e}")
|
||||
cleanup_error = e
|
||||
@@ -1407,7 +1407,6 @@ class AbstractNeonCli(abc.ABC):
|
||||
|
||||
args = [bin_neon] + arguments
|
||||
log.info('Running command "{}"'.format(" ".join(args)))
|
||||
log.info(f'Running in "{self.env.repo_dir}"')
|
||||
|
||||
env_vars = os.environ.copy()
|
||||
env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir)
|
||||
@@ -1823,6 +1822,7 @@ class NeonCli(AbstractNeonCli):
|
||||
endpoint_id: str,
|
||||
destroy=False,
|
||||
check_return_code=True,
|
||||
mode: Optional[str] = None,
|
||||
) -> "subprocess.CompletedProcess[str]":
|
||||
args = [
|
||||
"endpoint",
|
||||
@@ -1830,6 +1830,8 @@ class NeonCli(AbstractNeonCli):
|
||||
]
|
||||
if destroy:
|
||||
args.append("--destroy")
|
||||
if mode is not None:
|
||||
args.append(f"--mode={mode}")
|
||||
if endpoint_id is not None:
|
||||
args.append(endpoint_id)
|
||||
|
||||
@@ -1956,6 +1958,15 @@ class NeonAttachmentService:
|
||||
|
||||
return headers
|
||||
|
||||
def ready(self) -> bool:
|
||||
resp = self.request("GET", f"{self.env.attachment_service_api}/ready")
|
||||
if resp.status_code == 503:
|
||||
return False
|
||||
elif resp.status_code == 200:
|
||||
return True
|
||||
else:
|
||||
raise RuntimeError(f"Unexpected status {resp.status_code} from readiness endpoint")
|
||||
|
||||
def attach_hook_issue(
|
||||
self, tenant_shard_id: Union[TenantId, TenantShardId], pageserver_id: int
|
||||
) -> int:
|
||||
@@ -2454,6 +2465,7 @@ def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -
|
||||
return PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
|
||||
|
||||
# TODO make port an optional argument
|
||||
class VanillaPostgres(PgProtocol):
|
||||
def __init__(self, pgdatadir: Path, pg_bin: PgBin, port: int, init: bool = True):
|
||||
super().__init__(host="localhost", port=port, dbname="postgres")
|
||||
@@ -2817,8 +2829,8 @@ class NeonProxy(PgProtocol):
|
||||
|
||||
def http_query(self, query, args, **kwargs):
|
||||
# TODO maybe use default values if not provided
|
||||
user = kwargs["user"]
|
||||
password = kwargs["password"]
|
||||
user = quote(kwargs["user"])
|
||||
password = quote(kwargs["password"])
|
||||
expected_code = kwargs.get("expected_code")
|
||||
|
||||
connstr = f"postgresql://{user}:{password}@{self.domain}:{self.proxy_port}/postgres"
|
||||
@@ -3138,10 +3150,7 @@ class Endpoint(PgProtocol):
|
||||
log.info(json.dumps(dict(data_dict, **kwargs)))
|
||||
json.dump(dict(data_dict, **kwargs), file, indent=4)
|
||||
|
||||
# Please note: if you didn't respec this endpoint to have the `migrations`
|
||||
# feature, this function will probably fail because neon_migration.migration_id
|
||||
# won't exist. This is temporary - soon we'll get rid of the feature flag and
|
||||
# migrations will be enabled for everyone.
|
||||
# Please note: Migrations only run if pg_skip_catalog_updates is false
|
||||
def wait_for_migrations(self):
|
||||
with self.cursor() as cur:
|
||||
|
||||
@@ -3163,7 +3172,7 @@ class Endpoint(PgProtocol):
|
||||
with open(remote_extensions_spec_path, "w") as file:
|
||||
json.dump(spec, file, indent=4)
|
||||
|
||||
def stop(self) -> "Endpoint":
|
||||
def stop(self, mode: str = "fast") -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance if it's running.
|
||||
Returns self.
|
||||
@@ -3172,13 +3181,13 @@ class Endpoint(PgProtocol):
|
||||
if self.running:
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, check_return_code=self.check_stop_result
|
||||
self.endpoint_id, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.running = False
|
||||
|
||||
return self
|
||||
|
||||
def stop_and_destroy(self) -> "Endpoint":
|
||||
def stop_and_destroy(self, mode: str = "immediate") -> "Endpoint":
|
||||
"""
|
||||
Stop the Postgres instance, then destroy the endpoint.
|
||||
Returns self.
|
||||
@@ -3186,7 +3195,7 @@ class Endpoint(PgProtocol):
|
||||
|
||||
assert self.endpoint_id is not None
|
||||
self.env.neon_cli.endpoint_stop(
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result
|
||||
self.endpoint_id, True, check_return_code=self.check_stop_result, mode=mode
|
||||
)
|
||||
self.endpoint_id = None
|
||||
self.running = False
|
||||
@@ -3657,9 +3666,9 @@ class SafekeeperHttpClient(requests.Session):
|
||||
|
||||
|
||||
class S3Scrubber:
|
||||
def __init__(self, log_dir: Path, env: NeonEnvBuilder):
|
||||
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):
|
||||
self.env = env
|
||||
self.log_dir = log_dir
|
||||
self.log_dir = log_dir or env.test_output_dir
|
||||
|
||||
def scrubber_cli(self, args: list[str], timeout) -> str:
|
||||
assert isinstance(self.env.pageserver_remote_storage, S3Storage)
|
||||
@@ -3680,7 +3689,7 @@ class S3Scrubber:
|
||||
args = base_args + args
|
||||
|
||||
(output_path, stdout, status_code) = subprocess_capture(
|
||||
self.log_dir,
|
||||
self.env.test_output_dir,
|
||||
args,
|
||||
echo_stderr=True,
|
||||
echo_stdout=True,
|
||||
@@ -4064,7 +4073,7 @@ def logical_replication_sync(subscriber: VanillaPostgres, publisher: Endpoint) -
|
||||
|
||||
|
||||
def tenant_get_shards(
|
||||
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int]
|
||||
env: NeonEnv, tenant_id: TenantId, pageserver_id: Optional[int] = None
|
||||
) -> list[tuple[TenantShardId, NeonPageserver]]:
|
||||
"""
|
||||
Helper for when you want to talk to one or more pageservers, and the
|
||||
|
||||
@@ -563,13 +563,13 @@ class PageserverHttpClient(requests.Session):
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
timeline_id: TimelineId,
|
||||
timestamp,
|
||||
timestamp: datetime,
|
||||
):
|
||||
log.info(
|
||||
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
|
||||
)
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
|
||||
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp.isoformat()}Z",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
|
||||
@@ -2,57 +2,58 @@ import os
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
from _pytest.python import Metafunc
|
||||
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
"""
|
||||
Dynamically parametrize tests by Postgres version, build type (debug/release/remote), and possibly by other parameters
|
||||
Dynamically parametrize tests by different parameters
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pg_version(request: FixtureRequest) -> Optional[PgVersion]:
|
||||
# Do not parametrize performance tests yet, we need to prepare grafana charts first
|
||||
if "test_runner/performance" in str(request.node.path):
|
||||
v = os.environ.get("DEFAULT_PG_VERSION")
|
||||
return PgVersion(v)
|
||||
|
||||
def pg_version() -> Optional[PgVersion]:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def build_type(request: FixtureRequest) -> Optional[str]:
|
||||
# Do not parametrize performance tests yet, we need to prepare grafana charts first
|
||||
if "test_runner/performance" in str(request.node.path):
|
||||
return os.environ.get("BUILD_TYPE", "").lower()
|
||||
|
||||
def build_type() -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pageserver_virtual_file_io_engine(request: FixtureRequest) -> Optional[str]:
|
||||
def platform() -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pageserver_virtual_file_io_engine() -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: Metafunc):
|
||||
if (v := os.environ.get("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
else:
|
||||
pg_versions = [PgVersion(v)]
|
||||
|
||||
if (bt := os.environ.get("BUILD_TYPE")) is None:
|
||||
if (bt := os.getenv("BUILD_TYPE")) is None:
|
||||
build_types = ["debug", "release"]
|
||||
else:
|
||||
build_types = [bt.lower()]
|
||||
|
||||
# Do not parametrize performance tests yet by Postgres version or build type, we need to prepare grafana charts first
|
||||
if "test_runner/performance" not in metafunc.definition._nodeid:
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
|
||||
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
else:
|
||||
pg_versions = [PgVersion(v)]
|
||||
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
|
||||
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=tokio-epoll-uring`
|
||||
# And do not change test name for default `pageserver_virtual_file_io_engine=std-fs` to keep tests statistics
|
||||
if (io_engine := os.environ.get("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in ("", "std-fs"):
|
||||
metafunc.parametrize("pageserver_virtual_file_io_engine", [io_engine])
|
||||
|
||||
# For performance tests, parametrize also by platform
|
||||
if (
|
||||
"test_runner/performance" in metafunc.definition._nodeid
|
||||
and (platform := os.getenv("PLATFORM")) is not None
|
||||
):
|
||||
metafunc.parametrize("platform", [platform.lower()])
|
||||
|
||||
@@ -45,7 +45,6 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
# Create branch1.
|
||||
env.neon_cli.create_branch("branch1", "main", tenant_id=tenant, ancestor_start_lsn=lsn_100)
|
||||
endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
|
||||
log.info("postgres is running on 'branch1' branch")
|
||||
|
||||
branch1_cur = endpoint_branch1.connect().cursor()
|
||||
branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
|
||||
@@ -68,7 +67,6 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
# Create branch2.
|
||||
env.neon_cli.create_branch("branch2", "branch1", tenant_id=tenant, ancestor_start_lsn=lsn_200)
|
||||
endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
|
||||
log.info("postgres is running on 'branch2' branch")
|
||||
branch2_cur = endpoint_branch2.connect().cursor()
|
||||
|
||||
branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
|
||||
|
||||
@@ -107,7 +107,6 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
|
||||
# which is needed for backpressure_lsns() to work
|
||||
endpoint.respec(skip_pg_catalog_updates=False)
|
||||
endpoint.start()
|
||||
log.info("postgres is running on 'test_backpressure' branch")
|
||||
|
||||
# setup check thread
|
||||
check_stop_event = threading.Event()
|
||||
|
||||
@@ -21,7 +21,6 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
|
||||
# Branch at the point where only 100 rows were inserted
|
||||
branch_behind_timeline_id = env.neon_cli.create_branch("test_branch_behind")
|
||||
endpoint_main = env.endpoints.create_start("test_branch_behind")
|
||||
log.info("postgres is running on 'test_branch_behind' branch")
|
||||
|
||||
main_cur = endpoint_main.connect().cursor()
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
]
|
||||
|
||||
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
|
||||
log.info("postgres is running on test_clog_truncate branch")
|
||||
|
||||
# Install extension containing function needed for test
|
||||
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
|
||||
@@ -62,7 +61,6 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
|
||||
)
|
||||
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
|
||||
log.info("postgres is running on test_clog_truncate_new branch")
|
||||
|
||||
# check that new node doesn't contain truncated segment
|
||||
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
|
||||
|
||||
@@ -112,11 +112,6 @@ def test_create_snapshot(
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
pg_bin.run_capture(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
|
||||
pg_bin.run_capture(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
|
||||
pg_bin.run_capture(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
@@ -13,7 +12,6 @@ def test_config(neon_simple_env: NeonEnv):
|
||||
|
||||
# change config
|
||||
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
|
||||
log.info("postgres is running on test_config branch")
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
|
||||
@@ -20,7 +20,6 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
|
||||
env.neon_cli.create_branch("test_createdb", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_createdb")
|
||||
log.info("postgres is running on 'test_createdb' branch")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
# Cause a 'relmapper' change in the original branch
|
||||
@@ -65,7 +64,6 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_dropdb", "empty")
|
||||
endpoint = env.endpoints.create_start("test_dropdb")
|
||||
log.info("postgres is running on 'test_dropdb' branch")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE DATABASE foodb")
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
@@ -10,7 +9,6 @@ def test_createuser(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_createuser", "empty")
|
||||
endpoint = env.endpoints.create_start("test_createuser")
|
||||
log.info("postgres is running on 'test_createuser' branch")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
# Cause a 'relmapper' change in the original branch
|
||||
|
||||
@@ -296,7 +296,6 @@ def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
|
||||
# Some non-existent url
|
||||
config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],
|
||||
)
|
||||
log.info("postgres is running on 'test_ddl_forwarding_invalid_db' branch")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SET neon.forward_ddl = false")
|
||||
|
||||
@@ -893,37 +893,14 @@ def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
|
||||
# in its heatmap
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
# Configure the secondary pageserver to have a phony small disk size
|
||||
ps_secondary.stop()
|
||||
total_size, _, _ = env.timelines_du(ps_secondary)
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
evict_bytes = total_size // 3
|
||||
|
||||
min_avail_bytes = total_size // 3
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
ps_secondary,
|
||||
period="1s",
|
||||
max_usage_pct=100,
|
||||
min_avail_bytes=min_avail_bytes,
|
||||
mock_behavior={
|
||||
"type": "Success",
|
||||
"blocksize": blocksize,
|
||||
"total_blocks": total_blocks,
|
||||
# Only count layer files towards used bytes in the mock_statvfs.
|
||||
# This avoids accounting for metadata files & tenant conf in the tests.
|
||||
"name_filter": ".*__.*",
|
||||
},
|
||||
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
|
||||
)
|
||||
|
||||
def relieved_log_message():
|
||||
assert ps_secondary.log_contains(".*disk usage pressure relieved")
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
response = ps_secondary.http_client().disk_usage_eviction_run({"evict_bytes": evict_bytes})
|
||||
log.info(f"{response}")
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du(ps_secondary)
|
||||
|
||||
assert (
|
||||
total_size - post_eviction_total_size >= min_avail_bytes
|
||||
), "we requested at least min_avail_bytes worth of free space"
|
||||
total_size - post_eviction_total_size >= evict_bytes
|
||||
), "we requested at least evict_bytes worth of free space"
|
||||
|
||||
@@ -26,7 +26,6 @@ def test_fullbackup(
|
||||
|
||||
env.neon_cli.create_branch("test_fullbackup")
|
||||
endpoint_main = env.endpoints.create_start("test_fullbackup")
|
||||
log.info("postgres is running on 'test_fullbackup' branch")
|
||||
|
||||
with endpoint_main.cursor() as cur:
|
||||
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
|
||||
@@ -67,12 +66,6 @@ def test_fullbackup(
|
||||
# Restore from the backup and find the data we inserted
|
||||
port = port_distributor.get_port()
|
||||
with VanillaPostgres(restored_dir_path, pg_bin, port, init=False) as vanilla_pg:
|
||||
# TODO make port an optional argument
|
||||
vanilla_pg.configure(
|
||||
[
|
||||
f"port={port}",
|
||||
]
|
||||
)
|
||||
vanilla_pg.start()
|
||||
num_rows_found = vanilla_pg.safe_psql("select count(*) from tbl;", user="cloud_admin")[0][0]
|
||||
assert num_rows == num_rows_found
|
||||
|
||||
@@ -71,7 +71,6 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
timeline = env.neon_cli.create_branch("test_gc_aggressive", "main")
|
||||
endpoint = env.endpoints.create_start("test_gc_aggressive")
|
||||
log.info("postgres is running on test_gc_aggressive branch")
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
# Create table, and insert the first 100 rows
|
||||
|
||||
@@ -95,16 +95,6 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
".*InternalServerError.*Tenant .* not found.*",
|
||||
".*InternalServerError.*Timeline .* not found.*",
|
||||
".*InternalServerError.*Cannot delete timeline which has child timelines.*",
|
||||
".*ignored .* unexpected bytes after the tar archive.*",
|
||||
]
|
||||
)
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# FIXME: we should clean up pageserver to not print this
|
||||
".*exited with error: unexpected message type: CopyData.*",
|
||||
# FIXME: Is this expected?
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -142,12 +132,9 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
|
||||
with pytest.raises(RuntimeError):
|
||||
import_tar(corrupt_base_tar, wal_tar)
|
||||
|
||||
# A tar with trailing garbage is currently accepted. It prints a warnings
|
||||
# to the pageserver log, however. Check that.
|
||||
import_tar(base_plus_garbage_tar, wal_tar)
|
||||
assert env.pageserver.log_contains(
|
||||
".*WARN.*ignored .* unexpected bytes after the tar archive.*"
|
||||
)
|
||||
# Importing a tar with trailing garbage fails
|
||||
with pytest.raises(RuntimeError):
|
||||
import_tar(base_plus_garbage_tar, wal_tar)
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
timeline_delete_wait_completed(client, tenant, timeline)
|
||||
@@ -172,11 +159,6 @@ def test_import_from_pageserver_small(
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
timeline = env.neon_cli.create_branch("test_import_from_pageserver_small")
|
||||
endpoint = env.endpoints.create_start("test_import_from_pageserver_small")
|
||||
|
||||
|
||||
@@ -21,7 +21,6 @@ def test_layer_bloating(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
"test_logical_replication", config_lines=["log_statement=all"]
|
||||
)
|
||||
|
||||
log.info("postgres is running on 'test_logical_replication' branch")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
)
|
||||
n_resize = 10
|
||||
scale = 10
|
||||
log.info("postgres is running on 'test_lfc_resize' branch")
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
|
||||
@@ -26,7 +26,6 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
"test_logical_replication", config_lines=["log_statement=all"]
|
||||
)
|
||||
|
||||
log.info("postgres is running on 'test_logical_replication' branch")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
@@ -315,7 +314,6 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
|
||||
# Create branch ws.
|
||||
env.neon_cli.create_branch("ws", "main", tenant_id=tenant)
|
||||
ws_branch = env.endpoints.create_start("ws", tenant_id=tenant)
|
||||
log.info("postgres is running on 'ws' branch")
|
||||
|
||||
# Check that we can create slot with the same name
|
||||
ws_cur = ws_branch.connect().cursor()
|
||||
|
||||
@@ -28,7 +28,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
timeline_id = env.neon_cli.create_branch("test_lsn_mapping", tenant_id=tenant_id)
|
||||
endpoint_main = env.endpoints.create_start("test_lsn_mapping", tenant_id=tenant_id)
|
||||
timeline_id = endpoint_main.safe_psql("show neon.timeline_id")[0][0]
|
||||
log.info("postgres is running on 'main' branch")
|
||||
|
||||
cur = endpoint_main.connect().cursor()
|
||||
|
||||
@@ -64,18 +63,14 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
# Check edge cases
|
||||
# Timestamp is in the future
|
||||
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
|
||||
assert result["kind"] == "future"
|
||||
# make sure that we return a well advanced lsn here
|
||||
assert Lsn(result["lsn"]) > start_lsn
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) < start_lsn
|
||||
@@ -83,9 +78,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
for i in range(1, len(tbl), 100):
|
||||
probe_timestamp = tbl[i][1]
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id, probe_timestamp)
|
||||
assert result["kind"] not in ["past", "nodata"]
|
||||
lsn = result["lsn"]
|
||||
# Call get_lsn_by_timestamp to get the LSN
|
||||
@@ -108,9 +101,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z"
|
||||
)
|
||||
result = client.timeline_get_lsn_by_timestamp(tenant_id, timeline_id_child, probe_timestamp)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) >= last_flush_lsn
|
||||
@@ -122,7 +113,6 @@ def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
new_timeline_id = env.neon_cli.create_branch("test_ts_of_lsn_api")
|
||||
endpoint_main = env.endpoints.create_start("test_ts_of_lsn_api")
|
||||
log.info("postgres is running on 'test_ts_of_lsn_api' branch")
|
||||
|
||||
cur = endpoint_main.connect().cursor()
|
||||
# Create table, and insert rows, each in a separate transaction
|
||||
|
||||
@@ -10,12 +10,12 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
endpoint = env.endpoints.create("test_migrations")
|
||||
log_path = endpoint.endpoint_path() / "compute.log"
|
||||
|
||||
endpoint.respec(skip_pg_catalog_updates=False, features=["migrations"])
|
||||
endpoint.respec(skip_pg_catalog_updates=False)
|
||||
endpoint.start()
|
||||
|
||||
endpoint.wait_for_migrations()
|
||||
|
||||
num_migrations = 3
|
||||
num_migrations = 4
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SELECT id FROM neon_migration.migration_id")
|
||||
@@ -24,7 +24,7 @@ def test_migrations(neon_simple_env: NeonEnv):
|
||||
|
||||
with open(log_path, "r") as log_file:
|
||||
logs = log_file.read()
|
||||
assert "INFO handle_migrations: Ran 3 migrations" in logs
|
||||
assert f"INFO handle_migrations: Ran {num_migrations} migrations" in logs
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
@@ -18,7 +17,6 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env.neon_cli.create_branch("test_multixact", "empty")
|
||||
endpoint = env.endpoints.create_start("test_multixact")
|
||||
|
||||
log.info("postgres is running on 'test_multixact' branch")
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute(
|
||||
"""
|
||||
@@ -78,7 +76,6 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env.neon_cli.create_branch("test_multixact_new", "test_multixact", ancestor_start_lsn=lsn)
|
||||
endpoint_new = env.endpoints.create_start("test_multixact_new")
|
||||
|
||||
log.info("postgres is running on 'test_multixact_new' branch")
|
||||
next_multixact_id_new = endpoint_new.safe_psql(
|
||||
"SELECT next_multixact_id FROM pg_control_checkpoint()"
|
||||
)[0][0]
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
@@ -14,8 +13,6 @@ def test_neon_extension(neon_env_builder: NeonEnvBuilder):
|
||||
endpoint_main.respec(skip_pg_catalog_updates=False)
|
||||
endpoint_main.start()
|
||||
|
||||
log.info("postgres is running on 'test_create_extension_neon' branch")
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT extversion from pg_extension where extname='neon'")
|
||||
|
||||
@@ -12,10 +12,10 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
env.neon_cli.create_branch("test_neon_superuser_subscriber")
|
||||
sub = env.endpoints.create("test_neon_superuser_subscriber")
|
||||
|
||||
pub.respec(skip_pg_catalog_updates=False, features=["migrations"])
|
||||
pub.respec(skip_pg_catalog_updates=False)
|
||||
pub.start()
|
||||
|
||||
sub.respec(skip_pg_catalog_updates=False, features=["migrations"])
|
||||
sub.respec(skip_pg_catalog_updates=False)
|
||||
sub.start()
|
||||
|
||||
pub.wait_for_migrations()
|
||||
@@ -76,3 +76,21 @@ def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
assert [r[0] for r in res] == [10, 20, 30, 40]
|
||||
|
||||
wait_until(10, 0.5, check_that_changes_propagated)
|
||||
|
||||
# Test that pg_monitor is working for neon_superuser role
|
||||
cur.execute("SELECT query from pg_stat_activity LIMIT 1")
|
||||
assert cur.fetchall()[0][0] != "<insufficient privilege>"
|
||||
# Test that pg_monitor is not working for non neon_superuser role without grant
|
||||
cur.execute("CREATE ROLE not_a_superuser LOGIN PASSWORD 'Password42!'")
|
||||
cur.execute("GRANT not_a_superuser TO neon_superuser WITH ADMIN OPTION")
|
||||
cur.execute("SET ROLE not_a_superuser")
|
||||
cur.execute("SELECT query from pg_stat_activity LIMIT 1")
|
||||
assert cur.fetchall()[0][0] == "<insufficient privilege>"
|
||||
cur.execute("RESET ROLE")
|
||||
# Test that pg_monitor is working for non neon_superuser role with grant
|
||||
cur.execute("GRANT pg_monitor TO not_a_superuser")
|
||||
cur.execute("SET ROLE not_a_superuser")
|
||||
cur.execute("SELECT query from pg_stat_activity LIMIT 1")
|
||||
assert cur.fetchall()[0][0] != "<insufficient privilege>"
|
||||
cur.execute("RESET ROLE")
|
||||
cur.execute("DROP ROLE not_a_superuser")
|
||||
|
||||
@@ -20,7 +20,6 @@ def test_old_request_lsn(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_old_request_lsn", "main")
|
||||
endpoint = env.endpoints.create_start("test_old_request_lsn")
|
||||
log.info("postgres is running on test_old_request_lsn branch")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
@@ -197,6 +197,14 @@ def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
|
||||
##### Stop the first pageserver instance, erase all its data
|
||||
env.endpoints.stop_all()
|
||||
|
||||
# Stop safekeepers and take another checkpoint. The endpoints might
|
||||
# have written a few more bytes during shutdown.
|
||||
for sk in env.safekeepers:
|
||||
sk.stop()
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
current_lsn = Lsn(client.timeline_detail(tenant_id, timeline_id)["last_record_lsn"])
|
||||
|
||||
# wait until pageserver has successfully uploaded all the data to remote storage
|
||||
wait_for_upload(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
|
||||
@@ -265,9 +265,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Having written a mixture of generation-aware and legacy index_part.json,
|
||||
# ensure the scrubber handles the situation as expected.
|
||||
metadata_summary = S3Scrubber(
|
||||
neon_env_builder.test_output_dir, neon_env_builder
|
||||
).scan_metadata()
|
||||
metadata_summary = S3Scrubber(neon_env_builder).scan_metadata()
|
||||
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
|
||||
assert metadata_summary["timeline_count"] == 1
|
||||
assert metadata_summary["timeline_shard_count"] == 1
|
||||
|
||||
@@ -498,7 +498,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
# Scrub the remote storage
|
||||
# ========================
|
||||
# This confirms that the scrubber isn't upset by the presence of the heatmap
|
||||
S3Scrubber(neon_env_builder.test_output_dir, neon_env_builder).scan_metadata()
|
||||
S3Scrubber(neon_env_builder).scan_metadata()
|
||||
|
||||
# Detach secondary and delete tenant
|
||||
# ===================================
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
from io import BytesIO
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv
|
||||
|
||||
|
||||
@@ -44,7 +43,6 @@ def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_parallel_copy", "empty")
|
||||
endpoint = env.endpoints.create_start("test_parallel_copy")
|
||||
log.info("postgres is running on 'test_parallel_copy' branch")
|
||||
|
||||
# Create test table
|
||||
conn = endpoint.connect()
|
||||
|
||||
@@ -16,7 +16,6 @@ def test_pitr_gc(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint_main = env.endpoints.create_start("main")
|
||||
log.info("postgres is running on 'main' branch")
|
||||
|
||||
main_pg_conn = endpoint_main.connect()
|
||||
main_cur = main_pg_conn.cursor()
|
||||
|
||||
@@ -390,14 +390,47 @@ def test_sql_over_http_batch(static_proxy: NeonProxy):
|
||||
assert result[0]["rows"] == [{"answer": 42}]
|
||||
|
||||
|
||||
def test_sql_over_http_batch_output_options(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
data=json.dumps(
|
||||
{
|
||||
"queries": [
|
||||
{"query": "select $1 as answer", "params": [42], "arrayMode": True},
|
||||
{"query": "select $1 as answer", "params": [42], "arrayMode": False},
|
||||
]
|
||||
}
|
||||
),
|
||||
headers={
|
||||
"Content-Type": "application/sql",
|
||||
"Neon-Connection-String": connstr,
|
||||
"Neon-Batch-Isolation-Level": "Serializable",
|
||||
"Neon-Batch-Read-Only": "false",
|
||||
"Neon-Batch-Deferrable": "false",
|
||||
},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
results = response.json()["results"]
|
||||
|
||||
assert results[0]["rowAsArray"]
|
||||
assert results[0]["rows"] == [["42"]]
|
||||
|
||||
assert not results[1]["rowAsArray"]
|
||||
assert results[1]["rows"] == [{"answer": "42"}]
|
||||
|
||||
|
||||
def test_sql_over_http_pool(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create user http_auth with password 'http' superuser")
|
||||
|
||||
def get_pid(status: int, pw: str) -> Any:
|
||||
def get_pid(status: int, pw: str, user="http_auth") -> Any:
|
||||
return static_proxy.http_query(
|
||||
GET_CONNECTION_PID_QUERY,
|
||||
[],
|
||||
user="http_auth",
|
||||
user=user,
|
||||
password=pw,
|
||||
expected_code=status,
|
||||
)
|
||||
@@ -418,23 +451,29 @@ def test_sql_over_http_pool(static_proxy: NeonProxy):
|
||||
|
||||
static_proxy.safe_psql("alter user http_auth with password 'http2'")
|
||||
|
||||
# after password change, should open a new connection to verify it
|
||||
pid2 = get_pid(200, "http2")["rows"][0]["pid"]
|
||||
assert pid1 != pid2
|
||||
# after password change, shouldn't open a new connection because it checks password in proxy.
|
||||
rows = get_pid(200, "http2")["rows"]
|
||||
assert rows == [{"pid": pid1}]
|
||||
|
||||
time.sleep(0.02)
|
||||
|
||||
# query should be on an existing connection
|
||||
pid = get_pid(200, "http2")["rows"][0]["pid"]
|
||||
assert pid in [pid1, pid2]
|
||||
|
||||
time.sleep(0.02)
|
||||
|
||||
# old password should not work
|
||||
res = get_pid(400, "http")
|
||||
# incorrect user shouldn't reveal that the user doesn't exists
|
||||
res = get_pid(400, "http", user="http_auth2")
|
||||
assert "password authentication failed for user" in res["message"]
|
||||
|
||||
|
||||
def test_sql_over_http_urlencoding(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create user \"http+auth$$\" with password '%+$^&*@!' superuser")
|
||||
|
||||
static_proxy.http_query(
|
||||
"select 1",
|
||||
[],
|
||||
user="http+auth$$",
|
||||
password="%+$^&*@!",
|
||||
expected_code=200,
|
||||
)
|
||||
|
||||
|
||||
# Beginning a transaction should not impact the next query,
|
||||
# which might come from a completely different client.
|
||||
def test_http_pool_begin(static_proxy: NeonProxy):
|
||||
|
||||
@@ -18,7 +18,6 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_read_validation", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_read_validation")
|
||||
log.info("postgres is running on 'test_read_validation' branch")
|
||||
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
@@ -145,7 +144,6 @@ def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_read_validation_neg")
|
||||
log.info("postgres is running on 'test_read_validation_neg' branch")
|
||||
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
|
||||
@@ -16,7 +16,6 @@ def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_readonly_node", "empty")
|
||||
endpoint_main = env.endpoints.create_start("test_readonly_node")
|
||||
log.info("postgres is running on 'test_readonly_node' branch")
|
||||
|
||||
env.pageserver.allowed_errors.append(".*basebackup .* failed: invalid basebackup lsn.*")
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ def test_pageserver_recovery(neon_env_builder: NeonEnvBuilder):
|
||||
env.neon_cli.create_branch("test_pageserver_recovery", "main")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_pageserver_recovery")
|
||||
log.info("postgres is running on 'test_pageserver_recovery' branch")
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
|
||||
@@ -73,9 +73,6 @@ def test_remote_storage_backup_and_restore(
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# FIXME: Is this expected?
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*",
|
||||
".*No timelines to attach received.*",
|
||||
".*Failed to get local tenant state.*",
|
||||
# FIXME retry downloads without throwing errors
|
||||
".*failed to load remote timeline.*",
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
tenant_get_shards,
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import TimelineId
|
||||
from fixtures.types import TenantShardId, TimelineId
|
||||
from fixtures.workload import Workload
|
||||
|
||||
|
||||
@@ -82,4 +83,175 @@ def test_sharding_smoke(
|
||||
)
|
||||
assert timelines == {env.initial_timeline, timeline_b}
|
||||
|
||||
# TODO: test timeline deletion and tenant deletion (depends on change in attachment_service)
|
||||
|
||||
def test_sharding_split_unsharded(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test that shard splitting works on a tenant created as unsharded (i.e. with
|
||||
ShardCount(0)).
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# Check that we created with an unsharded TenantShardId: this is the default,
|
||||
# but check it in case we change the default in future
|
||||
assert env.attachment_service.inspect(TenantShardId(tenant_id, 0, 0)) is not None
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
|
||||
workload.init()
|
||||
workload.write_rows(256)
|
||||
workload.validate()
|
||||
|
||||
# Split one shard into two
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=2)
|
||||
|
||||
# Check we got the shard IDs we expected
|
||||
assert env.attachment_service.inspect(TenantShardId(tenant_id, 0, 2)) is not None
|
||||
assert env.attachment_service.inspect(TenantShardId(tenant_id, 1, 2)) is not None
|
||||
|
||||
workload.validate()
|
||||
|
||||
|
||||
def test_sharding_split_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test the basics of shard splitting:
|
||||
- The API results in more shards than we started with
|
||||
- The tenant's data remains readable
|
||||
|
||||
"""
|
||||
|
||||
# We will start with 4 shards and split into 8, then migrate all those
|
||||
# 8 shards onto separate pageservers
|
||||
shard_count = 4
|
||||
split_shard_count = 8
|
||||
neon_env_builder.num_pageservers = split_shard_count
|
||||
|
||||
# 1MiB stripes: enable getting some meaningful data distribution without
|
||||
# writing large quantities of data in this test. The stripe size is given
|
||||
# in number of 8KiB pages.
|
||||
stripe_size = 128
|
||||
|
||||
# Use S3-compatible remote storage so that we can scrub: this test validates
|
||||
# that the scrubber doesn't barf when it sees a sharded tenant.
|
||||
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
|
||||
neon_env_builder.enable_scrub_on_exit()
|
||||
|
||||
neon_env_builder.preserve_database_files = True
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
|
||||
workload.init()
|
||||
|
||||
# Initial data
|
||||
workload.write_rows(256)
|
||||
|
||||
# Note which pageservers initially hold a shard after tenant creation
|
||||
pre_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
|
||||
# For pageservers holding a shard, validate their ingest statistics
|
||||
# reflect a proper splitting of the WAL.
|
||||
for pageserver in env.pageservers:
|
||||
if pageserver.id not in pre_split_pageserver_ids:
|
||||
continue
|
||||
|
||||
metrics = pageserver.http_client().get_metrics_values(
|
||||
[
|
||||
"pageserver_wal_ingest_records_received_total",
|
||||
"pageserver_wal_ingest_records_committed_total",
|
||||
"pageserver_wal_ingest_records_filtered_total",
|
||||
]
|
||||
)
|
||||
|
||||
log.info(f"Pageserver {pageserver.id} metrics: {metrics}")
|
||||
|
||||
# Not everything received was committed
|
||||
assert (
|
||||
metrics["pageserver_wal_ingest_records_received_total"]
|
||||
> metrics["pageserver_wal_ingest_records_committed_total"]
|
||||
)
|
||||
|
||||
# Something was committed
|
||||
assert metrics["pageserver_wal_ingest_records_committed_total"] > 0
|
||||
|
||||
# Counts are self consistent
|
||||
assert (
|
||||
metrics["pageserver_wal_ingest_records_received_total"]
|
||||
== metrics["pageserver_wal_ingest_records_committed_total"]
|
||||
+ metrics["pageserver_wal_ingest_records_filtered_total"]
|
||||
)
|
||||
|
||||
# TODO: validate that shards have different sizes
|
||||
|
||||
workload.validate()
|
||||
|
||||
assert len(pre_split_pageserver_ids) == 4
|
||||
|
||||
def shards_on_disk(shard_ids):
|
||||
for pageserver in env.pageservers:
|
||||
for shard_id in shard_ids:
|
||||
if pageserver.tenant_dir(shard_id).exists():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
old_shard_ids = [TenantShardId(tenant_id, i, shard_count) for i in range(0, shard_count)]
|
||||
# Before split, old shards exist
|
||||
assert shards_on_disk(old_shard_ids)
|
||||
|
||||
env.attachment_service.tenant_shard_split(tenant_id, shard_count=split_shard_count)
|
||||
|
||||
post_split_pageserver_ids = [loc["node_id"] for loc in env.attachment_service.locate(tenant_id)]
|
||||
# We should have split into 8 shards, on the same 4 pageservers we started on.
|
||||
assert len(post_split_pageserver_ids) == split_shard_count
|
||||
assert len(set(post_split_pageserver_ids)) == shard_count
|
||||
assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)
|
||||
|
||||
# The old parent shards should no longer exist on disk
|
||||
assert not shards_on_disk(old_shard_ids)
|
||||
|
||||
workload.validate()
|
||||
|
||||
workload.churn_rows(256)
|
||||
|
||||
workload.validate()
|
||||
|
||||
# Run GC on all new shards, to check they don't barf or delete anything that breaks reads
|
||||
# (compaction was already run as part of churn_rows)
|
||||
all_shards = tenant_get_shards(env, tenant_id)
|
||||
for tenant_shard_id, pageserver in all_shards:
|
||||
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
|
||||
|
||||
# Restart all nodes, to check that the newly created shards are durable
|
||||
for ps in env.pageservers:
|
||||
ps.restart()
|
||||
|
||||
workload.validate()
|
||||
|
||||
migrate_to_pageserver_ids = list(
|
||||
set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
|
||||
)
|
||||
assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
|
||||
|
||||
# Migrate shards away from the node where the split happened
|
||||
for ps_id in pre_split_pageserver_ids:
|
||||
shards_here = [
|
||||
tenant_shard_id
|
||||
for (tenant_shard_id, pageserver) in all_shards
|
||||
if pageserver.id == ps_id
|
||||
]
|
||||
assert len(shards_here) == 2
|
||||
migrate_shard = shards_here[0]
|
||||
destination = migrate_to_pageserver_ids.pop()
|
||||
|
||||
log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
|
||||
env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10)
|
||||
|
||||
workload.validate()
|
||||
|
||||
@@ -128,6 +128,38 @@ def test_sharding_service_smoke(
|
||||
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
|
||||
|
||||
|
||||
def test_node_status_after_restart(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# Initially we have two online pageservers
|
||||
nodes = env.attachment_service.node_list()
|
||||
assert len(nodes) == 2
|
||||
|
||||
env.pageservers[1].stop()
|
||||
|
||||
env.attachment_service.stop()
|
||||
env.attachment_service.start()
|
||||
|
||||
# Initially readiness check should fail because we're trying to connect to the offline node
|
||||
assert env.attachment_service.ready() is False
|
||||
|
||||
def is_ready():
|
||||
assert env.attachment_service.ready() is True
|
||||
|
||||
wait_until(30, 1, is_ready)
|
||||
|
||||
# We loaded nodes from database on restart
|
||||
nodes = env.attachment_service.node_list()
|
||||
assert len(nodes) == 2
|
||||
|
||||
# We should still be able to create a tenant, because the pageserver which is still online
|
||||
# should have had its availabilty state set to Active.
|
||||
env.attachment_service.tenant_create(TenantId.generate())
|
||||
|
||||
|
||||
def test_sharding_service_passthrough(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
@@ -310,7 +342,7 @@ def test_sharding_service_compute_hook(
|
||||
notifications.append(request.json)
|
||||
return Response(status=200)
|
||||
|
||||
httpserver.expect_request("/notify", method="POST").respond_with_handler(handler)
|
||||
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -355,3 +387,27 @@ def test_sharding_service_compute_hook(
|
||||
assert notifications[1] == expect
|
||||
|
||||
wait_until(10, 1, received_restart_notification)
|
||||
|
||||
|
||||
def test_sharding_service_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that occasional-use debug APIs work as expected. This is a lightweight test
|
||||
that just hits the endpoints to check that they don't bitrot.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.attachment_service.tenant_create(tenant_id, shard_count=2, shard_stripe_size=8192)
|
||||
|
||||
# These APIs are intentionally not implemented as methods on NeonAttachmentService, as
|
||||
# they're just for use in unanticipated circumstances.
|
||||
env.attachment_service.request(
|
||||
"POST", f"{env.attachment_service_api}/debug/v1/node/{env.pageservers[1].id}/drop"
|
||||
)
|
||||
assert len(env.attachment_service.node_list()) == 1
|
||||
|
||||
env.attachment_service.request(
|
||||
"POST", f"{env.attachment_service_api}/debug/v1/tenant/{tenant_id}/drop"
|
||||
)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
|
||||
|
||||
@@ -13,15 +12,10 @@ def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env.neon_cli.create_branch("test_subxacts", "empty")
|
||||
endpoint = env.endpoints.create_start("test_subxacts")
|
||||
|
||||
log.info("postgres is running on 'test_subxacts' branch")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TABLE t1(i int, j int);
|
||||
"""
|
||||
)
|
||||
cur.execute("CREATE TABLE t1(i int, j int);")
|
||||
|
||||
cur.execute("select pg_switch_wal();")
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
S3Scrubber,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
@@ -19,12 +20,13 @@ from fixtures.pageserver.utils import (
|
||||
assert_prefix_not_empty,
|
||||
poll_for_remote_storage_iterations,
|
||||
tenant_delete_wait_completed,
|
||||
wait_for_upload,
|
||||
wait_tenant_status_404,
|
||||
wait_until_tenant_active,
|
||||
wait_until_tenant_state,
|
||||
)
|
||||
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages, s3_storage
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
from fixtures.utils import run_pg_bench_small, wait_until
|
||||
from requests.exceptions import ReadTimeout
|
||||
|
||||
@@ -669,3 +671,39 @@ def test_tenant_delete_races_timeline_creation(
|
||||
|
||||
# Zero tenants remain (we deleted the default tenant)
|
||||
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 0
|
||||
|
||||
|
||||
def test_tenant_delete_scrubber(pg_bin: PgBin, neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Validate that creating and then deleting the tenant both survives the scrubber,
|
||||
and that one can run the scrubber without problems.
|
||||
"""
|
||||
|
||||
remote_storage_kind = RemoteStorageKind.MOCK_S3
|
||||
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
|
||||
scrubber = S3Scrubber(neon_env_builder)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
# create a tenant separate from the main tenant so that we have one remaining
|
||||
# after we deleted it, as the scrubber treats empty buckets as an error.
|
||||
(tenant_id, timeline_id) = env.neon_cli.create_tenant()
|
||||
|
||||
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
|
||||
run_pg_bench_small(pg_bin, endpoint.connstr())
|
||||
last_flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
ps_http.timeline_checkpoint(tenant_id, timeline_id)
|
||||
wait_for_upload(ps_http, tenant_id, timeline_id, last_flush_lsn)
|
||||
env.stop()
|
||||
|
||||
result = scrubber.scan_metadata()
|
||||
assert result["with_warnings"] == []
|
||||
|
||||
env.start()
|
||||
ps_http = env.pageserver.http_client()
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
|
||||
env.stop()
|
||||
|
||||
scrubber.scan_metadata()
|
||||
assert result["with_warnings"] == []
|
||||
|
||||
@@ -213,8 +213,6 @@ def test_tenant_relocation(
|
||||
|
||||
env.pageservers[0].allowed_errors.extend(
|
||||
[
|
||||
# FIXME: Is this expected?
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*",
|
||||
# Needed for detach polling on the original pageserver
|
||||
f".*NotFound: tenant {tenant_id}.*",
|
||||
# We will dual-attach in this test, so stale generations are expected
|
||||
|
||||
@@ -285,7 +285,6 @@ def test_pageserver_with_empty_tenants(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*",
|
||||
".*load failed.*list timelines directory.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -61,11 +61,6 @@ async def all_tenants_workload(env: NeonEnv, tenants_endpoints):
|
||||
def test_tenants_many(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
tenants_endpoints: List[Tuple[TenantId, Endpoint]] = []
|
||||
|
||||
for _ in range(1, 5):
|
||||
@@ -117,14 +112,6 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder):
|
||||
##### First start, insert secret data and upload it to the remote storage
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
# FIXME: Are these expected?
|
||||
".*No timelines to attach received.*",
|
||||
".*marking .* as locally complete, while it doesnt exist in remote index.*",
|
||||
]
|
||||
)
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
@@ -223,9 +210,6 @@ def test_tenant_redownloads_truncated_file_on_startup(
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*removing local file .* because .*",
|
||||
# FIXME: Are these expected?
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*",
|
||||
".*No timelines to attach received.*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -651,9 +651,7 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
timeline_ids = [env.initial_timeline]
|
||||
for i in range(2):
|
||||
branch_timeline_id = env.neon_cli.create_branch(f"new{i}", "main")
|
||||
pg = env.endpoints.create_start(f"new{i}")
|
||||
|
||||
with pg.cursor() as cur:
|
||||
with env.endpoints.create_start(f"new{i}") as pg, pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE f (i integer);")
|
||||
cur.execute("INSERT INTO f VALUES (generate_series(1,1000));")
|
||||
current_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
|
||||
@@ -43,7 +43,6 @@ def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
|
||||
|
||||
endpoint_main = env.endpoints.create_start("test_timeline_size")
|
||||
log.info("postgres is running on 'test_timeline_size' branch")
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -79,7 +78,6 @@ def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
|
||||
)
|
||||
|
||||
endpoint_main = env.endpoints.create_start("test_timeline_size_createdropdb")
|
||||
log.info("postgres is running on 'test_timeline_size_createdropdb' branch")
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
@@ -162,8 +160,6 @@ def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
endpoint_main.start()
|
||||
|
||||
log.info("postgres is running on 'test_timeline_size_quota_on_startup' branch")
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE foo (t text)")
|
||||
@@ -231,8 +227,6 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):
|
||||
endpoint_main.respec(skip_pg_catalog_updates=False)
|
||||
endpoint_main.start()
|
||||
|
||||
log.info("postgres is running on 'test_timeline_size_quota' branch")
|
||||
|
||||
with closing(endpoint_main.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE foo (t text)")
|
||||
@@ -585,7 +579,6 @@ def test_timeline_size_metrics(
|
||||
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
|
||||
port = port_distributor.get_port()
|
||||
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
|
||||
vanilla_pg.configure([f"port={port}"])
|
||||
vanilla_pg.start()
|
||||
|
||||
# Create database based on template0 because we can't connect to template0
|
||||
|
||||
@@ -13,7 +13,6 @@ def test_twophase(neon_simple_env: NeonEnv):
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_twophase", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
log.info("postgres is running on 'test_twophase' branch")
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
|
||||
|
||||
|
||||
#
|
||||
@@ -13,7 +14,6 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
env.neon_cli.create_branch("test_vm_bit_clear", "empty")
|
||||
endpoint = env.endpoints.create_start("test_vm_bit_clear")
|
||||
|
||||
log.info("postgres is running on 'test_vm_bit_clear' branch")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
@@ -92,7 +92,6 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
# server at the right point-in-time avoids that full-page image.
|
||||
endpoint_new = env.endpoints.create_start("test_vm_bit_clear_new")
|
||||
|
||||
log.info("postgres is running on 'test_vm_bit_clear_new' branch")
|
||||
pg_new_conn = endpoint_new.connect()
|
||||
cur_new = pg_new_conn.cursor()
|
||||
|
||||
@@ -118,12 +117,20 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
|
||||
# record.
|
||||
#
|
||||
# FIXME: This test is broken
|
||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/6412#issuecomment-1902072541")
|
||||
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_vm_bit_clear_on_heap_lock(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_conf = {
|
||||
"checkpoint_distance": f"{128 * 1024}",
|
||||
"compaction_target_size": f"{128 * 1024}",
|
||||
"compaction_threshold": "1",
|
||||
# create image layers eagerly, so that GC can remove some layers
|
||||
"image_creation_threshold": "1",
|
||||
# set PITR interval to be small, so we can do GC
|
||||
"pitr_interval": "0 s",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
|
||||
|
||||
env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock", "empty")
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_vm_bit_clear_on_heap_lock")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_vm_bit_clear_on_heap_lock",
|
||||
config_lines=[
|
||||
@@ -139,72 +146,88 @@ def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
|
||||
|
||||
# Install extension containing function needed for test
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
cur.execute("SELECT pg_switch_wal()")
|
||||
cur.execute("CREATE EXTENSION pageinspect")
|
||||
|
||||
# Create a test table and freeze it to set the all-frozen VM bit on all pages.
|
||||
cur.execute("CREATE TABLE vmtest_lock (id integer PRIMARY KEY)")
|
||||
cur.execute("INSERT INTO vmtest_lock SELECT g FROM generate_series(1, 50000) g")
|
||||
cur.execute("VACUUM FREEZE vmtest_lock")
|
||||
|
||||
cur.execute("VACUUM (FREEZE, DISABLE_PAGE_SKIPPING true) vmtest_lock")
|
||||
|
||||
# Lock a row. This clears the all-frozen VM bit for that page.
|
||||
cur.execute("BEGIN")
|
||||
cur.execute("SELECT * FROM vmtest_lock WHERE id = 40000 FOR UPDATE")
|
||||
|
||||
# Remember the XID. We will use it later to verify that we have consumed a lot of
|
||||
# XIDs after this.
|
||||
cur.execute("select pg_current_xact_id()")
|
||||
locking_xid = cur.fetchall()[0][0]
|
||||
locking_xid = int(cur.fetchall()[0][0])
|
||||
|
||||
# Stop and restart postgres, to clear the buffer cache.
|
||||
cur.execute("COMMIT")
|
||||
|
||||
# The VM page in shared buffer cache, and the same page as reconstructed
|
||||
# by the pageserver, should be equal.
|
||||
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
|
||||
vm_page_in_cache = (cur.fetchall()[0][0])[:100].hex()
|
||||
cur.execute("select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )")
|
||||
vm_page_at_pageserver = (cur.fetchall()[0][0])[:100].hex()
|
||||
|
||||
assert vm_page_at_pageserver == vm_page_in_cache
|
||||
|
||||
# The above assert is enough to verify the bug that was fixed in
|
||||
# commit 66fa176cc8. But for good measure, we also reproduce the
|
||||
# original problem that the missing VM page update caused. The
|
||||
# rest of the test does that.
|
||||
|
||||
# Kill and restart postgres, to clear the buffer cache.
|
||||
#
|
||||
# NOTE: clear_buffer_cache() will not do, because it evicts the dirty pages
|
||||
# in a "clean" way. Our neon extension will write a full-page image of the VM
|
||||
# page, and we want to avoid that.
|
||||
endpoint.stop()
|
||||
# page, and we want to avoid that. A clean shutdown will also not do, for the
|
||||
# same reason.
|
||||
endpoint.stop(mode="immediate")
|
||||
|
||||
endpoint.start()
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
|
||||
tup = cur.fetchall()
|
||||
xmax_before = tup[0][1]
|
||||
|
||||
# Consume a lot of XIDs, so that anti-wraparound autovacuum kicks
|
||||
# in and the clog gets truncated. We set autovacuum_freeze_max_age to a very
|
||||
# low value, so it doesn't take all that many XIDs for autovacuum to kick in.
|
||||
for i in range(1000):
|
||||
#
|
||||
# We could use test_consume_xids() to consume XIDs much faster,
|
||||
# but it wouldn't speed up the overall test, because we'd still
|
||||
# need to wait for autovacuum to run.
|
||||
for _ in range(1000):
|
||||
cur.execute("select test_consume_xids(10000);")
|
||||
for _ in range(1000):
|
||||
cur.execute(
|
||||
"""
|
||||
CREATE TEMP TABLE othertable (i int) ON COMMIT DROP;
|
||||
do $$
|
||||
begin
|
||||
for i in 1..100000 loop
|
||||
-- Use a begin-exception block to generate a new subtransaction on each iteration
|
||||
begin
|
||||
insert into othertable values (i);
|
||||
exception when others then
|
||||
raise 'not expected %', sqlerrm;
|
||||
end;
|
||||
end loop;
|
||||
end;
|
||||
$$;
|
||||
"""
|
||||
"select get_raw_page_at_lsn( 'vmtest_lock', 'vm', 0, pg_current_wal_insert_lsn() )"
|
||||
)
|
||||
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 ")
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuple = {tup}")
|
||||
xmax = tup[0][1]
|
||||
assert xmax == xmax_before
|
||||
page = (cur.fetchall()[0][0])[:100].hex()
|
||||
log.info(f"VM page contents: {page}")
|
||||
|
||||
if i % 50 == 0:
|
||||
cur.execute("select datfrozenxid from pg_database where datname='postgres'")
|
||||
datfrozenxid = cur.fetchall()[0][0]
|
||||
if datfrozenxid > locking_xid:
|
||||
break
|
||||
cur.execute("select get_raw_page( 'vmtest_lock', 'vm', 0 )")
|
||||
page = (cur.fetchall()[0][0])[:100].hex()
|
||||
log.info(f"VM page contents in cache: {page}")
|
||||
|
||||
cur.execute("select min(datfrozenxid::text::int) from pg_database")
|
||||
datfrozenxid = int(cur.fetchall()[0][0])
|
||||
log.info(f"datfrozenxid {datfrozenxid} locking_xid: {locking_xid}")
|
||||
if datfrozenxid > locking_xid + 3000000:
|
||||
break
|
||||
time.sleep(0.5)
|
||||
|
||||
cur.execute("select pg_current_xact_id()")
|
||||
curr_xid = cur.fetchall()[0][0]
|
||||
assert int(curr_xid) - int(locking_xid) >= 100000
|
||||
curr_xid = int(cur.fetchall()[0][0])
|
||||
assert curr_xid - locking_xid >= 100000
|
||||
|
||||
# Perform GC in the pageserver. Otherwise the compute might still
|
||||
# be able to download the already-deleted SLRU segment from the
|
||||
# pageserver. That masks the original bug.
|
||||
env.pageserver.http_client().timeline_checkpoint(tenant_id, timeline_id)
|
||||
env.pageserver.http_client().timeline_compact(tenant_id, timeline_id)
|
||||
env.pageserver.http_client().timeline_gc(tenant_id, timeline_id, 0)
|
||||
|
||||
# Now, if the VM all-frozen bit was not correctly cleared on
|
||||
# replay, we will try to fetch the status of the XID that was
|
||||
@@ -214,3 +237,4 @@ def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
|
||||
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuple = {tup}")
|
||||
cur.execute("commit transaction")
|
||||
|
||||
@@ -280,11 +280,6 @@ def test_broker(neon_env_builder: NeonEnvBuilder):
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_broker", "main")
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
endpoint = env.endpoints.create_start("test_broker")
|
||||
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
|
||||
|
||||
@@ -342,11 +337,6 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
neon_env_builder.auth_enabled = auth_enabled
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# FIXME: Is this expected?
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_safekeepers_wal_removal")
|
||||
endpoint = env.endpoints.create_start("test_safekeepers_wal_removal")
|
||||
|
||||
28
test_runner/sql_regress/expected/neon-test-utils.out
Normal file
28
test_runner/sql_regress/expected/neon-test-utils.out
Normal file
@@ -0,0 +1,28 @@
|
||||
-- Test the test utils in pgxn/neon_test_utils. We don't test that
|
||||
-- these actually consume resources like they should - that would be
|
||||
-- tricky - but at least we check that they don't crash.
|
||||
CREATE EXTENSION neon_test_utils;
|
||||
select test_consume_cpu(1);
|
||||
test_consume_cpu
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_consume_memory(20); -- Allocate 20 MB
|
||||
test_consume_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_release_memory(5); -- Release 5 MB
|
||||
test_release_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_release_memory(); -- Release the remaining 15 MB
|
||||
test_release_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
@@ -7,4 +7,5 @@
|
||||
test: neon-cid
|
||||
test: neon-rel-truncate
|
||||
test: neon-clog
|
||||
test: neon-test-utils
|
||||
test: neon-vacuum-full
|
||||
|
||||
11
test_runner/sql_regress/sql/neon-test-utils.sql
Normal file
11
test_runner/sql_regress/sql/neon-test-utils.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
-- Test the test utils in pgxn/neon_test_utils. We don't test that
|
||||
-- these actually consume resources like they should - that would be
|
||||
-- tricky - but at least we check that they don't crash.
|
||||
|
||||
CREATE EXTENSION neon_test_utils;
|
||||
|
||||
select test_consume_cpu(1);
|
||||
|
||||
select test_consume_memory(20); -- Allocate 20 MB
|
||||
select test_release_memory(5); -- Release 5 MB
|
||||
select test_release_memory(); -- Release the remaining 15 MB
|
||||
Reference in New Issue
Block a user