mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-05 06:20:37 +00:00
Correctly remove orphaned objects in RemoteTimelineClient::delete_all (#5000)
Previously list_prefixes was incorrectly used for that purpose. Change to use list_files. Add a test. Some drive by refactorings on python side to move helpers out of specific test file to be widely accessible resolves https://github.com/neondatabase/neon/issues/4499
This commit is contained in:
@@ -851,7 +851,7 @@ impl RemoteTimelineClient {
|
||||
let remaining = backoff::retry(
|
||||
|| async {
|
||||
self.storage_impl
|
||||
.list_prefixes(Some(&timeline_storage_path))
|
||||
.list_files(Some(&timeline_storage_path))
|
||||
.await
|
||||
},
|
||||
|_e| false,
|
||||
|
||||
@@ -7,6 +7,9 @@ from pathlib import Path
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
|
||||
TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
|
||||
|
||||
|
||||
class MockS3Server:
|
||||
@@ -89,6 +92,16 @@ def available_s3_storages() -> List[RemoteStorageKind]:
|
||||
class LocalFsStorage:
|
||||
root: Path
|
||||
|
||||
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
return self.root / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
|
||||
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
|
||||
return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME
|
||||
|
||||
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
with self.index_path(tenant_id, timeline_id).open("r") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
@dataclass
|
||||
class S3Storage:
|
||||
|
||||
@@ -24,6 +24,7 @@ from fixtures.pageserver.utils import (
|
||||
wait_until_tenant_state,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
TIMELINE_INDEX_PART_FILE_NAME,
|
||||
LocalFsStorage,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
@@ -615,9 +616,7 @@ def test_timeline_deletion_with_files_stuck_in_upload_queue(
|
||||
|
||||
# to please mypy
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
remote_timeline_path = (
|
||||
env.remote_storage.root / "tenants" / str(tenant_id) / "timelines" / str(timeline_id)
|
||||
)
|
||||
remote_timeline_path = env.remote_storage.timeline_path(tenant_id, timeline_id)
|
||||
|
||||
assert not list(remote_timeline_path.iterdir())
|
||||
|
||||
@@ -722,15 +721,14 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
# index upload is now hitting the failpoint, it should block the shutdown
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
timeline_path = (
|
||||
Path("tenants") / str(env.initial_tenant) / "timelines" / str(new_branch_timeline_id)
|
||||
)
|
||||
|
||||
local_metadata = env.repo_dir / timeline_path / "metadata"
|
||||
local_metadata = env.timeline_dir(env.initial_tenant, new_branch_timeline_id) / "metadata"
|
||||
assert local_metadata.is_file()
|
||||
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
new_branch_on_remote_storage = env.remote_storage.root / timeline_path
|
||||
|
||||
new_branch_on_remote_storage = env.remote_storage.timeline_path(
|
||||
env.initial_tenant, new_branch_timeline_id
|
||||
)
|
||||
assert (
|
||||
not new_branch_on_remote_storage.exists()
|
||||
), "failpoint should had prohibited index_part.json upload"
|
||||
@@ -779,7 +777,7 @@ def test_empty_branch_remote_storage_upload_on_restart(
|
||||
assert_nothing_to_upload(client, env.initial_tenant, new_branch_timeline_id)
|
||||
|
||||
assert (
|
||||
new_branch_on_remote_storage / "index_part.json"
|
||||
new_branch_on_remote_storage / TIMELINE_INDEX_PART_FILE_NAME
|
||||
).is_file(), "uploads scheduled during initial load should had been awaited for"
|
||||
finally:
|
||||
create_thread.join()
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple
|
||||
@@ -225,10 +224,11 @@ def test_tenants_attached_after_download(
|
||||
# FIXME: test index_part.json getting downgraded from imaginary new version
|
||||
|
||||
|
||||
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
|
||||
def test_tenant_redownloads_truncated_file_on_startup(
|
||||
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
remote_storage_kind = RemoteStorageKind.LOCAL_FS
|
||||
|
||||
# since we now store the layer file length metadata, we notice on startup that a layer file is of wrong size, and proceed to redownload it.
|
||||
neon_env_builder.enable_remote_storage(
|
||||
remote_storage_kind=remote_storage_kind,
|
||||
@@ -237,6 +237,8 @@ def test_tenant_redownloads_truncated_file_on_startup(
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*removing local file .* because it has unexpected length.*"
|
||||
)
|
||||
@@ -279,7 +281,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
|
||||
(path, expected_size) = local_layer_truncated
|
||||
|
||||
# ensure the same size is found from the index_part.json
|
||||
index_part = local_fs_index_part(env, tenant_id, timeline_id)
|
||||
index_part = env.remote_storage.index_content(tenant_id, timeline_id)
|
||||
assert index_part["layer_metadata"][path.name]["file_size"] == expected_size
|
||||
|
||||
## Start the pageserver. It will notice that the file size doesn't match, and
|
||||
@@ -309,7 +311,7 @@ def test_tenant_redownloads_truncated_file_on_startup(
|
||||
assert os.stat(path).st_size == expected_size, "truncated layer should had been re-downloaded"
|
||||
|
||||
# the remote side of local_layer_truncated
|
||||
remote_layer_path = local_fs_index_part_path(env, tenant_id, timeline_id).parent / path.name
|
||||
remote_layer_path = env.remote_storage.timeline_path(tenant_id, timeline_id) / path.name
|
||||
|
||||
# if the upload ever was ongoing, this check would be racy, but at least one
|
||||
# extra http request has been made in between so assume it's enough delay
|
||||
@@ -334,27 +336,3 @@ def test_tenant_redownloads_truncated_file_on_startup(
|
||||
assert (
|
||||
os.stat(remote_layer_path).st_size == expected_size
|
||||
), "truncated file should not had been uploaded after next checkpoint"
|
||||
|
||||
|
||||
def local_fs_index_part(env, tenant_id, timeline_id):
|
||||
"""
|
||||
Return json.load parsed index_part.json of tenant and timeline from LOCAL_FS
|
||||
"""
|
||||
timeline_path = local_fs_index_part_path(env, tenant_id, timeline_id)
|
||||
with open(timeline_path, "r") as timeline_file:
|
||||
return json.load(timeline_file)
|
||||
|
||||
|
||||
def local_fs_index_part_path(env, tenant_id, timeline_id):
|
||||
"""
|
||||
Return path to the LOCAL_FS index_part.json of the tenant and timeline.
|
||||
"""
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
return (
|
||||
env.remote_storage.root
|
||||
/ "tenants"
|
||||
/ str(tenant_id)
|
||||
/ "timelines"
|
||||
/ str(timeline_id)
|
||||
/ "index_part.json"
|
||||
)
|
||||
|
||||
@@ -27,6 +27,7 @@ from fixtures.pageserver.utils import (
|
||||
wait_until_timeline_state,
|
||||
)
|
||||
from fixtures.remote_storage import (
|
||||
LocalFsStorage,
|
||||
RemoteStorageKind,
|
||||
available_remote_storages,
|
||||
)
|
||||
@@ -762,3 +763,64 @@ def test_timeline_delete_works_for_remote_smoke(
|
||||
0.5,
|
||||
lambda: assert_prefix_empty(neon_env_builder),
|
||||
)
|
||||
|
||||
|
||||
def test_delete_orphaned_objects(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
remote_storage_kind = RemoteStorageKind.LOCAL_FS
|
||||
neon_env_builder.enable_remote_storage(remote_storage_kind, "test_delete_orphaned_objects")
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf={
|
||||
"gc_period": "0s",
|
||||
"compaction_period": "0s",
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"image_creation_threshold": "100",
|
||||
}
|
||||
)
|
||||
|
||||
assert isinstance(env.remote_storage, LocalFsStorage)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
timeline_id = env.neon_cli.create_timeline("delete")
|
||||
with env.endpoints.create_start("delete") as endpoint:
|
||||
# generate enough layers
|
||||
pg_bin.run(["pgbench", "-i", "-I dtGvp", "-s1", endpoint.connstr()])
|
||||
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
|
||||
|
||||
# write orphaned file that is missing from the index
|
||||
remote_timeline_path = env.remote_storage.timeline_path(env.initial_tenant, timeline_id)
|
||||
orphans = [remote_timeline_path / f"orphan_{i}" for i in range(3)]
|
||||
for orphan in orphans:
|
||||
orphan.write_text("I shouldnt be there")
|
||||
|
||||
# trigger failpoint after orphaned file deletion to check that index_part is not deleted as well.
|
||||
failpoint = "timeline-delete-before-index-delete"
|
||||
ps_http.configure_failpoints((failpoint, "return"))
|
||||
|
||||
env.pageserver.allowed_errors.append(f".*failpoint: {failpoint}")
|
||||
|
||||
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
|
||||
|
||||
ps_http.timeline_delete(env.initial_tenant, timeline_id)
|
||||
timeline_info = wait_until_timeline_state(
|
||||
pageserver_http=ps_http,
|
||||
tenant_id=env.initial_tenant,
|
||||
timeline_id=timeline_id,
|
||||
expected_state="Broken",
|
||||
iterations=iterations,
|
||||
)
|
||||
|
||||
reason = timeline_info["state"]["Broken"]["reason"]
|
||||
assert reason.endswith(f"failpoint: {failpoint}"), reason
|
||||
|
||||
for orphan in orphans:
|
||||
assert not orphan.exists()
|
||||
assert env.pageserver.log_contains(
|
||||
f"deleting a file not referenced from index_part.json name={orphan.stem}"
|
||||
)
|
||||
|
||||
assert env.remote_storage.index_path(env.initial_tenant, timeline_id).exists()
|
||||
|
||||
Reference in New Issue
Block a user