Add retrying in Random ops test if parent branch is not found. (#12513)

## Problem
Due to a lag in replication, we sometimes cannot get the parent branch
definition just after completion of the Public API restore call. This
leads to the test failures.
https://databricks.atlassian.net/browse/LKB-279
## Summary of changes
The workaround is implemented. Now test retries up to 30 seconds,
waiting for the branch definition to appear.

---------

Co-authored-by: Alexey Masterov <alexey.masterov@databricks.com>
This commit is contained in:
a-masterov
2025-07-09 17:28:04 +02:00
committed by GitHub
parent 5c0de4ee8c
commit 78a6daa874

View File

@@ -117,7 +117,9 @@ class NeonBranch:
def create_child_branch(self) -> NeonBranch | None:
return self.project.create_branch(self.id)
def create_ro_endpoint(self) -> NeonEndpoint:
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
return None
return NeonEndpoint(
self.project,
self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"],
@@ -151,11 +153,26 @@ class NeonBranch:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
target_time = datetime.now() + timedelta(seconds=30)
while datetime.now() < target_time:
try:
parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
except HTTPError as he:
if he.response.status_code == 404:
log.info("Branch not found, waiting...")
time.sleep(1)
else:
raise HTTPError(he) from he
else:
break
else:
raise RuntimeError(f"Branch {parent_id} not found")
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
parent = NeonBranch(self.project, parent_def, True)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self
@@ -168,29 +185,21 @@ class NeonBranch:
source_timestamp: str | None = None,
preserve_under_name: str | None = None,
) -> dict[str, Any] | None:
if not self.project.check_limit_branches():
return None
endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"]
# Terminate all the benchmarks running to prevent errors. Errors in benchmark during pgbench are expected
for ep in endpoints:
ep.terminate_benchmark()
self.terminate_benchmark()
try:
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
self.project.wait()
self.start_benchmark()
for ep in endpoints:
@@ -239,19 +248,30 @@ class NeonProject:
def delete(self) -> None:
self.neon_api.delete_project(self.id)
def check_limit_branches(self) -> bool:
if self.limits["max_branches"] == -1 or len(self.branches) < self.limits["max_branches"]:
return True
log.info("branch limit exceeded (%s/%s)", len(self.branches), self.limits["max_branches"])
return False
def check_limit_endpoints(self) -> bool:
if (
self.limits["max_read_only_endpoints"] == -1
or self.read_only_endpoints_total < self.limits["max_read_only_endpoints"]
):
return True
log.info(
"Maximum read only endpoint limit exceeded (%s/%s)",
self.read_only_endpoints_total,
self.limits["max_read_only_endpoints"],
)
return False
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
self.wait()
try:
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
if not self.check_limit_branches():
return None
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
@@ -388,17 +408,9 @@ def do_action(project: NeonProject, action: str) -> bool:
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
if 0 <= project.limits["max_branches"] <= len(project.branches):
log.info(
"Maximum branch limit exceeded (%s of %s)",
len(project.branches),
project.limits["max_branches"],
)
return False
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
log.info("Parent: %s", parent)
child = parent.create_child_branch()
if child is None:
return False
@@ -413,16 +425,11 @@ def do_action(project: NeonProject, action: str) -> bool:
log.info("Leaf branches not found, skipping")
return False
elif action == "new_ro_endpoint":
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
log.info(
"Maximum read only endpoint limit exceeded (%s of %s)",
project.read_only_endpoints_total,
project.limits["max_read_only_endpoints"],
)
return False
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
).create_ro_endpoint()
if ep is None:
return False
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
ep.start_benchmark()
elif action == "delete_ro_endpoint":