The initial implementation of the logical replication

This commit is contained in:
Alexey Masterov
2025-07-31 17:30:23 +02:00
parent f26987deef
commit a14d6a1f0c
2 changed files with 82 additions and 7 deletions

View File

@@ -106,6 +106,7 @@ class NeonAPI:
branch_name: str | None = None,
branch_role_name: str | None = None,
branch_database_name: str | None = None,
project_settings: dict[str, Any] | None = None,
) -> dict[str, Any]:
data: dict[str, Any] = {
"project": {
@@ -122,6 +123,8 @@ class NeonAPI:
data["project"]["branch"]["role_name"] = branch_role_name
if branch_database_name:
data["project"]["branch"]["database_name"] = branch_database_name
if project_settings:
data["project"]["settings"] = project_settings
resp = self.__request(
"POST",

View File

@@ -91,7 +91,13 @@ class NeonBranch:
is_reset defines if the branch is a reset one i.e. created as a result of the reset API Call
"""
def __init__(self, project, branch: dict[str, Any], is_reset=False):
def __init__(
self,
project,
branch: dict[str, Any],
is_reset=False,
primary_branch: NeonBranch | None = None,
):
self.id: str = branch["branch"]["id"]
self.desc = branch
self.name: str | None = None
@@ -140,12 +146,36 @@ class NeonBranch:
"PGPASSWORD": self.connection_parameters["password"],
"PGSSLMODE": "require",
}
self.replicas: dict[str, NeonBranch] = {}
self.primary_branch: NeonBranch | None = primary_branch
if primary_branch:
if not self.connection_parameters:
raise ValueError(
"connection_parameters is required when primary_branch is specified"
)
self.project.replicas[self.id] = self
primary_branch.replicas[self.id] = self
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"CREATE PUBLICATION {self.id} FOR ALL TABLES")
conn.commit()
with psycopg2.connect(self.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(
f"CREATE SUBSCRIPTION {self.id} CONNECTION '{primary_branch.connstr()}' PUBLICATION {self.id}"
)
conn.commit()
def __str__(self):
"""
Prints the branch's information with all the predecessors
"""
return f"{self.id}{f'({self.name})' if self.name and self.name != self.id else ''}{f'(restored_from: {self.restored_from})' if self.restored_from else ''}, parent: {self.parent}"
name = f"({self.name})" if self.name and self.name != self.id else ""
restored_from = f"(restored_from: {self.restored_from})" if self.restored_from else ""
ancestor = (
f" <- {self.primary_branch}" if self.primary_branch else f", parent: {self.parent}"
)
return f"{self.id}{name}{restored_from}{ancestor}"
def random_time(self) -> datetime:
min_time = max(
@@ -157,8 +187,10 @@ class NeonBranch:
log.info("min_time: %s, max_time: %s", min_time, max_time)
return (min_time + (max_time - min_time) * random.random()).replace(microsecond=0)
def create_child_branch(self, parent_timestamp: datetime | None = None) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp)
def create_child_branch(
self, parent_timestamp: datetime | None = None, primary_branch: NeonBranch | None = None
) -> NeonBranch | None:
return self.project.create_branch(self.id, parent_timestamp, primary_branch=primary_branch)
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
@@ -249,6 +281,19 @@ class NeonBranch:
ep.start_benchmark()
return res
def create_logical_replica(self) -> NeonBranch | None:
if self.primary_branch is not None:
raise RuntimeError("The primary branch cannot be a logical replica")
if self.id in self.project.reset_branches:
raise RuntimeError("Reset branch cannot be a primary branch")
replica = self.create_child_branch(primary_branch=self)
return replica
def connstr(self):
if self.connection_parameters is None:
raise RuntimeError("Connection parameters are not defined")
return " ".join([f"{key}={value}" for key, value in self.connection_parameters.items()])
class NeonProject:
"""
@@ -260,7 +305,9 @@ class NeonProject:
self.neon_api = neon_api
self.pg_bin = pg_bin
proj = self.neon_api.create_project(
pg_version, f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}"
pg_version,
f"Automatic random API test GITHUB_RUN_ID={os.getenv('GITHUB_RUN_ID')}",
project_settings={"enable_logical_replication": True},
)
self.id: str = proj["project"]["id"]
self.name: str = proj["project"]["name"]
@@ -288,6 +335,7 @@ class NeonProject:
self.min_time: datetime = datetime.now(UTC)
self.snapshots: dict[str, NeonSnapshot] = {}
self.snapshot_num: int = 0
self.replicas: dict[str, NeonBranch] = {}
def get_limits(self) -> dict[str, Any]:
return self.neon_api.get_project_limits(self.id)
@@ -319,6 +367,7 @@ class NeonProject:
parent_id: str | None = None,
parent_timestamp: datetime | None = None,
is_reset: bool = False,
primary_branch: NeonBranch | None = None,
) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
@@ -331,7 +380,7 @@ class NeonProject:
branch_def = self.neon_api.create_branch(
self.id, parent_id=parent_id, parent_timestamp=parent_timestamp_str
)
new_branch = NeonBranch(self, branch_def, is_reset)
new_branch = NeonBranch(self, branch_def, is_reset, primary_branch)
self.wait()
return new_branch
@@ -351,6 +400,17 @@ class NeonProject:
if branch_id not in self.reset_branches:
self.terminate_benchmark(branch_id)
self.neon_api.delete_branch(self.id, branch_id)
primary_branch = self.branches[branch_id].primary_branch
if primary_branch is not None:
with psycopg2.connect(primary_branch.connstr()) as conn:
with conn.cursor() as cur:
cur.execute(f"DROP PUBLICATION {branch_id}")
conn.commit()
parent.replicas.pop(branch_id)
self.replicas.pop(branch_id)
else:
for replica in self.branches[branch_id].replicas.values():
replica.delete()
if len(parent.children) == 1 and parent.parent is not None:
self.leaf_branches[parent.id] = parent
parent.children.pop(branch_id)
@@ -372,7 +432,11 @@ class NeonProject:
return target
def get_random_parent_branch(self) -> NeonBranch:
return self.branches[random.choice(list(set(self.branches.keys()) - self.reset_branches))]
return self.branches[
random.choice(
list(set(self.branches.keys()) - self.reset_branches - set(self.replicas.keys()))
)
]
def gen_branch_name(self) -> str:
self.branch_num += 1
@@ -658,6 +722,14 @@ def do_action(project: NeonProject, action: str) -> bool:
return False
snapshot_to_delete.delete()
log.info("Deleted snapshot %s", snapshot_to_delete)
elif action == "create_logical_replica":
primary: NeonBranch | None = project.get_random_parent_branch()
if primary is None:
return False
replica: NeonBranch | None = primary.create_logical_replica()
if replica is None:
return False
log.info("Created logical replica %s", replica)
else:
raise ValueError(f"The action {action} is unknown")
return True