mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
## Problem neon_fixtures.py has grown to an unmanageable size and attracts conflicts. When adding specific utils under, for example, `fixtures/pageserver`, things sometimes need to import from `neon_fixtures.py`, which creates a circular import. This is usually only needed for type annotations, so the `typing.TYPE_CHECKING` flag can mask the issue. Nevertheless, I believe that splitting neon_fixtures.py into smaller parts is a better approach. Currently the PR contains only small things, but I plan to continue and move NeonEnv to its own `fixtures.env` module. To keep the diff small, I think this PR can already be merged so that it causes fewer conflicts. UPD: it looks like it is currently not really possible to fully avoid using `typing.TYPE_CHECKING`, because some components directly depend on each other, i.e. there is an Env -> Cli -> Env cycle. But it is still worth avoiding it in as many places as possible, and decreasing the size of neon_fixtures.py still makes sense.
61 lines
1.7 KiB
Python
61 lines
1.7 KiB
Python
import subprocess
|
|
import time
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
from fixtures.log_helper import log
|
|
|
|
|
|
@dataclass
class NeonBroker:
    """An object managing storage_broker instance.

    Stores the settings used to launch the ``storage_broker`` binary and the
    ``subprocess.Popen`` handle of the process once it has been spawned.
    """

    # File that receives the daemon's stdout and stderr (truncated on start).
    logfile: Path
    # Port used to build the 127.0.0.1 listen address.
    port: int
    # Directory that contains the ``storage_broker`` executable.
    neon_binpath: Path
    handle: Optional[subprocess.Popen[Any]] = None  # handle of running daemon

    def listen_addr(self) -> str:
        """Return the ``host:port`` string passed to ``--listen-addr``."""
        return f"127.0.0.1:{self.port}"

    def client_url(self) -> str:
        """Return the ``http://`` URL built from :meth:`listen_addr`."""
        return f"http://{self.listen_addr()}"

    def check_status(self) -> bool:
        """Report broker status; currently a stub that always returns True."""
        return True  # TODO

    def try_start(self) -> None:
        """Spawn storage_broker unless this object already holds a handle.

        Raises:
            RuntimeError: if :meth:`check_status` keeps raising for more than
                5 seconds after the process was spawned.
        """
        if self.handle is not None:
            log.debug(f"storage_broker is already running on port {self.port}")
            return

        listen_addr = self.listen_addr()
        log.info(f'starting storage_broker to listen incoming connections at "{listen_addr}"')
        # The file object is only needed while spawning: Popen is handed it
        # for both stdout and stderr, after which this side can close it.
        with open(self.logfile, "wb") as logfile:
            args = [
                str(self.neon_binpath / "storage_broker"),
                f"--listen-addr={listen_addr}",
            ]
            self.handle = subprocess.Popen(args, stdout=logfile, stderr=logfile)

        # wait for start
        started_at = time.time()
        while True:
            try:
                self.check_status()
            except Exception as e:
                elapsed = time.time() - started_at
                if elapsed > 5:
                    raise RuntimeError(
                        f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
                    ) from e
                time.sleep(0.5)
            else:
                break  # success

    def stop(self) -> None:
        """Terminate the daemon, wait for it to exit and drop the handle.

        Does nothing when no process handle is held, so it is safe to call
        repeatedly.
        """
        if self.handle is not None:
            self.handle.terminate()
            self.handle.wait()
            # try_start() treats a non-None handle as "already running";
            # clear it so the broker can be started again after a stop.
            self.handle = None
|