extract work queue and use it to drive broken attach in parallel

This commit is contained in:
Christian Schwarz
2024-01-05 19:10:41 +00:00
parent 392e014a7f
commit fbcb1268bf
3 changed files with 60 additions and 28 deletions

View File

@@ -1,26 +1,21 @@
from dataclasses import dataclass
import json
import os
import shutil
import subprocess
from pathlib import Path
import time
from typing import Any, Callable, Dict, List, Tuple
import pytest
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
SnapshotDir,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import wait_until_tenant_active, wait_until_tenant_state
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId, TimelineId
import fixtures.pageserver.remote_storage
from fixtures import work_queue
@dataclass
class SingleTimeline:
@@ -28,12 +23,13 @@ class SingleTimeline:
timeline_id: TimelineId
tenants: List[TenantId]
def single_timeline(
neon_env_builder: NeonEnvBuilder,
snapshot_dir: SnapshotDir,
setup_template: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]],
ncopies: int,
) -> SingleTimeline:
) -> SingleTimeline:
"""
Create (or rehydrate from `snapshot_dir`) an env with `ncopies` copies
of a template tenant with a single timeline.
@@ -79,12 +75,17 @@ def single_timeline(
env.pageserver.allowed_errors.append(
".*attach failed, setting tenant state to Broken: attach-before-activate.*"
)
for tenant in tenants:
def attach_broken(tenant):
env.pageserver.tenant_attach(
tenant,
config=template_config.copy(),
)
time.sleep(0.1)
wait_until_tenant_state(ps_http, tenant, "Broken", 3)
work_queue.do(8, tenants, attach_broken)
env.pageserver.stop() # clears the failpoint as a side-effect
tenant_timelines = list(map(lambda tenant: (tenant, template_timeline), tenants))
fixtures.pageserver.remote_storage.copy_all_remote_layer_files_to_local_tenant_dir(

View File

@@ -14,6 +14,7 @@ from fixtures.pageserver.types import (
InvalidFileName,
parse_layer_file_name,
)
from fixtures import work_queue
def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):
@@ -56,27 +57,11 @@ def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: Te
def duplicate_tenant(env: NeonEnv, template_tenant: TenantId, ncopies: int) -> List[TenantId]:
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
# duplicate the tenant in remote storage
def worker(queue: queue.Queue[Optional[TenantId]]):
while True:
tenant_id = queue.get()
if tenant_id is None:
return
duplicate_one_tenant(env, template_tenant, tenant_id)
def work(tenant_id):
duplicate_one_tenant(env, template_tenant, tenant_id)
new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, ncopies)]
duplications: queue.Queue[Optional[TenantId]] = queue.Queue()
for t in new_tenants:
duplications.put(t)
workers = []
for _ in range(0, 8): # TODO: use nproc instead of hard-coded count
w = threading.Thread(target=worker, args=[duplications])
workers.append(w)
w.start()
duplications.put(None)
for w in workers:
w.join()
work_queue.do(8, new_tenants, work)
return new_tenants

View File

@@ -0,0 +1,46 @@
import queue
import threading
from typing import Callable, Generic, List, Optional, TypeVar
T = TypeVar("T")
U = TypeVar("U")
V = TypeVar("V")
def do(nthreads: int, inputs: List[T], work_fn: Callable[[T], U]) -> List[U]:
class Item(Generic[V]):
_item: V
def __init__(self, item: V):
self._item = item
# duplicate the tenant in remote storage
def worker(input_queue: queue.Queue[Item[T]], output_queue: queue.Queue[U]):
while True:
item = input_queue.get()
if item is None:
return
output = work_fn(item._item)
output_queue.put(Item(output))
input_queue: queue.Queue[Optional["Item[T]"]] = queue.Queue()
output_queue: queue.Queue[Optional["Item[U]"]] = queue.Queue()
for t in inputs:
input_queue.put(Item(t))
workers = []
for _ in range(0, nthreads):
w = threading.Thread(target=worker, args=[input_queue, output_queue])
workers.append(w)
w.start()
input_queue.put(None)
for w in workers:
w.join()
outputs = []
while True:
try:
output = output_queue.get(block=False)
outputs.append(output)
except queue.Empty:
break
return outputs