diff --git a/.github/workflows/random-ops-test.yml b/.github/workflows/random-ops-test.yml new file mode 100644 index 0000000000..7c19537744 --- /dev/null +++ b/.github/workflows/random-ops-test.yml @@ -0,0 +1,93 @@ +name: Random Operations Test + +on: + schedule: + # * is a special character in YAML so you have to quote this string + # ┌───────────── minute (0 - 59) + # │ ┌───────────── hour (0 - 23) + # │ │ ┌───────────── day of the month (1 - 31) + # │ │ │ ┌───────────── month (1 - 12 or JAN-DEC) + # │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT) + - cron: '23 */2 * * *' # runs every 2 hours + workflow_dispatch: + inputs: + random_seed: + type: number + description: 'The random seed' + required: false + default: 0 + num_operations: + type: number + description: "The number of operations to test" + default: 250 + +defaults: + run: + shell: bash -euxo pipefail {0} + +permissions: {} + +env: + DEFAULT_PG_VERSION: 16 + PLATFORM: neon-captest-new + AWS_DEFAULT_REGION: eu-central-1 + +jobs: + run-random-tests: + env: + POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install + runs-on: small + permissions: + id-token: write + statuses: write + + strategy: + fail-fast: false + matrix: + pg-version: [16, 17] + + container: + image: ghcr.io/neondatabase/build-tools:pinned-bookworm + credentials: + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + options: --init + steps: + - name: Harden the runner (Audit all outbound calls) + uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0 + with: + egress-policy: audit + + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + + - name: Download Neon artifact + uses: ./.github/actions/download + with: + name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact + path: /tmp/neon/ + prefix: latest + aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + + - name: Run tests + uses: ./.github/actions/run-python-test-set + with: + build_type: remote + 
test_selection: random_ops + run_in_parallel: false + extra_params: -m remote_cluster + pg_version: ${{ matrix.pg-version }} + aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }} + RANDOM_SEED: ${{ inputs.random_seed }} + NUM_OPERATIONS: ${{ inputs.num_operations }} + + - name: Create Allure report + if: ${{ !cancelled() }} + id: create-allure-report + uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }} + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} diff --git a/test_runner/fixtures/neon_api.py b/test_runner/fixtures/neon_api.py index df80f0683c..0cf5945458 100644 --- a/test_runner/fixtures/neon_api.py +++ b/test_runner/fixtures/neon_api.py @@ -22,19 +22,62 @@ def connection_parameters_to_env(params: dict[str, str]) -> dict[str, str]: } +# Some API calls not yet implemented. +# You may want to copy not-yet-implemented methods from the PR https://github.com/neondatabase/neon/pull/11305 class NeonAPI: def __init__(self, neon_api_key: str, neon_api_base_url: str): self.__neon_api_key = neon_api_key self.__neon_api_base_url = neon_api_base_url.strip("/") + self.retry_if_possible = False + self.attempts = 10 + self.sleep_before_retry = 1 + self.retries524 = 0 + self.retries4xx = 0 def __request(self, method: str | bytes, endpoint: str, **kwargs: Any) -> requests.Response: - if "headers" not in kwargs: - kwargs["headers"] = {} + kwargs["headers"] = kwargs.get("headers", {}) kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}" - resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) - log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text) - resp.raise_for_status() + for attempt in range(self.attempts): + retry = False + resp = requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs) + if 
resp.status_code >= 400: + log.error( + "%s %s returned a %d: %s", + method, + endpoint, + resp.status_code, + resp.text if resp.status_code != 524 else "CloudFlare error page", + ) + else: + log.debug("%s %s returned a %d: %s", method, endpoint, resp.status_code, resp.text) + if not self.retry_if_possible: + resp.raise_for_status() + break + elif resp.status_code >= 400: + if resp.status_code == 422: + if resp.json()["message"] == "branch not ready yet": + retry = True + self.retries4xx += 1 + elif resp.status_code == 423 and resp.json()["message"] in { + "endpoint is in some transitive state, could not suspend", + "project already has running conflicting operations, scheduling of new ones is prohibited", + }: + retry = True + self.retries4xx += 1 + elif resp.status_code == 524: + log.info("The request was timed out, trying to get operations") + retry = True + self.retries524 += 1 + if retry: + log.info("Retrying, attempt %s/%s", attempt + 1, self.attempts) + time.sleep(self.sleep_before_retry) + continue + else: + resp.raise_for_status() + break + else: + raise RuntimeError("Max retry count is reached") return resp @@ -101,6 +144,96 @@ class NeonAPI: return cast("dict[str, Any]", resp.json()) + def create_branch( + self, + project_id: str, + branch_name: str | None = None, + parent_id: str | None = None, + parent_lsn: str | None = None, + parent_timestamp: str | None = None, + protected: bool | None = None, + archived: bool | None = None, + init_source: str | None = None, + add_endpoint=True, + ) -> dict[str, Any]: + data: dict[str, Any] = {} + if add_endpoint: + data["endpoints"] = [{"type": "read_write"}] + data["branch"] = {} + if parent_id: + data["branch"]["parent_id"] = parent_id + if branch_name: + data["branch"]["name"] = branch_name + if parent_lsn is not None: + data["branch"]["parent_lsn"] = parent_lsn + if parent_timestamp is not None: + data["branch"]["parent_timestamp"] = parent_timestamp + if protected is not None: + data["branch"]["protected"] = 
protected + if init_source is not None: + data["branch"]["init_source"] = init_source + if archived is not None: + data["branch"]["archived"] = archived + if not data["branch"]: + data.pop("branch") + resp = self.__request( + "POST", + f"/projects/{project_id}/branches", + headers={ + "Accept": "application/json", + "Content-Type": "application/json", + }, + json=data, + ) + return cast("dict[str, Any]", resp.json()) + + def get_branch_details(self, project_id: str, branch_id: str) -> dict[str, Any]: + resp = self.__request( + "GET", + f"/projects/{project_id}/branches/{branch_id}", + headers={ + "Accept": "application/json", + }, + ) + return cast("dict[str, Any]", resp.json()) + + def delete_branch(self, project_id: str, branch_id: str) -> dict[str, Any]: + resp = self.__request( + "DELETE", + f"/projects/{project_id}/branches/{branch_id}", + headers={ + "Accept": "application/json", + }, + ) + return cast("dict[str, Any]", resp.json()) + + def restore_branch( + self, + project_id: str, + branch_id: str, + source_branch_id: str, + source_lsn: str | None, + source_timestamp: str | None, + preserve_under_name: str | None, + ): + data = {"source_branch_id": source_branch_id} + if source_lsn: + data["source_lsn"] = source_lsn + if source_timestamp: + data["source_timestamp"] = source_timestamp + if preserve_under_name: + data["preserve_under_name"] = preserve_under_name + log.info("Data: %s", data) + resp = self.__request( + "POST", + f"/projects/{project_id}/branches/{branch_id}/restore", + headers={ + "Accept": "application/json", + }, + json=data, + ) + return cast("dict[str, Any]", resp.json()) + def start_endpoint( self, project_id: str, @@ -176,6 +309,10 @@ class NeonAPI: return cast("dict[str, Any]", resp.json()) + def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]: + resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}") + return cast("dict[str,Any]", resp.json()) + def get_connection_uri( self, 
project_id: str, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index e70ddc8e66..db2b68d082 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3185,6 +3185,7 @@ class PgBin: command: list[str], env: Env | None = None, cwd: str | Path | None = None, + stderr_pipe: Any | None = None, ) -> subprocess.Popen[Any]: """ Run one of the postgres binaries, not waiting for it to finish @@ -3202,7 +3203,9 @@ class PgBin: log.info(f"Running command '{' '.join(command)}'") env = self._build_env(env) self._log_env(env) - return subprocess.Popen(command, env=env, cwd=cwd, stdout=subprocess.PIPE, text=True) + return subprocess.Popen( + command, env=env, cwd=cwd, stdout=subprocess.PIPE, stderr=stderr_pipe, text=True + ) def run( self, diff --git a/test_runner/random_ops/README.md b/test_runner/random_ops/README.md new file mode 100644 index 0000000000..45163422a8 --- /dev/null +++ b/test_runner/random_ops/README.md @@ -0,0 +1,93 @@ +# Random Operations Test for Neon Stability + +## Problem Statement + +Neon needs robust testing of Neon's stability to ensure reliability for users. The random operations test addresses this by continuously exercising the API with unpredictable sequences of operations, helping to identify edge cases and potential issues that might not be caught by deterministic tests. + +### Key Components + +#### 1. Class Structure + +The test implements three main classes to model the Neon architecture: + +- **NeonProject**: Represents a Neon project and manages the lifecycle of branches and endpoints +- **NeonBranch**: Represents a branch within a project, with methods for creating child branches, endpoints, and performing point-in-time restores +- **NeonEndpoint**: Represents an endpoint (connection point) for a branch, with methods for managing benchmarks + +#### 2. 
Operations Tested + +The test randomly performs the following operations with weighted probabilities: + +- **Creating branches** +- **Deleting branches** +- **Adding read-only endpoints** +- **Deleting read-only endpoints** +- **Restoring branches to random points in time** + +#### 3. Load Generation + +Each branch and endpoint is loaded with `pgbench` to simulate real database workloads during testing. This ensures that the operations are performed against branches with actual data and ongoing transactions. + +#### 4. Error Handling + +The test includes robust error handling for various scenarios: +- Branch limit exceeded +- Connection timeouts +- Control plane timeouts (HTTP 524 errors) +- Benchmark failures + +#### 5. CI Integration + +The test is integrated into the CI pipeline via a GitHub workflow that runs every two hours, ensuring continuous validation of API stability. + +## How It Works + +1. The test creates a Neon project using the Public API +2. It initializes the main branch with pgbench data +3. It performs random operations according to the weighted probabilities +4. During each operation, it checks that all running benchmarks are still operational +5. 
The test cleans up by deleting the project at the end + +## Configuration + +The test can be configured with: +- `RANDOM_SEED`: Set a specific random seed for reproducible test runs +- `NEON_API_KEY`: API key for authentication +- `NEON_API_BASE_URL`: Base URL for the API (defaults to staging environment) +- `NUM_OPERATIONS`: The number of operations to be performed + +## Running the Test + +The test is designed to run in the CI environment but can also be executed locally: + +```bash +NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster +``` + +To run with a specific random seed for reproducibility: + +```bash +RANDOM_SEED=12345 NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster +``` + +To run with the custom number of operations: + +```bash +NUM_OPERATIONS=500 NEON_API_KEY=your_api_key ./scripts/pytest test_runner/random_ops/test_random_ops.py -m remote_cluster +``` + +## Benefits + +This test provides several key benefits: +1. **Comprehensive API testing**: Exercises multiple API endpoints in combination +2. **Edge case discovery**: Random sequences may uncover issues not found in deterministic tests +3. **Stability validation**: Continuous execution helps ensure long-term API reliability +4. **Regression prevention**: Detects if new changes break existing API functionality + +## Future Improvements + +Potential enhancements to the test could include: +1. Adding more API operations, e.g. `reset_to_parent`, `snapshot`, etc +2. Implementing more sophisticated load patterns +3. Adding metrics collection to measure API performance +4. 
Extending test duration for longer-term stability validation \ No newline at end of file diff --git a/test_runner/random_ops/test_random_ops.py b/test_runner/random_ops/test_random_ops.py new file mode 100644 index 0000000000..b3078ecac1 --- /dev/null +++ b/test_runner/random_ops/test_random_ops.py @@ -0,0 +1,463 @@ +""" +Run the random API tests on the cloud instance of Neon +""" + +from __future__ import annotations + +import os +import random +import subprocess +import time +from datetime import UTC, datetime, timedelta +from typing import TYPE_CHECKING, Any + +import pytest +from fixtures.log_helper import log +from requests import HTTPError + +if TYPE_CHECKING: + from pathlib import Path + + from fixtures.neon_api import NeonAPI + from fixtures.neon_fixtures import PgBin + from fixtures.pg_version import PgVersion + + +class NeonEndpoint: + """ + Neon Endpoint + Gets the output of the API call of an endpoint creation + """ + + def __init__(self, project: NeonProject, endpoint: dict[str, Any]): + self.project: NeonProject = project + self.id: str = endpoint["id"] + # The branch endpoint belongs to + self.branch: NeonBranch = project.branches[endpoint["branch_id"]] + self.type: str = endpoint["type"] + # add itself to the list of endpoints of the branch + self.branch.endpoints[self.id] = self + self.project.endpoints[self.id] = self + self.host: str = endpoint["host"] + self.benchmark: subprocess.Popen[Any] | None = None + # The connection environment is used when running benchmark + self.connect_env: dict[str, str] | None = None + if self.branch.connect_env: + self.connect_env = self.branch.connect_env.copy() + self.connect_env["PGHOST"] = self.host + + def delete(self): + self.project.delete_endpoint(self.id) + + def start_benchmark(self, clients=10): + return self.project.start_benchmark(self.id, clients=clients) + + def check_benchmark(self): + self.project.check_benchmark(self.id) + + def terminate_benchmark(self): + 
self.project.terminate_benchmark(self.id) + + +class NeonBranch: + """ + Neon Branch + Gets the output of the API call of the Neon Public API call of a branch creation as a first parameter + is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call + """ + + def __init__(self, project, branch: dict[str, Any], is_reset=False): + self.id: str = branch["branch"]["id"] + self.desc = branch + self.project: NeonProject = project + self.neon_api: NeonAPI = project.neon_api + self.project_id: str = branch["branch"]["project_id"] + self.parent: NeonBranch | None = ( + self.project.branches[branch["branch"]["parent_id"]] + if "parent_id" in branch["branch"] + else None + ) + if is_reset: + self.project.reset_branches.add(self.id) + elif self.parent: + self.project.leaf_branches[self.id] = self + if self.parent is not None and self.parent.id in self.project.leaf_branches: + self.project.leaf_branches.pop(self.parent.id) + self.project.branches[self.id] = self + self.children: dict[str, NeonBranch] = {} + if self.parent is not None: + self.parent.children[self.id] = self + self.endpoints: dict[str, NeonEndpoint] = {} + self.connection_parameters: dict[str, str] | None = ( + branch["connection_uris"][0]["connection_parameters"] + if "connection_uris" in branch + else None + ) + self.benchmark: subprocess.Popen[Any] | None = None + self.updated_at: datetime = datetime.fromisoformat(branch["branch"]["updated_at"]) + self.connect_env: dict[str, str] | None = None + if self.connection_parameters: + self.connect_env = { + "PGHOST": self.connection_parameters["host"], + "PGUSER": self.connection_parameters["role"], + "PGDATABASE": self.connection_parameters["database"], + "PGPASSWORD": self.connection_parameters["password"], + "PGSSLMODE": "require", + } + + def __str__(self): + """ + Prints the branch's name with all the predecessors + (r) means the branch is a reset one + """ + return f"{self.id}{'(r)' if self.id in self.project.reset_branches 
else ''}, parent: {self.parent}" + + def create_child_branch(self) -> NeonBranch | None: + return self.project.create_branch(self.id) + + def create_ro_endpoint(self) -> NeonEndpoint: + return NeonEndpoint( + self.project, + self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"], + ) + + def delete(self) -> None: + self.project.delete_branch(self.id) + + def start_benchmark(self, clients=10) -> subprocess.Popen[Any]: + return self.project.start_benchmark(self.id, clients=clients) + + def check_benchmark(self) -> None: + self.project.check_benchmark(self.id) + + def terminate_benchmark(self) -> None: + self.project.terminate_benchmark(self.id) + + def restore_random_time(self) -> None: + """ + Does PITR, i.e. calls the reset API call on the same branch to the random time in the past + """ + min_time = self.updated_at + timedelta(seconds=1) + max_time = datetime.now(UTC) - timedelta(seconds=1) + target_time = (min_time + (max_time - min_time) * random.random()).replace(microsecond=0) + res = self.restore( + self.id, + source_timestamp=target_time.isoformat().replace("+00:00", "Z"), + preserve_under_name=self.project.gen_restore_name(), + ) + if res is None: + return + self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"]) + parent_id: str = res["branch"]["parent_id"] + # Creates an object for the parent branch + # After the reset operation a new parent branch is created + parent = NeonBranch( + self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True + ) + self.project.branches[parent_id] = parent + self.parent = parent + parent.children[self.id] = self + self.project.wait() + + def restore( + self, + source_branch_id: str, + source_lsn: str | None = None, + source_timestamp: str | None = None, + preserve_under_name: str | None = None, + ) -> dict[str, Any] | None: + endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"] + # Terminate all the benchmarks running to prevent 
errors. Errors in benchmark during pgbench are expected + for ep in endpoints: + ep.terminate_benchmark() + self.terminate_benchmark() + try: + res: dict[str, Any] = self.neon_api.restore_branch( + self.project_id, + self.id, + source_branch_id, + source_lsn, + source_timestamp, + preserve_under_name, + ) + except HTTPError as he: + if ( + he.response.status_code == 422 + and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED" + ): + log.info("Branch limit exceeded, skipping") + return None + else: + raise HTTPError(he) from he + self.project.wait() + self.start_benchmark() + for ep in endpoints: + ep.start_benchmark() + return res + + +class NeonProject: + """ + The project object + Calls the Public API to create a Neon Project + """ + + def __init__(self, neon_api: NeonAPI, pg_bin: PgBin, pg_version: PgVersion): + self.neon_api = neon_api + self.pg_bin = pg_bin + proj = self.neon_api.create_project( + pg_version, f"Automatic random API test {os.getenv('GITHUB_RUN_ID')}" + ) + self.id: str = proj["project"]["id"] + self.name: str = proj["project"]["name"] + self.connection_uri: str = proj["connection_uris"][0]["connection_uri"] + self.connection_parameters: dict[str, str] = proj["connection_uris"][0][ + "connection_parameters" + ] + self.pg_version: PgVersion = pg_version + # Leaf branches are the branches, which do not have children + self.leaf_branches: dict[str, NeonBranch] = {} + self.branches: dict[str, NeonBranch] = {} + self.reset_branches: set[str] = set() + self.main_branch: NeonBranch = NeonBranch(self, proj) + self.main_branch.connection_parameters = self.connection_parameters + self.endpoints: dict[str, NeonEndpoint] = {} + for endpoint in proj["endpoints"]: + NeonEndpoint(self, endpoint) + self.neon_api.wait_for_operation_to_finish(self.id) + self.benchmarks: dict[str, subprocess.Popen[Any]] = {} + self.restore_num: int = 0 + self.restart_pgbench_on_console_errors: bool = False + + def delete(self): + self.neon_api.delete_project(self.id) + + def 
create_branch(self, parent_id: str | None = None) -> NeonBranch | None: + self.wait() + try: + branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id) + except HTTPError as he: + if ( + he.response.status_code == 422 + and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED" + ): + log.info("Branch limit exceeded, skipping") + return None + else: + raise HTTPError(he) from he + new_branch = NeonBranch(self, branch_def) + self.wait() + return new_branch + + def delete_branch(self, branch_id: str) -> None: + parent = self.branches[branch_id].parent + if not parent or branch_id == self.main_branch.id: + raise RuntimeError("Cannot delete the main branch") + if branch_id not in self.leaf_branches and branch_id not in self.reset_branches: + raise RuntimeError(f"The branch {branch_id}, probably, has ancestors") + if branch_id not in self.branches: + raise RuntimeError(f"The branch with id {branch_id} is not found") + endpoints_to_delete = [ + ep for ep in self.branches[branch_id].endpoints.values() if ep.type == "read_only" + ] + for ep in endpoints_to_delete: + ep.delete() + if branch_id not in self.reset_branches: + self.terminate_benchmark(branch_id) + self.neon_api.delete_branch(self.id, branch_id) + if len(parent.children) == 1 and parent.id != self.main_branch.id: + self.leaf_branches[parent.id] = parent + parent.children.pop(branch_id) + if branch_id in self.leaf_branches: + self.leaf_branches.pop(branch_id) + else: + self.reset_branches.remove(branch_id) + self.branches.pop(branch_id) + self.wait() + if parent.id in self.reset_branches: + parent.delete() + + def delete_endpoint(self, endpoint_id: str) -> None: + self.terminate_benchmark(endpoint_id) + self.neon_api.delete_endpoint(self.id, endpoint_id) + self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id) + self.endpoints.pop(endpoint_id) + self.wait() + + def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]: + if target in self.benchmarks: + raise 
RuntimeError(f"Benchmark was already started for {target}") + is_endpoint = target.startswith("ep") + read_only = is_endpoint and self.endpoints[target].type == "read_only" + cmd = ["pgbench", f"-c{clients}", "-T10800", "-Mprepared"] + if read_only: + cmd.extend(["-S", "-n"]) + target_object = self.endpoints[target] if is_endpoint else self.branches[target] + if target_object.connect_env is None: + raise RuntimeError(f"The connection environment is not defined for {target}") + log.info( + "running pgbench on %s, cmd: %s, host: %s", + target, + cmd, + target_object.connect_env["PGHOST"], + ) + pgbench = self.pg_bin.run_nonblocking( + cmd, env=target_object.connect_env, stderr_pipe=subprocess.PIPE + ) + self.benchmarks[target] = pgbench + target_object.benchmark = pgbench + time.sleep(2) + return pgbench + + def check_all_benchmarks(self) -> None: + for target in tuple(self.benchmarks.keys()): + self.check_benchmark(target) + + def check_benchmark(self, target) -> None: + rc = self.benchmarks[target].poll() + if rc is not None: + _, err = self.benchmarks[target].communicate() + log.error("STDERR: %s", err) + # if the benchmark failed due to an unresponsive Control plane, + # just restart it + if self.restart_pgbench_on_console_errors and ( + "ERROR: Couldn't connect to compute node" in err + or "ERROR: Console request failed" in err + ): + log.info("Restarting benchmark for %s", target) + self.benchmarks.pop(target) + self.start_benchmark(target) + return + raise RuntimeError(f"The benchmark for {target} ended with code {rc}") + + def terminate_benchmark(self, target): + log.info("Terminating the benchmark %s", target) + target_endpoint = target.startswith("ep") + self.check_benchmark(target) + self.benchmarks[target].terminate() + self.benchmarks.pop(target) + if target_endpoint: + self.endpoints[target].benchmark = None + else: + self.branches[target].benchmark = None + + def wait(self): + """ + Wait for all the operations to be finished + """ + return 
self.neon_api.wait_for_operation_to_finish(self.id) + + def gen_restore_name(self): + self.restore_num += 1 + return f"restore{self.restore_num}" + + +@pytest.fixture() +def setup_class( + pg_version: PgVersion, + pg_bin: PgBin, + neon_api: NeonAPI, +): + neon_api.retry_if_possible = True + project = NeonProject(neon_api, pg_bin, pg_version) + log.info("Created a project with id %s, name %s", project.id, project.name) + yield pg_bin, project + log.info("Retried 524 errors: %s", neon_api.retries524) + log.info("Retried 4xx errors: %s", neon_api.retries4xx) + if neon_api.retries524 > 0: + print(f"::warning::Retried on 524 error {neon_api.retries524} times") + if neon_api.retries4xx > 0: + print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times") + log.info("Removing the project") + project.delete() + + +def do_action(project: NeonProject, action: str) -> None: + """ + Runs the action + """ + log.info("Action: %s", action) + if action == "new_branch": + log.info("Trying to create a new branch") + parent = project.branches[ + random.choice(list(set(project.branches.keys()) - project.reset_branches)) + ] + log.info("Parent: %s", parent) + child = parent.create_child_branch() + if child is None: + return + log.info("Created branch %s", child) + child.start_benchmark() + elif action == "delete_branch": + if project.leaf_branches: + target = random.choice(list(project.leaf_branches.values())) + log.info("Trying to delete branch %s", target) + target.delete() + else: + log.info("Leaf branches not found, skipping") + elif action == "new_ro_endpoint": + ep = random.choice( + [br for br in project.branches.values() if br.id not in project.reset_branches] + ).create_ro_endpoint() + log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id) + ep.start_benchmark() + elif action == "delete_ro_endpoint": + ro_endpoints: list[NeonEndpoint] = [ + endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only" + ] + if ro_endpoints: 
+ target_ep: NeonEndpoint = random.choice(ro_endpoints) + target_ep.delete() + log.info("endpoint %s deleted", target_ep.id) + else: + log.info("no read_only endpoints present, skipping") + elif action == "restore_random_time": + if project.leaf_branches: + br: NeonBranch = random.choice(list(project.leaf_branches.values())) + log.info("Restore %s", br) + br.restore_random_time() + else: + log.info("No leaf branches found") + else: + raise ValueError(f"The action {action} is unknown") + + +@pytest.mark.timeout(7200) +@pytest.mark.remote_cluster +def test_api_random( + setup_class, + pg_distrib_dir: Path, + test_output_dir: Path, +): + """ + Run the random API tests + """ + if seed_env := os.getenv("RANDOM_SEED"): + seed = int(seed_env) + else: + seed = 0 + if seed == 0: + seed = int(time.time()) + log.info("Using random seed: %s", seed) + random.seed(seed) + pg_bin, project = setup_class + # Here we can assign weights + ACTIONS = ( + ("new_branch", 1.5), + ("new_ro_endpoint", 1.4), + ("delete_ro_endpoint", 0.8), + ("delete_branch", 1.0), + ("restore_random_time", 1.2), + ) + if num_ops_env := os.getenv("NUM_OPERATIONS"): + num_operations = int(num_ops_env) + else: + num_operations = 250 + pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env) + for _ in range(num_operations): + log.info("Starting action #%s", _ + 1) + do_action( + project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0] + ) + project.check_all_benchmarks() + assert True