From fbcb1268bf4ef80cc3430ec2b56228fd051fb47b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 5 Jan 2024 19:10:41 +0000 Subject: [PATCH] extract work queue and use it to drive broken attach in parallel --- .../fixtures/pageserver/many_tenants.py | 19 ++++---- .../fixtures/pageserver/remote_storage.py | 23 ++-------- test_runner/fixtures/work_queue.py | 46 +++++++++++++++++++ 3 files changed, 60 insertions(+), 28 deletions(-) create mode 100644 test_runner/fixtures/work_queue.py diff --git a/test_runner/fixtures/pageserver/many_tenants.py b/test_runner/fixtures/pageserver/many_tenants.py index f46c83a257..d83299a3c8 100644 --- a/test_runner/fixtures/pageserver/many_tenants.py +++ b/test_runner/fixtures/pageserver/many_tenants.py @@ -1,26 +1,21 @@ from dataclasses import dataclass -import json import os import shutil -import subprocess -from pathlib import Path import time from typing import Any, Callable, Dict, List, Tuple -import pytest -from fixtures.benchmark_fixture import NeonBenchmarker from fixtures.log_helper import log from fixtures.neon_fixtures import ( NeonEnv, NeonEnvBuilder, - PgBin, SnapshotDir, - last_flush_lsn_upload, ) from fixtures.pageserver.utils import wait_until_tenant_active, wait_until_tenant_state from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.types import TenantId, TimelineId import fixtures.pageserver.remote_storage +from fixtures import work_queue + @dataclass class SingleTimeline: @@ -28,12 +23,13 @@ class SingleTimeline: timeline_id: TimelineId tenants: List[TenantId] + def single_timeline( neon_env_builder: NeonEnvBuilder, snapshot_dir: SnapshotDir, setup_template: Callable[[NeonEnv], Tuple[TenantId, TimelineId, Dict[str, Any]]], ncopies: int, -) -> SingleTimeline: +) -> SingleTimeline: """ Create (or rehydrate from `snapshot_dir`) an env with `ncopies` copies of a template tenant with a single timeline. @@ -79,12 +75,17 @@ def single_timeline( env.pageserver.allowed_errors.append( ".*attach failed, setting tenant state to Broken: attach-before-activate.*" ) - for tenant in tenants: + + def attach_broken(tenant): env.pageserver.tenant_attach( tenant, config=template_config.copy(), ) + time.sleep(0.1) wait_until_tenant_state(ps_http, tenant, "Broken", 3) + + work_queue.do(8, tenants, attach_broken) + env.pageserver.stop() # clears the failpoint as a side-effect tenant_timelines = list(map(lambda tenant: (tenant, template_timeline), tenants)) fixtures.pageserver.remote_storage.copy_all_remote_layer_files_to_local_tenant_dir( diff --git a/test_runner/fixtures/pageserver/remote_storage.py b/test_runner/fixtures/pageserver/remote_storage.py index 034eb163d6..0fbc80ce83 100644 --- a/test_runner/fixtures/pageserver/remote_storage.py +++ b/test_runner/fixtures/pageserver/remote_storage.py @@ -14,6 +14,7 @@ from fixtures.pageserver.types import ( InvalidFileName, parse_layer_file_name, ) +from fixtures import work_queue def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId): @@ -56,27 +57,11 @@ def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: Te def duplicate_tenant(env: NeonEnv, template_tenant: TenantId, ncopies: int) -> List[TenantId]: assert isinstance(env.pageserver_remote_storage, LocalFsStorage) - # duplicate the tenant in remote storage - def worker(queue: queue.Queue[Optional[TenantId]]): - while True: - tenant_id = queue.get() - if tenant_id is None: - return - duplicate_one_tenant(env, template_tenant, tenant_id) + def work(tenant_id): + duplicate_one_tenant(env, template_tenant, tenant_id) new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, ncopies)] - duplications: queue.Queue[Optional[TenantId]] = queue.Queue() - for t in new_tenants: - duplications.put(t) - workers = [] - for _ in range(0, 8): # TODO: use nproc instead of hard-coded count - w = threading.Thread(target=worker, args=[duplications]) - workers.append(w) - w.start() - duplications.put(None) - for w in workers: - w.join() - + work_queue.do(8, new_tenants, work) return new_tenants diff --git a/test_runner/fixtures/work_queue.py b/test_runner/fixtures/work_queue.py new file mode 100644 index 0000000000..68c190d25d --- /dev/null +++ b/test_runner/fixtures/work_queue.py @@ -0,0 +1,46 @@ +import queue +import threading +from typing import Callable, Generic, List, Optional, TypeVar + +T = TypeVar("T") +U = TypeVar("U") +V = TypeVar("V") + + +def do(nthreads: int, inputs: List[T], work_fn: Callable[[T], U]) -> List[U]: + class Item(Generic[V]): + _item: V + + def __init__(self, item: V): + self._item = item + + # duplicate the tenant in remote storage + def worker(input_queue: queue.Queue[Item[T]], output_queue: queue.Queue[U]): + while True: + item = input_queue.get() + if item is None: + return + output = work_fn(item._item) + output_queue.put(Item(output)) + + input_queue: queue.Queue[Optional["Item[T]"]] = queue.Queue() + output_queue: queue.Queue[Optional["Item[U]"]] = queue.Queue() + for t in inputs: + input_queue.put(Item(t)) + workers = [] + for _ in range(0, nthreads): + w = threading.Thread(target=worker, args=[input_queue, output_queue]) + workers.append(w) + w.start() + input_queue.put(None) + for w in workers: + w.join() + + outputs = [] + while True: + try: + output = output_queue.get(block=False) + outputs.append(output) + except queue.Empty: + break + return outputs