mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
Merge branch 'main' into cicd/debug-regress-tests-on-arm
This commit is contained in:
@@ -67,6 +67,7 @@ from fixtures.pageserver.utils import (
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import (
|
||||
LocalFsStorage,
|
||||
MockS3Server,
|
||||
RemoteStorage,
|
||||
RemoteStorageKind,
|
||||
@@ -4425,14 +4426,32 @@ class Safekeeper(LogUtils):
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> Path:
|
||||
return self.data_dir / str(tenant_id) / str(timeline_id)
|
||||
|
||||
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
tline_path = (
|
||||
self.env.repo_dir
|
||||
/ "local_fs_remote_storage"
|
||||
/ "safekeeper"
|
||||
/ str(tenant_id)
|
||||
/ str(timeline_id)
|
||||
)
|
||||
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
|
||||
return self._list_segments_in_dir(
|
||||
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
|
||||
)
|
||||
|
||||
def list_segments(self, tenant_id, timeline_id) -> List[str]:
|
||||
"""
|
||||
Get list of segment names of the given timeline.
|
||||
"""
|
||||
tli_dir = self.timeline_dir(tenant_id, timeline_id)
|
||||
return self._list_segments_in_dir(
|
||||
tli_dir, lambda name: not name.startswith("safekeeper.control")
|
||||
)
|
||||
|
||||
def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]:
|
||||
segments = []
|
||||
for _, _, filenames in os.walk(tli_dir):
|
||||
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
|
||||
for _, _, filenames in os.walk(path):
|
||||
segments.extend([f for f in filenames if keep_filter(f)])
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
|
||||
@@ -2091,3 +2091,47 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
== 0
|
||||
)
|
||||
|
||||
|
||||
def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder):
|
||||
# single unsharded tenant, two locations
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
attached_id = int(env.storage_controller.locate(env.initial_tenant)[0]["node_id"])
|
||||
attached = next((ps for ps in env.pageservers if ps.id == attached_id))
|
||||
|
||||
def attached_is_draining():
|
||||
details = env.storage_controller.node_status(attached.id)
|
||||
assert details["scheduling"] == "Draining"
|
||||
|
||||
env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(10000)"))
|
||||
env.storage_controller.node_drain(attached.id)
|
||||
|
||||
wait_until(10, 0.5, attached_is_draining)
|
||||
|
||||
attached.restart()
|
||||
|
||||
# we are unable to reconfigure node while the operation is still ongoing
|
||||
with pytest.raises(
|
||||
StorageControllerApiException,
|
||||
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
|
||||
):
|
||||
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
|
||||
with pytest.raises(
|
||||
StorageControllerApiException,
|
||||
match="Precondition failed: Ongoing background operation forbids configuring: drain.*",
|
||||
):
|
||||
env.storage_controller.node_configure(attached.id, {"availability": "Offline"})
|
||||
|
||||
env.storage_controller.cancel_node_drain(attached.id)
|
||||
|
||||
def reconfigure_node_again():
|
||||
env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"})
|
||||
|
||||
# allow for small delay between actually having cancelled and being able reconfigure again
|
||||
wait_until(4, 0.5, reconfigure_node_again)
|
||||
|
||||
@@ -49,7 +49,13 @@ from fixtures.remote_storage import (
|
||||
)
|
||||
from fixtures.safekeeper.http import SafekeeperHttpClient
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
|
||||
from fixtures.utils import (
|
||||
PropagatingThread,
|
||||
get_dir_size,
|
||||
query_scalar,
|
||||
start_in_background,
|
||||
wait_until,
|
||||
)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint(
|
||||
@@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint(
|
||||
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
|
||||
log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver")
|
||||
|
||||
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint_at(
|
||||
lsn: Lsn,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
pageserver_conn_options = pageserver_conn_options or {}
|
||||
|
||||
auth_token = None
|
||||
if "password" in pageserver_conn_options:
|
||||
auth_token = pageserver_conn_options["password"]
|
||||
@@ -2304,3 +2322,138 @@ def test_s3_eviction(
|
||||
)
|
||||
|
||||
assert event_metrics_seen
|
||||
|
||||
|
||||
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that pulling timeline from a SK with an uploaded partial segment
|
||||
does not lead to consistency issues:
|
||||
1. Start 3 SKs - only use two
|
||||
2. Ingest a bit of WAL
|
||||
3. Wait for partial to be uploaded
|
||||
4. Pull timeline to the third SK
|
||||
6. Replace source with destination SK and start compute
|
||||
5. Wait for source SK to evict timeline
|
||||
6. Go back to initial compute SK config and validate that
|
||||
source SK can unevict the timeline (S3 state is consistent)
|
||||
"""
|
||||
neon_env_builder.auth_enabled = True
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
|
||||
|
||||
neon_env_builder.safekeeper_extra_opts = [
|
||||
"--enable-offload",
|
||||
"--delete-offloaded-wal",
|
||||
"--partial-backup-timeout",
|
||||
"500ms",
|
||||
"--control-file-save-interval",
|
||||
"500ms",
|
||||
"--eviction-min-resident=500ms",
|
||||
]
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
|
||||
|
||||
log.info("use only first 2 safekeepers, 3rd will be seeded")
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.active_safekeepers = [1, 2]
|
||||
endpoint.start()
|
||||
endpoint.safe_psql("create table t(key int, value text)")
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
def source_partial_segment_uploaded():
|
||||
first_segment_name = "000000010000000000000001"
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
|
||||
candidate_seg = None
|
||||
for seg in segs:
|
||||
if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name):
|
||||
candidate_seg = seg
|
||||
|
||||
if candidate_seg is not None:
|
||||
# The term might change, causing the segment to be gc-ed shortly after,
|
||||
# so give it a bit of time to make sure it's stable.
|
||||
time.sleep(2)
|
||||
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
assert candidate_seg in segs
|
||||
return candidate_seg
|
||||
|
||||
raise Exception("Partial segment not uploaded yet")
|
||||
|
||||
source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded)
|
||||
log.info(
|
||||
f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
log.info(f"Tracking source partial segment: {source_partial_segment}")
|
||||
|
||||
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
|
||||
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
|
||||
|
||||
pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)}
|
||||
wait_lsn_force_checkpoint_at(
|
||||
src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options
|
||||
)
|
||||
|
||||
dst_sk.pull_timeline([src_sk], tenant_id, timeline_id)
|
||||
|
||||
def evicted():
|
||||
evictions = src_sk.http_client().get_metric_value(
|
||||
"safekeeper_eviction_events_completed_total", {"kind": "evict"}
|
||||
)
|
||||
|
||||
if evictions is None or evictions == 0:
|
||||
raise Exception("Eviction did not happen on source safekeeper yet")
|
||||
|
||||
wait_until(30, 1, evicted)
|
||||
|
||||
endpoint.start(safekeepers=[2, 3])
|
||||
|
||||
def new_partial_segment_uploaded():
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
for seg in segs:
|
||||
if "partial" in seg and "sk3" in seg:
|
||||
return seg
|
||||
|
||||
raise Exception("Partial segment not uploaded yet")
|
||||
|
||||
log.info(
|
||||
f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
|
||||
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
|
||||
wait_until(15, 1, new_partial_segment_uploaded)
|
||||
|
||||
log.info(
|
||||
f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
|
||||
# Allow for some gc iterations to happen and assert that the original
|
||||
# uploaded partial segment remains in place.
|
||||
time.sleep(5)
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
assert source_partial_segment in segs
|
||||
|
||||
log.info(
|
||||
f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}"
|
||||
)
|
||||
|
||||
# Restart the endpoint in order to check that the source safekeeper
|
||||
# can unevict the timeline
|
||||
endpoint.stop()
|
||||
endpoint.start(safekeepers=[1, 2])
|
||||
|
||||
def unevicted():
|
||||
unevictions = src_sk.http_client().get_metric_value(
|
||||
"safekeeper_eviction_events_completed_total", {"kind": "restore"}
|
||||
)
|
||||
|
||||
if unevictions is None or unevictions == 0:
|
||||
raise Exception("Uneviction did not happen on source safekeeper yet")
|
||||
|
||||
wait_until(10, 1, unevicted)
|
||||
|
||||
Reference in New Issue
Block a user