Files
neon/test_runner/fixtures/broker.py
Christian Schwarz 42613d4c30 refactor(NeonEnv): shutdown of child processes (#6327)
Also shuts down `Broker`, which, before this PR, we did start in
`start()` but relied on the fixture to stop. Do it a bit earlier so
that, after `NeonEnv.stop()` returns, there are no child processes using
`repo_dir`.


Also, drive-by-fixes inverted logic around `ps_assert_metric_no_errors`,
missed during https://github.com/neondatabase/neon/pull/6295

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2024-01-12 10:23:21 +01:00

64 lines
1.8 KiB
Python

import subprocess
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
from fixtures.log_helper import log
@dataclass
class NeonBroker:
"""An object managing storage_broker instance"""
logfile: Path
port: int
neon_binpath: Path
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def listen_addr(self):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"http://{self.listen_addr()}"
def check_status(self):
return True # TODO
def try_start(self):
if self.handle is not None:
log.debug(f"storage_broker is already running on port {self.port}")
return
listen_addr = self.listen_addr()
log.info(f'starting storage_broker to listen incoming connections at "{listen_addr}"')
with open(self.logfile, "wb") as logfile:
args = [
str(self.neon_binpath / "storage_broker"),
f"--listen-addr={listen_addr}",
]
self.handle = subprocess.Popen(args, stdout=logfile, stderr=logfile)
# wait for start
started_at = time.time()
while True:
try:
self.check_status()
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 5:
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
) from e
time.sleep(0.5)
else:
break # success
def stop(self, immediate: bool = False):
if self.handle is not None:
if immediate:
self.handle.kill()
else:
self.handle.terminate()
self.handle.wait()