diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index b26bcb286c..8d447c837f 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -227,6 +227,16 @@ class NeonAPI: ) return cast("dict[str, Any]", resp.json()) + def reset_to_parent(self, project_id: str, branch_id: str) -> dict[str, Any]: + resp = self.__request( + "POST", + f"/projects/{project_id}/branches/{branch_id}/reset_to_parent", + headers={ + "Accept": "application/json", + }, + ) + return cast("dict[str, Any]", resp.json()) + def restore_branch( self, project_id: str, diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py index b106e9b729..aae17e2fc4 100644 --- a/test_runner/random_ops/test_random_ops.py +++ b/test_runner/random_ops/test_random_ops.py @@ -96,6 +96,11 @@ class NeonBranch: ) self.benchmark: subprocess.Popen[Any] | None = None self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"]) + self.parent_timestamp: datetime = ( + datetime.fromisoformat(branch["branch"]["parent_timestamp"]) + if "parent_timestamp" in branch["branch"] + else datetime.fromtimestamp(0, tz=UTC) + ) self.connect_env: dict[str, str] | None = None if self.connection_parameters: self.connect_env = { @@ -113,8 +118,18 @@ class NeonBranch: """ return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}" - def create_child_branch(self) -> NeonBranch | None: - return self.project.create_branch(self.id) + def random_time(self) -> datetime: + min_time = max( + self.updated_at + timedelta(seconds=1), + self.project.min_time, + self.parent_timestamp + timedelta(seconds=1), + ) + max_time = datetime.now(UTC) - timedelta(seconds=1) + log.info("min_time: %s, max_time: %s", min_time, max_time) + return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0) + + def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None: + return self.project.create_branch(self.id, parent_timestamp) def create_ro_endpoint(self) -> NeonEndpoint | None: if not self.project.check_limit_endpoints(): @@ -136,21 +151,33 @@ class NeonBranch: def terminate_benchmark(self) -> None: self.project.terminate_benchmark(self.id) + def reset_to_parent(self) -> None: + for ep in self.project.endpoints.values(): + if ep.type == "read_only": + ep.terminate_benchmark() + self.terminate_benchmark() + res = self.neon_api.reset_to_parent(self.project_id, self.id) + self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"]) + self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"]) + self.project.wait() + self.start_benchmark() + for ep in self.project.endpoints.values(): + if ep.type == "read_only": + ep.start_benchmark() + def restore_random_time(self) -> None: """ Does PITR, i.e. calls the reset API call on the same branch to the random time in the past """ - min_time = self.updated_at + timedelta(seconds=1) - max_time = datetime.now(UTC) - timedelta(seconds=1) - target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0) res = self.restore( self.id, - source_timestamp=target_time.isoformat().replace("+00:00", "Z"), + source_timestamp=self.random_time().isoformat().replace("+00:00", "Z"), preserve_under_name=self.project.gen_restore_name(), ) if res is None: return self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"]) + self.parent_timestamp = datetime.fromisoformat(res["branch"]["parent_timestamp"]) parent_id: str = res["branch"]["parent_id"] # Creates an object for the parent branch # After the reset operation a new parent branch is created @@ -225,6 +252,7 @@ class NeonProject: self.restart_pgbench_on_console_errors: bool = False self.limits: dict[str, Any] = self.get_limits()["limits"] self.read_only_endpoints_total: int = 0 + self.min_time: datetime = datetime.now(UTC) def get_limits(self) -> dict[str, Any]: return self.neon_api.get_project_limits(self.id) @@ -251,11 +279,20 @@ class NeonProject: ) return False - def create_branch(self, parent_id: str | None = None) -> NeonBranch | None: + def create_branch( + self, parent_id: str | None = None, parent_timestamp: datetime | None = None + ) -> NeonBranch | None: self.wait() if not self.check_limit_branches(): return None - branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id) + if parent_timestamp: + log.info("Timestamp: %s", parent_timestamp) + parent_timestamp_str: str | None = None + if parent_timestamp: + parent_timestamp_str = parent_timestamp.isoformat().replace("+00:00", "Z") + branch_def = self.neon_api.create_branch( + self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str + ) new_branch = NeonBranch(self, branch_def) self.wait() return new_branch @@ -288,6 +325,14 @@ class NeonProject: if parent.id in self.reset_branches: parent.delete() + def get_random_leaf_branch(self) -> NeonBranch | None: + target: NeonBranch | None = None + if self.leaf_branches: + target = random.choice(list(self.leaf_branches.values())) + else: + log.info("No leaf branches found") + return target + def delete_endpoint(self, endpoint_id: str) -> None: self.terminate_benchmark(endpoint_id) self.neon_api.delete_endpoint(self.id, endpoint_id) @@ -390,24 +435,22 @@ def do_action(project: NeonProject, action: str) -> bool: Runs the action """ log.info("Action: %s", action) - if action == "new_branch": - log.info("Trying to create a new branch") + if action == "new_branch" or action == "new_branch_random_time": + use_random_time: bool = action == "new_branch_random_time" + log.info("Trying to create a new branch %s", "random time" if use_random_time else "") parent = project.branches[ random.choice(list(set(project.branches.keys()) - project.reset_branches)) ] - child = parent.create_child_branch() + child = parent.create_child_branch(parent.random_time() if use_random_time else None) if child is None: return False log.info("Created branch %s", child) child.start_benchmark() elif action == "delete_branch": - if project.leaf_branches: - target: NeonBranch = random.choice(list(project.leaf_branches.values())) - log.info("Trying to delete branch %s", target) - target.delete() - else: - log.info("Leaf branches not found, skipping") + if (target := project.get_random_leaf_branch()) is None: return False + log.info("Trying to delete branch %s", target) + target.delete() elif action == "new_ro_endpoint": ep = random.choice( [br for br in project.branches.values() if br.id not in project.reset_branches] @@ -427,13 +470,15 @@ def do_action(project: NeonProject, action: str) -> bool: target_ep.delete() log.info("endpoint %s deleted", target_ep.id) elif action == "restore_random_time": - if project.leaf_branches: - br: NeonBranch = random.choice(list(project.leaf_branches.values())) - log.info("Restore %s", br) - br.restore_random_time() - else: - log.info("No leaf branches found") + if (target := project.get_random_leaf_branch()) is None: return False + log.info("Restore %s", target) + target.restore_random_time() + elif action == "reset_to_parent": + if (target := project.get_random_leaf_branch()) is None: + return False + log.info("Reset to parent %s", target) + target.reset_to_parent() else: raise ValueError(f"The action {action} is unknown") return True @@ -460,17 +505,22 @@ def test_api_random( pg_bin, project = setup_class # Here we can assign weights ACTIONS = ( - ("new_branch", 1.5), + ("new_branch", 1.2), + ("new_branch_random_time", 0.5), ("new_ro_endpoint", 1.4), ("delete_ro_endpoint", 0.8), - ("delete_branch", 1.0), - ("restore_random_time", 1.2), + ("delete_branch", 1.2), + ("restore_random_time", 0.9), + ("reset_to_parent", 0.3), ) if num_ops_env := os.getenv("NUM_OPERATIONS"): num_operations = int(num_ops_env) else: num_operations = 250 pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env) + # To not go to the past where pgbench tables do not exist + time.sleep(1) + project.min_time = datetime.now(UTC) for _ in range(num_operations): log.info("Starting action #%s", _ + 1) while not do_action(