mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
do the on-demand downloads in Python, it's faster; plus some cleanups and renamings
This commit is contained in:
@@ -1003,7 +1003,10 @@ impl Tenant {
|
||||
// IndexPart is the source of truth.
|
||||
self.clean_up_timelines(&existent_timelines)?;
|
||||
|
||||
failpoint_support::sleep_millis_async!("attach-before-activate");
|
||||
fail::fail_point!("attach-before-activate", |_| {
|
||||
anyhow::bail!("attach-before-activate");
|
||||
});
|
||||
failpoint_support::sleep_millis_async!("attach-before-activate-sleep");
|
||||
|
||||
info!("Done");
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ echo run the following commands
|
||||
cat <<EOF
|
||||
# test suite run
|
||||
export TEST_OUTPUT="/mnt/test_output"
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_pageserver.py
|
||||
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_pageserver_pagebench.py
|
||||
|
||||
# for interactive use
|
||||
export NEON_REPO_DIR="$(readlink -f ./bench_repo_dir)/repo"
|
||||
|
||||
133
test_runner/fixtures/pageserver/remote_storage.py
Normal file
133
test_runner/fixtures/pageserver/remote_storage.py
Normal file
@@ -0,0 +1,133 @@
|
||||
from pathlib import Path
|
||||
import queue
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
from typing import Any, List, Optional, Tuple
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
)
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.remote_storage import LocalFsStorage
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.pageserver.types import (
|
||||
InvalidFileName,
|
||||
parse_layer_file_name,
|
||||
)
|
||||
|
||||
|
||||
def duplicate_one_tenant(env: NeonEnv, template_tenant: TenantId, new_tenant: TenantId):
|
||||
remote_storage = env.pageserver_remote_storage
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
|
||||
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd: List[str] = [
|
||||
str(
|
||||
env.neon_binpath / "pagectl"
|
||||
), # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
else:
|
||||
# index_part etc need no patching
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def duplicate_tenant(env: NeonEnv, template_tenant: TenantId, ncopies: int) -> List[TenantId]:
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
# duplicate the tenant in remote storage
|
||||
def worker(queue: queue.Queue[Optional[TenantId]]):
|
||||
while True:
|
||||
tenant_id = queue.get()
|
||||
if tenant_id is None:
|
||||
return
|
||||
duplicate_one_tenant(env, template_tenant, tenant_id)
|
||||
|
||||
new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, ncopies)]
|
||||
duplications: queue.Queue[Optional[TenantId]] = queue.Queue()
|
||||
for t in new_tenants:
|
||||
duplications.put(t)
|
||||
workers = []
|
||||
for _ in range(0, 8): # TODO: use nproc instead of hard-coded count
|
||||
w = threading.Thread(target=worker, args=[duplications])
|
||||
workers.append(w)
|
||||
w.start()
|
||||
duplications.put(None)
|
||||
for w in workers:
|
||||
w.join()
|
||||
|
||||
return new_tenants
|
||||
|
||||
|
||||
def local_layer_name_from_remote_name(remote_name: str) -> str:
|
||||
try:
|
||||
return parse_layer_file_name(remote_name).to_str()
|
||||
except InvalidFileName:
|
||||
comps = remote_name.rsplit("-", 1)
|
||||
if len(comps) == 1:
|
||||
raise InvalidFileName("no generation suffix found")
|
||||
else:
|
||||
assert len(comps) == 2
|
||||
layer_file_name, _generation = comps
|
||||
try:
|
||||
return parse_layer_file_name(layer_file_name).to_str()
|
||||
except InvalidFileName:
|
||||
raise
|
||||
|
||||
|
||||
def copy_all_remote_layer_files_to_local_tenant_dir(
|
||||
env: NeonEnv, tenant_timelines: List[Tuple[TenantId, TimelineId]]
|
||||
):
|
||||
remote_storage = env.pageserver_remote_storage
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
work: queue.Queue[Any] = queue.Queue()
|
||||
for tenant, timeline in tenant_timelines:
|
||||
remote_timeline_path = remote_storage.timeline_path(tenant, timeline)
|
||||
local_timeline_path = env.pageserver.timeline_dir(tenant, timeline)
|
||||
local_timeline_path.mkdir(parents=True, exist_ok=True)
|
||||
downloads = {}
|
||||
for remote_layer in remote_timeline_path.glob("*__*"):
|
||||
local_name = local_layer_name_from_remote_name(remote_layer.name)
|
||||
assert not local_name in downloads, "remote storage must have had split brain"
|
||||
downloads[local_name] = remote_layer
|
||||
for local_name, remote_path in downloads.items():
|
||||
work.put((remote_path, local_timeline_path / local_name))
|
||||
|
||||
def copy_layer_worker(queue):
|
||||
while True:
|
||||
item = queue.get()
|
||||
if item is None:
|
||||
return
|
||||
remote_path, local_path = item
|
||||
# not copy2, so it looks like a recent download, in case that's relevant to e.g. eviction
|
||||
shutil.copy(remote_path, local_path, follow_symlinks=False)
|
||||
|
||||
workers = []
|
||||
for _ in range(0, 8): # TODO: use nproc instead of hard-coded count
|
||||
w = threading.Thread(target=copy_layer_worker, args=[work])
|
||||
workers.append(w)
|
||||
w.start()
|
||||
work.put(None)
|
||||
for w in workers:
|
||||
w.join()
|
||||
@@ -107,7 +107,7 @@ def parse_layer_file_name(file_name: str) -> LayerFileName:
|
||||
except InvalidFileName:
|
||||
pass
|
||||
|
||||
raise ValueError()
|
||||
raise InvalidFileName("neither image nor delta layer")
|
||||
|
||||
|
||||
def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn):
|
||||
|
||||
@@ -3,15 +3,23 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
import time
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, SnapshotDir, last_flush_lsn_upload
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
SnapshotDir,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active, wait_until_tenant_state
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
import fixtures.pageserver.remote_storage
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -45,7 +53,6 @@ def snapshotting_env(
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
|
||||
|
||||
if test_snapshot_dir.is_initialized():
|
||||
save_snapshot = False
|
||||
env = neon_env_builder.from_repo_dir(test_snapshot_dir.path)
|
||||
@@ -71,66 +78,46 @@ def snapshotting_env(
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(
|
||||
conf=tenant_config_cli, set_default=True
|
||||
)
|
||||
template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s5", ep.connstr()])
|
||||
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ps_http.tenant_detach(template_tenant)
|
||||
|
||||
# stop PS just for good measure
|
||||
env.pageserver.stop()
|
||||
# duplicate the template 20 times tenants in localfs storage
|
||||
tenants = fixtures.pageserver.remote_storage.duplicate_tenant(env, template_tenant, 20)
|
||||
|
||||
# duplicate the tenant in remote storage
|
||||
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
tenants = [template_tenant]
|
||||
for i in range(0, 100):
|
||||
new_tenant = TenantId.generate()
|
||||
tenants.append(new_tenant)
|
||||
log.info("Duplicating tenant #%s: %s", i, new_tenant)
|
||||
|
||||
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd: List[str] = [
|
||||
str(
|
||||
env.neon_binpath / "pagectl"
|
||||
), # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
else:
|
||||
# index_part etc need no patching
|
||||
pass
|
||||
|
||||
env.pageserver.start()
|
||||
# In theory we could just attach all the tenants, force on-demand downloads via mgmt API, and be done.
|
||||
# However, on-demand downloads are quite slow ATM.
|
||||
# => do the on-demand downloads in Python.
|
||||
assert ps_http.tenant_list() == []
|
||||
# make the attach fail after it created enough on-disk state to retry loading
|
||||
# the tenant next startup, but before it can start background loops that would start download
|
||||
ps_http.configure_failpoints(("attach-before-activate", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*attach failed, setting tenant state to Broken: attach-before-activate.*"
|
||||
)
|
||||
for tenant in tenants:
|
||||
ps_http.tenant_attach(
|
||||
tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1
|
||||
env.pageserver.tenant_attach(
|
||||
tenant,
|
||||
config=tenant_config_mgmt_api.copy(),
|
||||
)
|
||||
env.attachment_service.attach_hook_issue(tenant, env.pageserver.id)
|
||||
wait_until_tenant_state(ps_http, tenant, "Broken", 3)
|
||||
env.pageserver.stop() # clears the failpoint as a side-effect
|
||||
tenant_timelines = list(map(lambda tenant: (tenant, template_timeline), tenants))
|
||||
fixtures.pageserver.remote_storage.copy_all_remote_layer_files_to_local_tenant_dir(
|
||||
env, tenant_timelines
|
||||
)
|
||||
env.pageserver.start()
|
||||
|
||||
for tenant in tenants:
|
||||
wait_until_tenant_active(ps_http, tenant)
|
||||
|
||||
# ensure all layers are resident for predictiable performance
|
||||
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
|
||||
for tenant in tenants:
|
||||
ps_http.download_all_layers(tenant, template_timeline)
|
||||
for timeline in ps_http.tenant_status(tenant)["timelines"]:
|
||||
info = ps_http.layer_map_info(tenant, timeline)
|
||||
for layer in info.historic_layers:
|
||||
assert not layer.remote
|
||||
|
||||
# take snapshot after download all layers so tenant dir restoration is fast
|
||||
if save_snapshot:
|
||||
@@ -484,7 +484,7 @@ def test_detach_while_attaching(
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
|
||||
# And re-attach
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
|
||||
pageserver_http.configure_failpoints([("attach-before-activate-sleep", "return(5000)")])
|
||||
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
|
||||
@@ -691,7 +691,7 @@ def test_ignore_while_attaching(
|
||||
# Detach it
|
||||
pageserver_http.tenant_detach(tenant_id)
|
||||
# And re-attach, but stop attach task_mgr task from completing
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "return(5000)")])
|
||||
pageserver_http.configure_failpoints([("attach-before-activate-sleep", "return(5000)")])
|
||||
env.pageserver.tenant_attach(tenant_id)
|
||||
# Run ignore on the task, thereby cancelling the attach.
|
||||
# XXX This should take priority over attach, i.e., it should cancel the attach task.
|
||||
@@ -716,7 +716,7 @@ def test_ignore_while_attaching(
|
||||
), "Only ignored tenant should be missing"
|
||||
|
||||
# Calling load will bring the tenant back online
|
||||
pageserver_http.configure_failpoints([("attach-before-activate", "off")])
|
||||
pageserver_http.configure_failpoints([("attach-before-activate-sleep", "off")])
|
||||
env.pageserver.tenant_load(tenant_id)
|
||||
|
||||
wait_until_tenant_state(pageserver_http, tenant_id, "Active", 5)
|
||||
|
||||
Reference in New Issue
Block a user