mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
safekeeper: add endpoint resetting uploaded partial segment state.
The endpoint implementation sends a message to the manager requesting the reset. The manager stops the current partial backup upload task, if one exists, and then performs the reset. Also slightly tweak the eviction condition: all full segments before flush_lsn must be uploaded (and committed), and only one (partial) segment may remain on disk. This allows evicting timelines that started past the first segment and never filled a whole segment (the previous condition was inadequate because last_removed_segno was 0). ref https://github.com/neondatabase/neon/issues/8759
This commit is contained in:
@@ -4553,6 +4553,8 @@ class Safekeeper(LogUtils):
|
||||
def timeline_dir(self, tenant_id, timeline_id) -> Path:
    """Return the on-disk directory holding this timeline's data on this safekeeper."""
    tenant_dir = self.data_dir / str(tenant_id)
    return tenant_dir / str(timeline_id)
|
||||
|
||||
# List partial uploaded segments of this safekeeper. Works only for
|
||||
# RemoteStorageKind.LOCAL_FS.
|
||||
def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
tline_path = (
|
||||
self.env.repo_dir
|
||||
@@ -4562,9 +4564,11 @@ class Safekeeper(LogUtils):
|
||||
/ str(timeline_id)
|
||||
)
|
||||
assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage)
|
||||
return self._list_segments_in_dir(
|
||||
segs = self._list_segments_in_dir(
|
||||
tline_path, lambda name: ".metadata" not in name and ".___temp" not in name
|
||||
)
|
||||
mysegs = [s for s in segs if f"sk{self.id}" in s]
|
||||
return mysegs
|
||||
|
||||
def list_segments(self, tenant_id, timeline_id) -> List[str]:
|
||||
"""
|
||||
|
||||
@@ -174,6 +174,22 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def debug_dump_timeline(
    self, timeline_id: "TimelineId", params: Optional[Dict[str, str]] = None
) -> Any:
    """Fetch the safekeeper debug dump restricted to a single timeline.

    Adds ``timeline_id`` to the query parameters, calls ``debug_dump`` and
    returns the single timeline entry from the response.

    Fix over the previous version: the caller's ``params`` dict is copied
    before being modified (``params or {}`` returned the caller's own dict
    when it was non-empty, so ``timeline_id`` leaked into it).
    """
    # Copy so we never mutate a dict owned by the caller.
    params = dict(params) if params else {}
    params["timeline_id"] = str(timeline_id)
    dump = self.debug_dump(params)
    # The dump is filtered by timeline_id, so the requested timeline is first.
    return dump["timelines"][0]
|
||||
|
||||
def get_partial_backup(self, timeline_id: "TimelineId") -> Any:
    """Return the partial_backup section of the timeline's control file dump."""
    control_file = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"})["control_file"]
    return control_file["partial_backup"]
|
||||
|
||||
def get_eviction_state(self, timeline_id: "TimelineId") -> Any:
    """Return the eviction_state section of the timeline's control file dump."""
    control_file = self.debug_dump_timeline(timeline_id, {"dump_control_file": "true"})["control_file"]
    return control_file["eviction_state"]
|
||||
|
||||
def pull_timeline(self, body: Dict[str, Any]) -> Dict[str, Any]:
|
||||
res = self.post(f"http://localhost:{self.port}/v1/pull_timeline", json=body)
|
||||
res.raise_for_status()
|
||||
@@ -228,6 +244,14 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def backup_partial_reset(self, tenant_id: "TenantId", timeline_id: "TimelineId"):
    """Ask the safekeeper to reset the uploaded partial-segment state for a timeline.

    Raises on a non-2xx HTTP status; returns the decoded JSON response body.
    """
    url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/backup_partial_reset"
    res = self.post(url, json={})
    res.raise_for_status()
    return res.json()
|
||||
|
||||
def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
|
||||
res = self.post(
|
||||
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
|
||||
|
||||
@@ -72,6 +72,17 @@ def wait_lsn_force_checkpoint(
|
||||
wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint_at_sk(
    safekeeper: "Safekeeper",
    tenant_id: "TenantId",
    timeline_id: "TimelineId",
    ps: "NeonPageserver",
    pageserver_conn_options=None,
):
    """Checkpoint the timeline at the safekeeper's current flush_lsn.

    Reads flush_lsn from the given safekeeper and delegates to
    wait_lsn_force_checkpoint_at, which waits until the pageserver receives
    that lsn, forces a checkpoint and waits for upload.
    """
    flush_lsn = safekeeper.get_flush_lsn(tenant_id, timeline_id)
    wait_lsn_force_checkpoint_at(flush_lsn, tenant_id, timeline_id, ps, pageserver_conn_options)
|
||||
|
||||
|
||||
def wait_lsn_force_checkpoint_at(
|
||||
lsn: Lsn,
|
||||
tenant_id: TenantId,
|
||||
@@ -79,6 +90,10 @@ def wait_lsn_force_checkpoint_at(
|
||||
ps: NeonPageserver,
|
||||
pageserver_conn_options=None,
|
||||
):
|
||||
"""
|
||||
Wait until pageserver receives given lsn, force checkpoint and wait for
|
||||
upload, i.e. remote_consistent_lsn advancement.
|
||||
"""
|
||||
pageserver_conn_options = pageserver_conn_options or {}
|
||||
|
||||
auth_token = None
|
||||
@@ -2330,6 +2345,77 @@ def test_s3_eviction(
|
||||
assert event_metrics_seen
|
||||
|
||||
|
||||
# Test resetting uploaded partial segment state.
def test_backup_partial_reset(neon_env_builder: NeonEnvBuilder):
    """Exercise the safekeeper backup_partial_reset endpoint.

    Flow: write some WAL, wait for eviction (which requires the partial
    segment to be uploaded), then reset the uploaded partial-segment state
    and verify the remote segment is gone. The reset must be idempotent and
    the timeline must stay writable afterwards.
    """
    neon_env_builder.num_safekeepers = 1
    neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
    # We want to upload/evict quickly, but not too quickly to check that s3 is
    # empty before next round of upload happens.
    # Note: this test fails with --delete-offloaded-wal, this is expected.
    neon_env_builder.safekeeper_extra_opts = [
        "--enable-offload",
        "--partial-backup-timeout",
        "1s",
        "--control-file-save-interval",
        "1s",
        "--eviction-min-resident=1s",
    ]
    # XXX: pageserver currently connects to safekeeper as long as connection
    # manager doesn't remove its entry (default lagging_wal_timeout is 10s),
    # causing uneviction. It should be fixed to not reconnect if last
    # remote_consistent_lsn is communicated and there is nothing to fetch. Make
    # value lower to speed up the test.
    initial_tenant_conf = {
        "lagging_wal_timeout": "1s",
    }
    env = neon_env_builder.init_start(initial_tenant_conf)

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    endpoint = env.endpoints.create("main")
    endpoint.start()
    endpoint.safe_psql("create table t(key int, value text)")
    endpoint.stop()
    sk = env.safekeepers[0]
    # eviction won't happen until remote_consistent_lsn catches up.
    wait_lsn_force_checkpoint_at_sk(sk, tenant_id, timeline_id, env.pageserver)

    http_cli = env.safekeepers[0].http_client()

    # wait until eviction happens
    def evicted():
        eviction_state = http_cli.get_eviction_state(timeline_id)
        log.info(f"eviction_state: {eviction_state}")
        if isinstance(eviction_state, str) and eviction_state == "Present":
            raise Exception("eviction didn't happen yet")

    wait_until(30, 1, evicted)
    # it must have uploaded something
    uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
    log.info(f"uploaded segments before reset: {uploaded_segs}")
    assert len(uploaded_segs) > 0

    reset_res = http_cli.backup_partial_reset(tenant_id, timeline_id)
    log.info(f"reset res: {reset_res}")

    # Backup_partial_reset must have reset the state and dropped s3 segment.
    #
    # Note: if listing takes more than --partial-backup-timeout test becomes
    # flaky because file might be reuploaded. With local fs it shouldn't be an
    # issue, but can add retry if this appears.
    uploaded_segs = sk.list_uploaded_segments(tenant_id, timeline_id)
    log.info(f"uploaded segments after reset: {uploaded_segs}")
    assert len(uploaded_segs) == 0

    # calling second time should be ok
    http_cli.backup_partial_reset(tenant_id, timeline_id)

    # inserting data should be ok
    endpoint.start()
    endpoint.safe_psql("insert into t values(1, 'hehe')")
|
||||
|
||||
|
||||
def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Verify that pulling timeline from a SK with an uploaded partial segment
|
||||
@@ -2357,7 +2443,16 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
|
||||
"--eviction-min-resident=500ms",
|
||||
]
|
||||
|
||||
env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"})
|
||||
# XXX: pageserver currently connects to safekeeper as long as connection
|
||||
# manager doesn't remove its entry (default lagging_wal_timeout is 10s),
|
||||
# causing uneviction. It should be fixed to not reconnect if last
|
||||
# remote_consistent_lsn is communicated and there is nothing to fetch. Until
|
||||
# this is fixed make value lower to speed up the test.
|
||||
initial_tenant_conf = {
|
||||
"lagging_wal_timeout": "1s",
|
||||
"checkpoint_timeout": "100ms",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=initial_tenant_conf)
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
@@ -2421,7 +2516,7 @@ def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilde
|
||||
endpoint.start(safekeepers=[2, 3])
|
||||
|
||||
def new_partial_segment_uploaded():
|
||||
segs = src_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
segs = dst_sk.list_uploaded_segments(tenant_id, timeline_id)
|
||||
for seg in segs:
|
||||
if "partial" in seg and "sk3" in seg:
|
||||
return seg
|
||||
|
||||
Reference in New Issue
Block a user