mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 05:30:37 +00:00
Compare commits
69 Commits
ci-run/pr-
...
amasterov/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f26987deef | ||
|
|
7c2022c1b5 | ||
|
|
c233deb1c2 | ||
|
|
d550b67c5f | ||
|
|
2ca2b05ab5 | ||
|
|
5e1057b860 | ||
|
|
bb6127f495 | ||
|
|
a41c00e7c1 | ||
|
|
76832488d0 | ||
|
|
3ce2c15c10 | ||
|
|
1c3f49e231 | ||
|
|
b982cf6c84 | ||
|
|
b1b23cdc8e | ||
|
|
556e9cb781 | ||
|
|
8edea1dea3 | ||
|
|
f5a553a8e5 | ||
|
|
7423c393c6 | ||
|
|
c3a7158e62 | ||
|
|
848dcd7540 | ||
|
|
783dfe3cce | ||
|
|
cdc2ea110f | ||
|
|
c7e1183da4 | ||
|
|
6763925a4d | ||
|
|
3bcdbe30f1 | ||
|
|
22975426b7 | ||
|
|
31c6f66a49 | ||
|
|
287e01fdf9 | ||
|
|
91c81cc5e5 | ||
|
|
a8354b0aa3 | ||
|
|
1102e2aff0 | ||
|
|
f6a61c9492 | ||
|
|
cbf8e248fc | ||
|
|
f0f30076cc | ||
|
|
42544cf145 | ||
|
|
28b25092ad | ||
|
|
b77a1fae04 | ||
|
|
73ed7ade70 | ||
|
|
74626b94a8 | ||
|
|
4ca6d8cecf | ||
|
|
bf0be50df9 | ||
|
|
1adc95758e | ||
|
|
03e994f9c7 | ||
|
|
f0671c996e | ||
|
|
829cb5fe59 | ||
|
|
561083524d | ||
|
|
009303e31f | ||
|
|
0e42cac589 | ||
|
|
f5cebcaf6a | ||
|
|
5861d0f9b2 | ||
|
|
dbedf11191 | ||
|
|
1e20c4f2b2 | ||
|
|
018f95115a | ||
|
|
f222256225 | ||
|
|
17b5f5e090 | ||
|
|
9bf5d69c01 | ||
|
|
f816b3d90e | ||
|
|
1ec1a82d3d | ||
|
|
e97c1d2684 | ||
|
|
94cfd3f22e | ||
|
|
f45ea8fe6b | ||
|
|
1443ba65d3 | ||
|
|
185f4de0fe | ||
|
|
efb08f82cd | ||
|
|
c31563f551 | ||
|
|
fd6c2cba01 | ||
|
|
899f4a1e77 | ||
|
|
e95fcfa0d5 | ||
|
|
0ccc649299 | ||
|
|
fe2abf3531 |
@@ -79,6 +79,7 @@ class NeonAPI:
|
||||
elif resp.status_code == 423 and resp.json()["message"] in {
|
||||
"endpoint is in some transitive state, could not suspend",
|
||||
"project already has running conflicting operations, scheduling of new ones is prohibited",
|
||||
"snapshot is in transition",
|
||||
}:
|
||||
retry = True
|
||||
self.retries4xx += 1
|
||||
@@ -355,6 +356,63 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def create_snapshot(
|
||||
self,
|
||||
project_id: str,
|
||||
branch_id: str,
|
||||
lsn: str | None = None,
|
||||
timestamp: str | None = None,
|
||||
name: str | None = None,
|
||||
expires_at: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
params: dict[str, Any] = {
|
||||
"lsn": lsn,
|
||||
"timestamp": timestamp,
|
||||
"name": name,
|
||||
"expires_at": expires_at,
|
||||
}
|
||||
params = {key: value for key, value in params.items() if value is not None}
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/branches/{branch_id}/snapshot",
|
||||
params=params,
|
||||
json={},
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_snapshot(self, project_id: str, snapshot_id: str) -> dict[str, Any]:
|
||||
resp = self.__request("DELETE", f"/projects/{project_id}/snapshots/{snapshot_id}")
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def restore_snapshot(
|
||||
self,
|
||||
project_id: str,
|
||||
snapshot_id: str,
|
||||
target_branch_id: str,
|
||||
name: str | None = None,
|
||||
finalize_restore: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
data: dict[str, Any] = {
|
||||
"target_branch_id": target_branch_id,
|
||||
"finalize_restore": finalize_restore,
|
||||
}
|
||||
if name is not None:
|
||||
data["name"] = name
|
||||
log.info("Restore snapshot data: %s", data)
|
||||
resp = self.__request(
|
||||
"POST",
|
||||
f"/projects/{project_id}/snapshots/{snapshot_id}/restore",
|
||||
json=data,
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_endpoint(self, project_id: str, endpoint_id: str) -> dict[str, Any]:
|
||||
resp = self.__request("DELETE", f"/projects/{project_id}/endpoints/{endpoint_id}")
|
||||
return cast("dict[str,Any]", resp.json())
|
||||
@@ -396,6 +454,14 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_branch_endpoints(self, project_id: str, branch_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
f"/projects/{project_id}/branches/{branch_id}/endpoints",
|
||||
headers={"Accept": "application/json", "Content-Type": "application/json"},
|
||||
)
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_endpoints(self, project_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
|
||||
@@ -11,6 +11,7 @@ import time
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
|
||||
@@ -22,6 +23,29 @@ if TYPE_CHECKING:
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
class NeonSnapshot:
|
||||
"""
|
||||
A snapshot of the Neon Branch
|
||||
Gets the output of the API call af a snapshot creation
|
||||
"""
|
||||
|
||||
def __init__(self, project: NeonProject, snapshot: dict[str, Any]):
|
||||
self.project: NeonProject = project
|
||||
snapshot = snapshot["snapshot"]
|
||||
self.id: str = snapshot["id"]
|
||||
self.name: str = snapshot["name"]
|
||||
self.created_at: datetime = datetime.fromisoformat(snapshot["created_at"])
|
||||
self.source_branch: NeonBranch = project.branches[snapshot["source_branch_id"]]
|
||||
project.snapshots[self.id] = self
|
||||
self.restored: bool = False
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"id: {self.id}, name: {self.name}, created_at: {self.created_at}"
|
||||
|
||||
def delete(self) -> None:
|
||||
self.project.delete_snapshot(self.id)
|
||||
|
||||
|
||||
class NeonEndpoint:
|
||||
"""
|
||||
Neon Endpoint
|
||||
@@ -70,6 +94,12 @@ class NeonBranch:
|
||||
def __init__(self, project, branch: dict[str, Any], is_reset=False):
|
||||
self.id: str = branch["branch"]["id"]
|
||||
self.desc = branch
|
||||
self.name: str | None = None
|
||||
if "name" in branch["branch"]:
|
||||
self.name = branch["branch"]["name"]
|
||||
self.restored_from: str | None = None
|
||||
if "restored_from" in branch["branch"]:
|
||||
self.restored_from = branch["branch"]["restored_from"]
|
||||
self.project: NeonProject = project
|
||||
self.neon_api: NeonAPI = project.neon_api
|
||||
self.project_id: str = branch["branch"]["project_id"]
|
||||
@@ -113,10 +143,9 @@ class NeonBranch:
|
||||
|
||||
def __str__(self):
|
||||
"""
|
||||
Prints the branch's name with all the predecessors
|
||||
(r) means the branch is a reset one
|
||||
Prints the branch's information with all the predecessors
|
||||
"""
|
||||
return f"{self.id}{'(r)' if self.id in self.project.reset_branches else ''}, parent: {self.parent}"
|
||||
return f"{self.id}{f'({self.name})' if self.name and self.name != self.id else ''}{f'(restored_from: {self.restored_from})' if self.restored_from else ''}, parent: {self.parent}"
|
||||
|
||||
def random_time(self) -> datetime:
|
||||
min_time = max(
|
||||
@@ -152,6 +181,9 @@ class NeonBranch:
|
||||
self.project.terminate_benchmark(self.id)
|
||||
|
||||
def reset_to_parent(self) -> None:
|
||||
"""
|
||||
Resets the branch to the parent branch
|
||||
"""
|
||||
for ep in self.project.endpoints.values():
|
||||
if ep.type == "read_only":
|
||||
ep.terminate_benchmark()
|
||||
@@ -240,6 +272,7 @@ class NeonProject:
|
||||
# Leaf branches are the branches, which do not have children
|
||||
self.leaf_branches: dict[str, NeonBranch] = {}
|
||||
self.branches: dict[str, NeonBranch] = {}
|
||||
self.branch_num: int = 0
|
||||
self.reset_branches: set[str] = set()
|
||||
self.main_branch: NeonBranch = NeonBranch(self, proj)
|
||||
self.main_branch.connection_parameters = self.connection_parameters
|
||||
@@ -253,6 +286,8 @@ class NeonProject:
|
||||
self.limits: dict[str, Any] = self.get_limits()["limits"]
|
||||
self.read_only_endpoints_total: int = 0
|
||||
self.min_time: datetime = datetime.now(UTC)
|
||||
self.snapshots: dict[str, NeonSnapshot] = {}
|
||||
self.snapshot_num: int = 0
|
||||
|
||||
def get_limits(self) -> dict[str, Any]:
|
||||
return self.neon_api.get_project_limits(self.id)
|
||||
@@ -280,7 +315,10 @@ class NeonProject:
|
||||
return False
|
||||
|
||||
def create_branch(
|
||||
self, parent_id: str | None = None, parent_timestamp: datetime | None = None
|
||||
self,
|
||||
parent_id: str | None = None,
|
||||
parent_timestamp: datetime | None = None,
|
||||
is_reset: bool = False,
|
||||
) -> NeonBranch | None:
|
||||
self.wait()
|
||||
if not self.check_limit_branches():
|
||||
@@ -293,14 +331,14 @@ class NeonProject:
|
||||
branch_def = self.neon_api.create_branch(
|
||||
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
|
||||
)
|
||||
new_branch = NeonBranch(self, branch_def)
|
||||
new_branch = NeonBranch(self, branch_def, is_reset)
|
||||
self.wait()
|
||||
return new_branch
|
||||
|
||||
def delete_branch(self, branch_id: str) -> None:
|
||||
parent = self.branches[branch_id].parent
|
||||
if not parent or branch_id == self.main_branch.id:
|
||||
raise RuntimeError("Cannot delete the main branch")
|
||||
raise RuntimeError("Cannot delete the main branch or a branch restored from a snapshot")
|
||||
if branch_id not in self.leaf_branches and branch_id not in self.reset_branches:
|
||||
raise RuntimeError(f"The branch {branch_id}, probably, has ancestors")
|
||||
if branch_id not in self.branches:
|
||||
@@ -313,7 +351,7 @@ class NeonProject:
|
||||
if branch_id not in self.reset_branches:
|
||||
self.terminate_benchmark(branch_id)
|
||||
self.neon_api.delete_branch(self.id, branch_id)
|
||||
if len(parent.children) == 1 and parent.id != self.main_branch.id:
|
||||
if len(parent.children) == 1 and parent.parent is not None:
|
||||
self.leaf_branches[parent.id] = parent
|
||||
parent.children.pop(branch_id)
|
||||
if branch_id in self.leaf_branches:
|
||||
@@ -333,6 +371,22 @@ class NeonProject:
|
||||
log.info("No leaf branches found")
|
||||
return target
|
||||
|
||||
def get_random_parent_branch(self) -> NeonBranch:
|
||||
return self.branches[random.choice(list(set(self.branches.keys()) - self.reset_branches))]
|
||||
|
||||
def gen_branch_name(self) -> str:
|
||||
self.branch_num += 1
|
||||
return f"branch{self.branch_num}"
|
||||
|
||||
def get_random_snapshot(self) -> NeonSnapshot | None:
|
||||
snapshot: NeonSnapshot | None = None
|
||||
avail_snapshots = [sn for sn in self.snapshots.values() if not sn.restored]
|
||||
if avail_snapshots:
|
||||
snapshot = random.choice(avail_snapshots)
|
||||
else:
|
||||
log.info("No snapshots found")
|
||||
return snapshot
|
||||
|
||||
def delete_endpoint(self, endpoint_id: str) -> None:
|
||||
self.terminate_benchmark(endpoint_id)
|
||||
self.neon_api.delete_endpoint(self.id, endpoint_id)
|
||||
@@ -409,6 +463,116 @@ class NeonProject:
|
||||
self.restore_num += 1
|
||||
return f"restore{self.restore_num}"
|
||||
|
||||
def gen_snapshot_name(self) -> str:
|
||||
self.snapshot_num += 1
|
||||
return f"snapshot{self.snapshot_num}"
|
||||
|
||||
def create_snapshot(
|
||||
self,
|
||||
lsn: str | None = None,
|
||||
timestamp: datetime | None = None,
|
||||
) -> NeonSnapshot:
|
||||
"""
|
||||
Create a new Neon snapshot for the current project
|
||||
Two optional arguments: lsn and timestamp are mutually exclusive
|
||||
they instruct to create a snapshot with the specific lns or timestamp
|
||||
"""
|
||||
snapshot_name = self.gen_snapshot_name()
|
||||
with psycopg2.connect(self.connection_uri) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# We will check the value we set now after the snapshot restored to verify consistency
|
||||
cur.execute(
|
||||
f"INSERT INTO sanity_check (name, value) VALUES "
|
||||
f"('snapsot_name', '{snapshot_name}') ON CONFLICT (name) DO UPDATE SET value = EXCLUDED.value"
|
||||
)
|
||||
conn.commit()
|
||||
snapshot = NeonSnapshot(
|
||||
self,
|
||||
self.neon_api.create_snapshot(
|
||||
self.id,
|
||||
self.main_branch.id,
|
||||
lsn,
|
||||
timestamp.isoformat().replace("+00:00", "Z") if timestamp else None,
|
||||
snapshot_name,
|
||||
),
|
||||
)
|
||||
self.wait()
|
||||
# Now we taint the value after the snapshot was taken
|
||||
cur.execute("UPDATE sanity_check SET value = 'tainted' || value")
|
||||
conn.commit()
|
||||
return snapshot
|
||||
|
||||
def delete_snapshot(self, snapshot_id: str) -> None:
|
||||
"""
|
||||
Deletes the snapshot with the given id
|
||||
"""
|
||||
self.wait()
|
||||
self.neon_api.delete_snapshot(self.id, snapshot_id)
|
||||
self.snapshots.pop(snapshot_id)
|
||||
self.wait()
|
||||
|
||||
def restore_snapshot(self, snapshot_id: str) -> NeonBranch | None:
|
||||
"""
|
||||
Creates a new Neon branch for the current project, then restores the snapshot
|
||||
with the given id
|
||||
"""
|
||||
target_branch = self.get_random_parent_branch().create_child_branch()
|
||||
if not target_branch:
|
||||
return None
|
||||
self.snapshots[snapshot_id].restored = True
|
||||
new_branch_def: dict[str, Any] = self.neon_api.restore_snapshot(
|
||||
self.id,
|
||||
snapshot_id,
|
||||
target_branch.id,
|
||||
self.gen_branch_name(),
|
||||
)
|
||||
self.wait()
|
||||
new_branch_def = self.neon_api.get_branch_details(self.id, new_branch_def["branch"]["id"])
|
||||
# The restored branch will lose the parent afterward, but it has it during the restoration.
|
||||
# So, we delete parent_id
|
||||
new_branch_def["branch"].pop("parent_id")
|
||||
new_branch = NeonBranch(self, new_branch_def)
|
||||
log.info("Restored snapshot to the branch: %s", new_branch)
|
||||
target_branch_def = self.neon_api.get_branch_details(self.id, target_branch.id)
|
||||
if "name" in target_branch_def["branch"]:
|
||||
target_branch.name = target_branch_def["branch"]["name"]
|
||||
if new_branch.connection_parameters is None:
|
||||
if not new_branch.endpoints:
|
||||
for ep in self.neon_api.get_branch_endpoints(self.id, new_branch.id)["endpoints"]:
|
||||
if ep["id"] not in self.endpoints:
|
||||
NeonEndpoint(self, ep)
|
||||
new_branch.connection_parameters = self.connection_parameters.copy()
|
||||
for ep in new_branch.endpoints.values():
|
||||
if ep.type == "read_write":
|
||||
new_branch.connection_parameters["host"] = ep.host
|
||||
break
|
||||
new_branch.connect_env = {
|
||||
"PGHOST": new_branch.connection_parameters["host"],
|
||||
"PGUSER": new_branch.connection_parameters["role"],
|
||||
"PGDATABASE": new_branch.connection_parameters["database"],
|
||||
"PGPASSWORD": new_branch.connection_parameters["password"],
|
||||
"PGSSLMODE": "require",
|
||||
}
|
||||
with psycopg2.connect(
|
||||
host=new_branch.connection_parameters["host"],
|
||||
port=5432,
|
||||
user=new_branch.connection_parameters["role"],
|
||||
password=new_branch.connection_parameters["password"],
|
||||
database=new_branch.connection_parameters["database"],
|
||||
) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT value FROM sanity_check WHERE name = 'snapsot_name'")
|
||||
snapshot_name = None
|
||||
if row := cur.fetchone():
|
||||
snapshot_name = row[0]
|
||||
# We verify here that the value we select from the table matches with the snapshot name
|
||||
# To ensure consistency
|
||||
assert snapshot_name == self.snapshots[snapshot_id].name
|
||||
self.wait()
|
||||
target_branch.start_benchmark()
|
||||
new_branch.start_benchmark()
|
||||
return new_branch
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def setup_class(
|
||||
@@ -438,9 +602,7 @@ def do_action(project: NeonProject, action: str) -> bool:
|
||||
if action == "new_branch" or action == "new_branch_random_time":
|
||||
use_random_time: bool = action == "new_branch_random_time"
|
||||
log.info("Trying to create a new branch %s", "random time" if use_random_time else "")
|
||||
parent = project.branches[
|
||||
random.choice(list(set(project.branches.keys()) - project.reset_branches))
|
||||
]
|
||||
parent = project.get_random_parent_branch()
|
||||
child = parent.create_child_branch(parent.random_time() if use_random_time else None)
|
||||
if child is None:
|
||||
return False
|
||||
@@ -479,6 +641,23 @@ def do_action(project: NeonProject, action: str) -> bool:
|
||||
return False
|
||||
log.info("Reset to parent %s", target)
|
||||
target.reset_to_parent()
|
||||
elif action == "create_snapshot":
|
||||
snapshot = project.create_snapshot()
|
||||
if snapshot is None:
|
||||
return False
|
||||
log.info("Created snapshot %s", snapshot)
|
||||
elif action == "restore_snapshot":
|
||||
if (snapshot_to_restore := project.get_random_snapshot()) is None:
|
||||
return False
|
||||
log.info("Restoring snapshot %s", snapshot_to_restore)
|
||||
if project.restore_snapshot(snapshot_to_restore.id) is None:
|
||||
return False
|
||||
elif action == "delete_snapshot":
|
||||
snapshot_to_delete = project.get_random_snapshot()
|
||||
if snapshot_to_delete is None:
|
||||
return False
|
||||
snapshot_to_delete.delete()
|
||||
log.info("Deleted snapshot %s", snapshot_to_delete)
|
||||
else:
|
||||
raise ValueError(f"The action {action} is unknown")
|
||||
return True
|
||||
@@ -512,12 +691,28 @@ def test_api_random(
|
||||
("delete_branch", 1.2),
|
||||
("restore_random_time", 0.9),
|
||||
("reset_to_parent", 0.3),
|
||||
("create_snapshot", 0.2),
|
||||
("restore_snapshot", 0.1),
|
||||
("delete_snapshot", 0.1),
|
||||
)
|
||||
if num_ops_env := os.getenv("NUM_OPERATIONS"):
|
||||
num_operations = int(num_ops_env)
|
||||
else:
|
||||
num_operations = 250
|
||||
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
|
||||
# Create a table for sanity check
|
||||
# We are going to leve some control values there to check, e.g., after restoring a snapshot
|
||||
pg_bin.run(
|
||||
[
|
||||
"psql",
|
||||
"-c",
|
||||
"CREATE TABLE IF NOT EXISTS sanity_check (name VARCHAR NOT NULL PRIMARY KEY, value VARCHAR)",
|
||||
],
|
||||
env=project.main_branch.connect_env,
|
||||
)
|
||||
# To not go to the past where pgbench tables do not exist
|
||||
time.sleep(1)
|
||||
project.min_time = datetime.now(UTC)
|
||||
# To not go to the past where pgbench tables do not exist
|
||||
time.sleep(1)
|
||||
project.min_time = datetime.now(UTC)
|
||||
|
||||
Reference in New Issue
Block a user