Add new operations to Random operations test (#12213)

## Problem
We did not test some Public API calls, such as using a timestamp to
create a branch, reset_to_parent.
## Summary of changes
Tests now include some other operations: reset_to_parent, a branch
creation from any time in the past, etc.
Currently, the API calls are only exposed; the semantics are not
verified.

---------

Co-authored-by: Alexey Masterov <alexey.masterov@databricks.com>
This commit is contained in:
a-masterov
2025-07-22 19:43:01 +02:00
committed by GitHub
parent 5b0972151c
commit b3844903e5
2 changed files with 86 additions and 26 deletions

View File

@@ -227,6 +227,16 @@ class NeonAPI:
) )
return cast("dict[str, Any]", resp.json()) return cast("dict[str, Any]", resp.json())
def reset_to_parent(self, project_id: str, branch_id: str) -> dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/branches/{branch_id}/reset_to_parent",
headers={
"Accept": "application/json",
},
)
return cast("dict[str, Any]", resp.json())
def restore_branch( def restore_branch(
self, self,
project_id: str, project_id: str,

View File

@@ -96,6 +96,11 @@ class NeonBranch:
) )
self.benchmark: subprocess.Popen[Any] | None = None self.benchmark: subprocess.Popen[Any] | None = None
self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"]) self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"])
self.parent_timestamp: datetime = (
datetime.fromisoformat(branch["branch"]["parent_timestamp"])
if "parent_timestamp" in branch["branch"]
else datetime.fromtimestamp(0, tz=UTC)
)
self.connect_env: dict[str, str] | None = None self.connect_env: dict[str, str] | None = None
if self.connection_parameters: if self.connection_parameters:
self.connect_env = { self.connect_env = {
@@ -113,8 +118,18 @@ class NeonBranch:
""" """
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}" return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
def create_child_branch(self) -> NeonBranch | None: def random_time(self) -> datetime:
return self.project.create_branch(self.id) min_time = max(
self.updated_at + timedelta(seconds=1),
self.project.min_time,
self.parent_timestamp + timedelta(seconds=1),
)
max_time = datetime.now(UTC) - timedelta(seconds=1)
log.info("min_time: %s, max_time: %s", min_time, max_time)
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp)
def create_ro_endpoint(self) -> NeonEndpoint | None: def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints(): if not self.project.check_limit_endpoints():
@@ -136,21 +151,33 @@ class NeonBranch:
def terminate_benchmark(self) -> None: def terminate_benchmark(self) -> None:
self.project.terminate_benchmark(self.id) self.project.terminate_benchmark(self.id)
def reset_to_parent(self) -> None:
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.terminate_benchmark()
self.terminate_benchmark()
res = self.neon_api.reset_to_parent(self.project_id, self.id)
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"])
self.project.wait()
self.start_benchmark()
for ep in self.project.endpoints.values():
if ep.type == "read_only":
ep.start_benchmark()
def restore_random_time(self) -> None: def restore_random_time(self) -> None:
""" """
Does PITR, i.e. calls the reset API call on the same branch to the random time in the past Does PITR, i.e. calls the reset API call on the same branch to the random time in the past
""" """
min_time = self.updated_at + timedelta(seconds=1)
max_time = datetime.now(UTC) - timedelta(seconds=1)
target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
res = self.restore( res = self.restore(
self.id, self.id,
source_timestamp=target_time.isoformat().replace("+00:00", "Z"), source_timestamp=self.random_time().isoformat().replace("+00:00", "Z"),
preserve_under_name=self.project.gen_restore_name(), preserve_under_name=self.project.gen_restore_name(),
) )
if res is None: if res is None:
return return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"]) self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"])
parent_id: str = res["branch"]["parent_id"] parent_id: str = res["branch"]["parent_id"]
# Creates an object for the parent branch # Creates an object for the parent branch
# After the reset operation a new parent branch is created # After the reset operation a new parent branch is created
@@ -225,6 +252,7 @@ class NeonProject:
self.restart_pgbench_on_console_errors: bool = False self.restart_pgbench_on_console_errors: bool = False
self.limits: dict[str, Any] = self.get_limits()["limits"] self.limits: dict[str, Any] = self.get_limits()["limits"]
self.read_only_endpoints_total: int = 0 self.read_only_endpoints_total: int = 0
self.min_time: datetime = datetime.now(UTC)
def get_limits(self) -> dict[str, Any]: def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id) return self.neon_api.get_project_limits(self.id)
@@ -251,11 +279,20 @@ class NeonProject:
) )
return False return False
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None: def create_branch(
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
) -> NeonBranch | None:
self.wait() self.wait()
if not self.check_limit_branches(): if not self.check_limit_branches():
return None return None
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id) if parent_timestamp:
log.info("Timestamp: %s", parent_timestamp)
parent_timestamp_str: str | None = None
if parent_timestamp:
parent_timestamp_str = parent_timestamp.isoformat().replace("+00:00", "Z")
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def) new_branch = NeonBranch(self, branch_def)
self.wait() self.wait()
return new_branch return new_branch
@@ -288,6 +325,14 @@ class NeonProject:
if parent.id in self.reset_branches: if parent.id in self.reset_branches:
parent.delete() parent.delete()
def get_random_leaf_branch(self) -> NeonBranch | None:
target: NeonBranch | None = None
if self.leaf_branches:
target = random.choice(list(self.leaf_branches.values()))
else:
log.info("No leaf branches found")
return target
def delete_endpoint(self, endpoint_id: str) -> None: def delete_endpoint(self, endpoint_id: str) -> None:
self.terminate_benchmark(endpoint_id) self.terminate_benchmark(endpoint_id)
self.neon_api.delete_endpoint(self.id, endpoint_id) self.neon_api.delete_endpoint(self.id, endpoint_id)
@@ -390,24 +435,22 @@ def do_action(project: NeonProject, action: str) -> bool:
Runs the action Runs the action
""" """
log.info("Action: %s", action) log.info("Action: %s", action)
if action == "new_branch": if action == "new_branch" or action == "new_branch_random_time":
log.info("Trying to create a new branch") use_random_time: bool = action == "new_branch_random_time"
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
parent = project.branches[ parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches)) random.choice(list(set(project.branches.keys()) - project.reset_branches))
] ]
child = parent.create_child_branch() child = parent.create_child_branch(parent.random_time() if use_random_time else None)
if child is None: if child is None:
return False return False
log.info("Created branch %s", child) log.info("Created branch %s", child)
child.start_benchmark() child.start_benchmark()
elif action == "delete_branch": elif action == "delete_branch":
if project.leaf_branches: if (target := project.get_random_leaf_branch()) is None:
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Trying to delete branch %s", target)
target.delete()
else:
log.info("Leaf branches not found, skipping")
return False return False
log.info("Trying to delete branch %s", target)
target.delete()
elif action == "new_ro_endpoint": elif action == "new_ro_endpoint":
ep = random.choice( ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches] [br for br in project.branches.values() if br.id not in project.reset_branches]
@@ -427,13 +470,15 @@ def do_action(project: NeonProject, action: str) -> bool:
target_ep.delete() target_ep.delete()
log.info("endpoint %s deleted", target_ep.id) log.info("endpoint %s deleted", target_ep.id)
elif action == "restore_random_time": elif action == "restore_random_time":
if project.leaf_branches: if (target := project.get_random_leaf_branch()) is None:
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
log.info("Restore %s", br)
br.restore_random_time()
else:
log.info("No leaf branches found")
return False return False
log.info("Restore %s", target)
target.restore_random_time()
elif action == "reset_to_parent":
if (target := project.get_random_leaf_branch()) is None:
return False
log.info("Reset to parent %s", target)
target.reset_to_parent()
else: else:
raise ValueError(f"The action {action} is unknown") raise ValueError(f"The action {action} is unknown")
return True return True
@@ -460,17 +505,22 @@ def test_api_random(
pg_bin, project = setup_class pg_bin, project = setup_class
# Here we can assign weights # Here we can assign weights
ACTIONS = ( ACTIONS = (
("new_branch", 1.5), ("new_branch", 1.2),
("new_branch_random_time", 0.5),
("new_ro_endpoint", 1.4), ("new_ro_endpoint", 1.4),
("delete_ro_endpoint", 0.8), ("delete_ro_endpoint", 0.8),
("delete_branch", 1.0), ("delete_branch", 1.2),
("restore_random_time", 1.2), ("restore_random_time", 0.9),
("reset_to_parent", 0.3),
) )
if num_ops_env := os.getenv("NUM_OPERATIONS"): if num_ops_env := os.getenv("NUM_OPERATIONS"):
num_operations = int(num_ops_env) num_operations = int(num_ops_env)
else: else:
num_operations = 250 num_operations = 250
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env) pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
# To not go to the past where pgbench tables do not exist
time.sleep(1)
project.min_time = datetime.now(UTC)
for _ in range(num_operations): for _ in range(num_operations):
log.info("Starting action #%s", _ + 1) log.info("Starting action #%s", _ + 1)
while not do_action( while not do_action(