mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
Merge remote-tracking branch 'origin/main' into arpad/lsn_by_ts
This commit is contained in:
@@ -361,7 +361,6 @@ class PgProtocol:
|
||||
|
||||
@dataclass
|
||||
class AuthKeys:
|
||||
pub: str
|
||||
priv: str
|
||||
|
||||
def generate_token(self, *, scope: str, **token_data: str) -> str:
|
||||
@@ -877,9 +876,31 @@ class NeonEnv:
|
||||
|
||||
@cached_property
|
||||
def auth_keys(self) -> AuthKeys:
|
||||
pub = (Path(self.repo_dir) / "auth_public_key.pem").read_text()
|
||||
priv = (Path(self.repo_dir) / "auth_private_key.pem").read_text()
|
||||
return AuthKeys(pub=pub, priv=priv)
|
||||
return AuthKeys(priv=priv)
|
||||
|
||||
def regenerate_keys_at(self, privkey_path: Path, pubkey_path: Path):
|
||||
# compare generate_auth_keys() in local_env.rs
|
||||
subprocess.run(
|
||||
["openssl", "genpkey", "-algorithm", "ed25519", "-out", privkey_path],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
|
||||
subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"pkey",
|
||||
"-in",
|
||||
privkey_path,
|
||||
"-pubout",
|
||||
"-out",
|
||||
pubkey_path,
|
||||
],
|
||||
cwd=self.repo_dir,
|
||||
check=True,
|
||||
)
|
||||
del self.auth_keys
|
||||
|
||||
def generate_endpoint_id(self) -> str:
|
||||
"""
|
||||
|
||||
@@ -189,6 +189,10 @@ class PageserverHttpClient(requests.Session):
|
||||
assert res_json is None
|
||||
return res_json
|
||||
|
||||
def reload_auth_validation_keys(self):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_list(self) -> List[Dict[Any, Any]]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -1,12 +1,35 @@
|
||||
import os
|
||||
from contextlib import closing
|
||||
from pathlib import Path
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgProtocol,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
|
||||
def assert_client_authorized(env: NeonEnv, http_client: PageserverHttpClient):
|
||||
http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
|
||||
|
||||
def assert_client_not_authorized(env: NeonEnv, http_client: PageserverHttpClient):
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Forbidden: JWT authentication error",
|
||||
):
|
||||
assert_client_authorized(env, http_client)
|
||||
|
||||
|
||||
def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -27,30 +50,14 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
|
||||
ps.safe_psql("set FOO", password=pageserver_token)
|
||||
|
||||
# tenant can create branches
|
||||
tenant_http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
assert_client_authorized(env, tenant_http_client)
|
||||
|
||||
# console can create branches for tenant
|
||||
pageserver_http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
assert_client_authorized(env, pageserver_http_client)
|
||||
|
||||
# fail to create branch using token with different tenant_id
|
||||
with pytest.raises(
|
||||
PageserverApiException, match="Forbidden: Tenant id mismatch. Permission denied"
|
||||
):
|
||||
invalid_tenant_http_client.timeline_create(
|
||||
pg_version=env.pg_version,
|
||||
tenant_id=env.initial_tenant,
|
||||
new_timeline_id=TimelineId.generate(),
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
)
|
||||
with pytest.raises(PageserverApiException, match="Forbidden: JWT authentication error"):
|
||||
assert_client_authorized(env, invalid_tenant_http_client)
|
||||
|
||||
# create tenant using management token
|
||||
pageserver_http_client.tenant_create(TenantId.generate())
|
||||
@@ -58,7 +65,7 @@ def test_pageserver_auth(neon_env_builder: NeonEnvBuilder):
|
||||
# fail to create tenant using tenant token
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Forbidden: Attempt to access management api with tenant scope. Permission denied",
|
||||
match="Forbidden: JWT authentication error",
|
||||
):
|
||||
tenant_http_client.tenant_create(TenantId.generate())
|
||||
|
||||
@@ -82,6 +89,96 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
assert cur.fetchone() == (5000050000,)
|
||||
|
||||
|
||||
def test_pageserver_multiple_keys(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.append(".*Authentication error: InvalidSignature.*")
|
||||
env.pageserver.allowed_errors.append(".*Unauthorized: malformed jwt token.*")
|
||||
|
||||
pageserver_token_old = env.auth_keys.generate_pageserver_token()
|
||||
pageserver_http_client_old = env.pageserver.http_client(pageserver_token_old)
|
||||
|
||||
pageserver_http_client_old.reload_auth_validation_keys()
|
||||
|
||||
# This test is to ensure that the pageserver supports multiple keys.
|
||||
# The neon_local tool generates one key pair at a hardcoded path by default.
|
||||
# As a preparation for our test, move the public key of the key pair into a
|
||||
# directory at the same location as the hardcoded path by:
|
||||
# 1. moving the the file at `configured_pub_key_path` to a temporary location
|
||||
# 2. creating a new directory at `configured_pub_key_path`
|
||||
# 3. moving the file from the temporary location into the newly created directory
|
||||
configured_pub_key_path = Path(env.repo_dir) / "auth_public_key.pem"
|
||||
os.rename(configured_pub_key_path, Path(env.repo_dir) / "auth_public_key.pem.file")
|
||||
os.mkdir(configured_pub_key_path)
|
||||
os.rename(
|
||||
Path(env.repo_dir) / "auth_public_key.pem.file",
|
||||
configured_pub_key_path / "auth_public_key_old.pem",
|
||||
)
|
||||
|
||||
# Add a new key pair
|
||||
# This invalidates env.auth_keys and makes them be regenerated
|
||||
env.regenerate_keys_at(
|
||||
Path("auth_private_key.pem"), Path("auth_public_key.pem/auth_public_key_new.pem")
|
||||
)
|
||||
|
||||
# Reload the keys on the pageserver side
|
||||
pageserver_http_client_old.reload_auth_validation_keys()
|
||||
|
||||
# We can continue doing things using the old token
|
||||
assert_client_authorized(env, pageserver_http_client_old)
|
||||
|
||||
pageserver_token_new = env.auth_keys.generate_pageserver_token()
|
||||
pageserver_http_client_new = env.pageserver.http_client(pageserver_token_new)
|
||||
|
||||
# The new token also works
|
||||
assert_client_authorized(env, pageserver_http_client_new)
|
||||
|
||||
# Remove the old token and reload
|
||||
os.remove(Path(env.repo_dir) / "auth_public_key.pem" / "auth_public_key_old.pem")
|
||||
pageserver_http_client_old.reload_auth_validation_keys()
|
||||
|
||||
# Reloading fails now with the old token, but the new token still works
|
||||
assert_client_not_authorized(env, pageserver_http_client_old)
|
||||
assert_client_authorized(env, pageserver_http_client_new)
|
||||
|
||||
|
||||
def test_pageserver_key_reload(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.auth_enabled = True
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.append(".*Authentication error: InvalidSignature.*")
|
||||
env.pageserver.allowed_errors.append(".*Unauthorized: malformed jwt token.*")
|
||||
|
||||
pageserver_token_old = env.auth_keys.generate_pageserver_token()
|
||||
pageserver_http_client_old = env.pageserver.http_client(pageserver_token_old)
|
||||
|
||||
pageserver_http_client_old.reload_auth_validation_keys()
|
||||
|
||||
# Regenerate the keys
|
||||
env.regenerate_keys_at(Path("auth_private_key.pem"), Path("auth_public_key.pem"))
|
||||
|
||||
# Reload the keys on the pageserver side
|
||||
pageserver_http_client_old.reload_auth_validation_keys()
|
||||
|
||||
# Next attempt fails as we use the old auth token
|
||||
with pytest.raises(
|
||||
PageserverApiException,
|
||||
match="Forbidden: JWT authentication error",
|
||||
):
|
||||
pageserver_http_client_old.reload_auth_validation_keys()
|
||||
|
||||
# same goes for attempts trying to create a timeline
|
||||
assert_client_not_authorized(env, pageserver_http_client_old)
|
||||
|
||||
pageserver_token_new = env.auth_keys.generate_pageserver_token()
|
||||
pageserver_http_client_new = env.pageserver.http_client(pageserver_token_new)
|
||||
|
||||
# timeline creation works with the new token
|
||||
assert_client_authorized(env, pageserver_http_client_new)
|
||||
|
||||
# reloading also works with the new token
|
||||
pageserver_http_client_new.reload_auth_validation_keys()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("auth_enabled", [False, True])
|
||||
def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
neon_env_builder.auth_enabled = auth_enabled
|
||||
|
||||
60
test_runner/regress/test_bad_connection.py
Normal file
60
test_runner/regress/test_bad_connection.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import random
|
||||
import time
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.pageserver.allowed_errors.append(".*simulated connection error.*")
|
||||
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
env.neon_cli.create_branch("test_compute_pageserver_connection_stress")
|
||||
endpoint = env.endpoints.create_start("test_compute_pageserver_connection_stress")
|
||||
|
||||
# Enable failpoint after starting everything else up so that loading initial
|
||||
# basebackup doesn't fail
|
||||
pageserver_http.configure_failpoints(("simulated-bad-compute-connection", "50%return(15)"))
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
# from shared_buffers without hitting the page server, which defeats the point
|
||||
# of this test.
|
||||
cur.execute("CREATE TABLE foo (t text)")
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
FROM generate_series(1, 100000) g
|
||||
"""
|
||||
)
|
||||
|
||||
# Verify that the table is larger than shared_buffers
|
||||
cur.execute(
|
||||
"""
|
||||
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
|
||||
from pg_settings where name = 'shared_buffers'
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
assert row is not None
|
||||
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
|
||||
assert int(row[0]) < int(row[1])
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
|
||||
end_time = time.time() + 30
|
||||
times_executed = 0
|
||||
while time.time() < end_time:
|
||||
if random.random() < 0.5:
|
||||
cur.execute("INSERT INTO foo VALUES ('stas'), ('heikki')")
|
||||
else:
|
||||
cur.execute("SELECT t FROM foo ORDER BY RANDOM() LIMIT 10")
|
||||
cur.fetchall()
|
||||
times_executed += 1
|
||||
log.info(f"Workload executed {times_executed} times")
|
||||
@@ -26,6 +26,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*layer loading failed permanently: load layer: .*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
@@ -1,9 +1,13 @@
|
||||
import asyncio
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
|
||||
|
||||
def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
num_connections = 3
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
@@ -16,15 +20,24 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
alt_pageserver_id = env.pageservers[1].id
|
||||
env.pageservers[1].tenant_attach(env.initial_tenant)
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
pg_conns = [endpoint.connect() for i in range(num_connections)]
|
||||
curs = [pg_conn.cursor() for pg_conn in pg_conns]
|
||||
|
||||
def execute(statement: str):
|
||||
for cur in curs:
|
||||
cur.execute(statement)
|
||||
|
||||
def fetchone():
|
||||
results = [cur.fetchone() for cur in curs]
|
||||
assert all(result == results[0] for result in results)
|
||||
return results[0]
|
||||
|
||||
# Create table, and insert some rows. Make it big enough that it doesn't fit in
|
||||
# shared_buffers, otherwise the SELECT after restart will just return answer
|
||||
# from shared_buffers without hitting the page server, which defeats the point
|
||||
# of this test.
|
||||
cur.execute("CREATE TABLE foo (t text)")
|
||||
cur.execute(
|
||||
curs[0].execute("CREATE TABLE foo (t text)")
|
||||
curs[0].execute(
|
||||
"""
|
||||
INSERT INTO foo
|
||||
SELECT 'long string to consume some space' || g
|
||||
@@ -33,25 +46,25 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
# Verify that the table is larger than shared_buffers
|
||||
cur.execute(
|
||||
curs[0].execute(
|
||||
"""
|
||||
select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
|
||||
from pg_settings where name = 'shared_buffers'
|
||||
"""
|
||||
)
|
||||
row = cur.fetchone()
|
||||
row = curs[0].fetchone()
|
||||
assert row is not None
|
||||
log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
|
||||
assert int(row[0]) < int(row[1])
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
execute("SELECT count(*) FROM foo")
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
endpoint.reconfigure(pageserver_id=alt_pageserver_id)
|
||||
|
||||
# Verify that the neon.pageserver_connstring GUC is set to the correct thing
|
||||
cur.execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
|
||||
connstring = cur.fetchone()
|
||||
execute("SELECT setting FROM pg_settings WHERE name='neon.pageserver_connstring'")
|
||||
connstring = fetchone()
|
||||
assert connstring is not None
|
||||
expected_connstring = f"postgresql://no_user:@localhost:{env.pageservers[1].service_port.pg}"
|
||||
assert expected_connstring == expected_connstring
|
||||
@@ -60,5 +73,45 @@ def test_change_pageserver(neon_env_builder: NeonEnvBuilder):
|
||||
0
|
||||
].stop() # Stop the old pageserver just to make sure we're reading from the new one
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (100000,)
|
||||
execute("SELECT count(*) FROM foo")
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
# Try failing back, and this time we will stop the current pageserver before reconfiguring
|
||||
# the endpoint. Whereas the previous reconfiguration was like a healthy migration, this
|
||||
# is more like what happens in an unexpected pageserver failure.
|
||||
env.pageservers[0].start()
|
||||
env.pageservers[1].stop()
|
||||
|
||||
endpoint.reconfigure(pageserver_id=env.pageservers[0].id)
|
||||
|
||||
execute("SELECT count(*) FROM foo")
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
env.pageservers[0].stop()
|
||||
env.pageservers[1].start()
|
||||
|
||||
# Test a (former) bug where a child process spins without updating its connection string
|
||||
# by executing a query separately. This query will hang until we issue the reconfigure.
|
||||
async def reconfigure_async():
|
||||
await asyncio.sleep(
|
||||
1
|
||||
) # Sleep for 1 second just to make sure we actually started our count(*) query
|
||||
endpoint.reconfigure(pageserver_id=env.pageservers[1].id)
|
||||
|
||||
def execute_count():
|
||||
execute("SELECT count(*) FROM FOO")
|
||||
|
||||
async def execute_and_reconfigure():
|
||||
task_exec = asyncio.to_thread(execute_count)
|
||||
task_reconfig = asyncio.create_task(reconfigure_async())
|
||||
await asyncio.gather(
|
||||
task_exec,
|
||||
task_reconfig,
|
||||
)
|
||||
|
||||
asyncio.run(execute_and_reconfigure())
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
# One final check that nothing hangs
|
||||
execute("SELECT count(*) FROM foo")
|
||||
assert fetchone() == (100000,)
|
||||
|
||||
@@ -79,11 +79,27 @@ def test_lsn_mapping_old(neon_env_builder: NeonEnvBuilder):
|
||||
def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
new_timeline_id = env.neon_cli.create_branch("test_lsn_mapping")
|
||||
endpoint_main = env.endpoints.create_start("test_lsn_mapping")
|
||||
log.info("postgres is running on 'test_lsn_mapping' branch")
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# disable default GC and compaction
|
||||
"gc_period": "1000 m",
|
||||
"compaction_period": "0 s",
|
||||
"gc_horizon": f"{1024 ** 2}",
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
}
|
||||
)
|
||||
|
||||
timeline_id = env.neon_cli.create_branch("test_lsn_mapping", tenant_id=tenant_id)
|
||||
endpoint_main = env.endpoints.create_start("test_lsn_mapping", tenant_id=tenant_id)
|
||||
timeline_id = endpoint_main.safe_psql("show neon.timeline_id")[0][0]
|
||||
log.info("postgres is running on 'main' branch")
|
||||
|
||||
cur = endpoint_main.connect().cursor()
|
||||
|
||||
# Obtain an lsn before all write operations on this branch
|
||||
start_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_lsn()"))
|
||||
|
||||
# Create table, and insert rows, each in a separate transaction
|
||||
# Disable `synchronous_commit` to make this initialization go faster.
|
||||
# XXX: on my laptop this test takes 7s, and setting `synchronous_commit=off`
|
||||
@@ -106,29 +122,33 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
cur.execute("INSERT INTO foo VALUES (-1)")
|
||||
|
||||
# Wait until WAL is received by pageserver
|
||||
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
|
||||
last_flush_lsn = wait_for_last_flush_lsn(env, endpoint_main, tenant_id, timeline_id)
|
||||
|
||||
with env.pageserver.http_client() as client:
|
||||
# Check edge cases
|
||||
# Timestamp is in the future
|
||||
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] in ["present", "future"]
|
||||
# make sure that we return a well advanced lsn here
|
||||
assert Lsn(result["lsn"]) > start_lsn
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) < start_lsn
|
||||
|
||||
# Probe a bunch of timestamps in the valid range
|
||||
for i in range(1, len(tbl), 100):
|
||||
probe_timestamp = tbl[i][1]
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] not in ["past", "nodata"]
|
||||
lsn = result["lsn"]
|
||||
@@ -136,12 +156,29 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
# Launch a new read-only node at that LSN, and check that only the rows
|
||||
# that were supposed to be committed at that point in time are visible.
|
||||
endpoint_here = env.endpoints.create_start(
|
||||
branch_name="test_lsn_mapping", endpoint_id="ep-lsn_mapping_read", lsn=lsn
|
||||
branch_name="test_lsn_mapping",
|
||||
endpoint_id="ep-lsn_mapping_read",
|
||||
lsn=lsn,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
|
||||
|
||||
endpoint_here.stop_and_destroy()
|
||||
|
||||
# Do the "past" check again at a new branch to ensure that we don't return something before the branch cutoff
|
||||
timeline_id_child = env.neon_cli.create_branch(
|
||||
"test_lsn_mapping_child", tenant_id=tenant_id, ancestor_branch_name="test_lsn_mapping"
|
||||
)
|
||||
|
||||
# Timestamp is in the unreachable past
|
||||
probe_timestamp = tbl[0][1] - timedelta(hours=10)
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id_child, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] == "past"
|
||||
# make sure that we return the minimum lsn here at the start of the range
|
||||
assert Lsn(result["lsn"]) >= last_flush_lsn
|
||||
|
||||
|
||||
# Test pageserver get_timestamp_of_lsn API
|
||||
def test_ts_of_lsn_api(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
@@ -366,11 +366,17 @@ def test_deletion_queue_recovery(
|
||||
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0
|
||||
|
||||
if validate_before == ValidateBefore.VALIDATE:
|
||||
# At this point, one or more DeletionLists have been written. We have set a failpoint
|
||||
# to prevent them successfully executing, but we want to see them get validated.
|
||||
#
|
||||
# We await _some_ validations instead of _all_ validations, because our execution failpoint
|
||||
# will prevent validation proceeding for any but the first DeletionList. Usually the workload
|
||||
# just generates one, but if it generates two due to timing, then we must not expect that the
|
||||
# second one will be validated.
|
||||
def assert_some_validations():
|
||||
assert get_deletion_queue_validated(ps_http) > 0
|
||||
|
||||
def assert_validation_complete():
|
||||
assert get_deletion_queue_submitted(ps_http) == get_deletion_queue_validated(ps_http)
|
||||
|
||||
wait_until(20, 1, assert_validation_complete)
|
||||
wait_until(20, 1, assert_some_validations)
|
||||
|
||||
# The validatated keys statistic advances before the header is written, so we
|
||||
# also wait to see the header hit the disk: this seems paranoid but the race
|
||||
@@ -380,6 +386,11 @@ def test_deletion_queue_recovery(
|
||||
|
||||
wait_until(20, 1, assert_header_written)
|
||||
|
||||
# If we will lose attachment, then our expectation on restart is that only the ones
|
||||
# we already validated will execute. Act like only those were present in the queue.
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
before_restart_depth = get_deletion_queue_validated(ps_http)
|
||||
|
||||
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
@@ -402,11 +413,13 @@ def test_deletion_queue_recovery(
|
||||
ps_http.deletion_queue_flush(execute=True)
|
||||
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
|
||||
|
||||
if keep_attachment == KeepAttachment.KEEP or validate_before == ValidateBefore.VALIDATE:
|
||||
if keep_attachment == KeepAttachment.KEEP:
|
||||
# - If we kept the attachment, then our pre-restart deletions should execute
|
||||
# because on re-attach they were from the immediately preceding generation
|
||||
# - If we validated before restart, then the deletions should execute because the
|
||||
# deletion queue header records a validated deletion list sequence number.
|
||||
assert get_deletion_queue_executed(ps_http) == before_restart_depth
|
||||
elif validate_before == ValidateBefore.VALIDATE:
|
||||
# - If we validated before restart, then we should execute however many keys were
|
||||
# validated before restart.
|
||||
assert get_deletion_queue_executed(ps_http) == before_restart_depth
|
||||
else:
|
||||
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
|
||||
|
||||
@@ -17,10 +17,6 @@ def test_pageserver_restarts_under_worload(neon_simple_env: NeonEnv, pg_bin: PgB
|
||||
n_restarts = 10
|
||||
scale = 10
|
||||
|
||||
# Pageserver currently logs requests on non-active tenants at error level
|
||||
# https://github.com/neondatabase/neon/issues/5784
|
||||
env.pageserver.allowed_errors.append(".* will not become active. Current state: Stopping.*")
|
||||
|
||||
def run_pgbench(connstr: str):
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
|
||||
|
||||
Reference in New Issue
Block a user