neon/test_runner/fixtures/broker.py
Dmitry Rodionov 1497a42296 tests: split neon_fixtures.py (#4871)
## Problem

neon_fixtures.py has grown to an unmanageable size and attracts merge conflicts.

When adding specific utils under, for example, `fixtures/pageserver`,
things sometimes need to import from `neon_fixtures.py`, which
creates a circular import. This is usually only needed for type
annotations, so the `typing.TYPE_CHECKING` flag can mask the issue.
Nevertheless, I believe that splitting neon_fixtures.py into smaller
parts is a better approach.
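
For reference, the `typing.TYPE_CHECKING` workaround mentioned above usually looks something like the sketch below. The helper class `PageserverHelper` and the function `describe` are hypothetical names used only for illustration, and the import path `fixtures.neon_fixtures` is an assumption based on the file layout. The guarded import is seen only by type checkers, so it never runs at runtime and cannot take part in an import cycle.

```python
from dataclasses import dataclass
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by type checkers; skipped at runtime, so this import
    # cannot create a circular import. The module path is an assumption.
    from fixtures.neon_fixtures import NeonEnv


@dataclass
class PageserverHelper:  # hypothetical helper, not part of the real fixtures
    # The quoted annotation is never evaluated at runtime.
    env: "NeonEnv"


def describe(helper: PageserverHelper) -> str:
    """Return a short label naming the type of the wrapped env object."""
    return f"helper for {type(helper.env).__name__}"
```

The trade-off is that the name exists only for annotations: any runtime use of `NeonEnv` in such a module (for example `isinstance` checks) would still need a real import.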

Currently the PR contains small things, but I plan to continue and move
NeonEnv to its own `fixtures.env` module. To keep the diff small, I think
this PR can already be merged to cause fewer conflicts.

UPD: it looks like it's currently not really possible to fully avoid
using `typing.TYPE_CHECKING`, because some components directly depend
on each other, i.e. the Env -> Cli -> Env cycle. But it's still worth
avoiding it in as many places as possible, and decreasing the size of
neon_fixtures.py still makes sense.
2023-08-03 17:20:24 +03:00


import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional

from fixtures.log_helper import log


@dataclass
class NeonBroker:
    """An object managing storage_broker instance"""

    logfile: Path
    port: int
    neon_binpath: Path
    handle: Optional[subprocess.Popen[Any]] = None  # handle of running daemon

    def listen_addr(self):
        return f"127.0.0.1:{self.port}"

    def client_url(self):
        return f"http://{self.listen_addr()}"

    def check_status(self):
        return True  # TODO

    def try_start(self):
        if self.handle is not None:
            log.debug(f"storage_broker is already running on port {self.port}")
            return

        listen_addr = self.listen_addr()
        log.info(f'starting storage_broker to listen incoming connections at "{listen_addr}"')
        with open(self.logfile, "wb") as logfile:
            args = [
                str(self.neon_binpath / "storage_broker"),
                f"--listen-addr={listen_addr}",
            ]
            self.handle = subprocess.Popen(args, stdout=logfile, stderr=logfile)

        # wait for start
        started_at = time.time()
        while True:
            try:
                self.check_status()
            except Exception as e:
                elapsed = time.time() - started_at
                if elapsed > 5:
                    raise RuntimeError(
                        f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
                    ) from e
                time.sleep(0.5)
            else:
                break  # success

    def stop(self):
        if self.handle is not None:
            self.handle.terminate()
            self.handle.wait()
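
Since `check_status` is still a TODO that always returns `True`, the wait loop in `try_start` exits on its first iteration. The source does not say what the real check should be. As a rough sketch only, assuming a successful TCP connect to the listen address is an acceptable readiness signal, a poll-with-deadline helper could look like this (`port_is_open` and `wait_for_port` are hypothetical names, not part of the fixtures):

```python
import socket
import time


def port_is_open(host: str, port: int, timeout: float = 0.5) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


def wait_for_port(
    host: str, port: int, deadline_s: float = 5.0, interval_s: float = 0.5
) -> None:
    """Poll until host:port accepts connections; raise after deadline_s."""
    started_at = time.monotonic()
    while not port_is_open(host, port):
        elapsed = time.monotonic() - started_at
        if elapsed > deadline_s:
            raise RuntimeError(f"timed out waiting {elapsed:.0f}s for {host}:{port}")
        time.sleep(interval_s)
```

The sketch uses `time.monotonic()` rather than `time.time()` so the deadline is not affected by wall-clock adjustments. Whether a bare TCP connect is enough for storage_broker is an open question that the original code leaves unanswered.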