Intermediate state for process-local Endpoints when accessing shared state

This commit is contained in:
Matthias van de Meent
2024-09-13 02:20:31 +01:00
parent 8ecbe975cd
commit 31192ee655

View File

@@ -1,21 +1,36 @@
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, List, Dict, Any, Iterator
import pytest
from fixtures.common_types import TimelineId, Lsn
from fixtures.neon_fixtures import PgProtocol, Safekeeper
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, Safekeeper
from fixtures.pg_version import PgVersion
"""
In this file most important resources are exposed as function-level fixtures
that depend on session-level resources like pageservers and safekeepers.
The main rationale here is that we don't need to initialize a new SK/PS/etc
every time we want to test something that has branching: we can just as well
reuse still-available PageServers from other tests of the same kind.
"""
@pytest.fixture(scope="session")
def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
return base_dir / f"shared[{build_type}]" / "repo"
class TestEndpoint(PgProtocol):
def __init__(self, **kwargs: Any):
def __init__(self, compute: Endpoint, **kwargs: Any):
super().__init__(**kwargs)
def start(self):
pass
class TestTimeline:
primary: Optional[TestEndpoint]
secondaries: List[TestEndpoint]
def __init__(self):
def __init__(self, name: str, tenant: 'TestTenant'):
self.primary = None
def start_primary(self) -> Optional[TestEndpoint]:
@@ -25,11 +40,81 @@ class TestTimeline:
return self.primary
class TestPageserver:
pass
"""
We only need one pageserver for every build_type.
"""
def __init__(
self,
build_type: str,
bin_path: Path,
repo_path: Path,
port: int
):
self.build_type = build_type
self.bin_path = bin_path
self.repo_path = repo_path
self.port = port
self.started = False
def _start(self):
pass
def _stop(self):
pass
@pytest.fixture(scope="session")
def shared_pageserver() -> Iterator[TestPageserver]:
yield None
class TestSafekeeper:
pass
"""
We only need one safekeeper for every build_type.
"""
def __init__(
self,
build_type: str,
bin_path: Path,
repo_path: Path,
port: int
):
self.build_type = build_type
self.bin_path = bin_path
self.repo_path = repo_path
self.port = port
self.started = False
def _start(self):
pass
def _stop(self):
pass
@pytest.fixture(scope="session")
def shared_safekeeper(
request,
port: int,
build_type: str,
neon_binpath: Path,
shared_storage_repo_path: Path
) -> Iterator[TestSafekeeper]:
sk = TestSafekeeper(
build_type=build_type,
bin_path=(neon_binpath / "safekeeper"),
port=port,
repo_path=shared_storage_repo_path,
)
sk.start()
yield sk
sk.stop()
class TestTenant:
"""
@@ -38,29 +123,20 @@ class TestTenant:
"""
def __init__(
self,
pageserver: TestPageserver,
safekeeper: Safekeeper,
main_timeline_name: Optional[str] = "main",
self,
pageserver: TestPageserver,
safekeeper: TestSafekeeper,
):
self.pageserver = pageserver
self.safekeeper = safekeeper
self.timeline_map: Dict[str, TestTimeline] = dict()
self.timeline_map[main_timeline_name] = self.primary_timeline
def create_timeline(
self,
name: str,
parent_name: Optional[str],
parent_id: Optional[TimelineId],
branch_at: Optional[Lsn],
self,
name: str,
parent_name: Optional[str],
branch_at: Optional[Lsn],
) -> TestTimeline:
if (tlid := self.timeline_map.get(name)) is not None:
return tlid
return self._new_timeline()
def _new_timeline(self) -> TestTimeline:
pass
def stop(self):
for it in self.active_computes:
it.stop()