mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
reusable abstraction for many tenants fixture
This commit is contained in:
@@ -1,12 +1,8 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
import time
|
||||
from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.benchmark_fixture import NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
@@ -16,123 +12,60 @@ from fixtures.neon_fixtures import (
|
||||
SnapshotDir,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active, wait_until_tenant_state
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
import fixtures.pageserver.remote_storage
|
||||
import fixtures.pageserver.many_tenants
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@pytest.mark.timeout(1000)
|
||||
def snapshotting_env(
|
||||
def getpage_throughput_fixture(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
test_snapshot_dir: SnapshotDir,
|
||||
) -> Tuple[NeonEnv, TimelineId, List[TenantId]]:
|
||||
"""
|
||||
The fixture prepares environment or restores it from a snapshot.
|
||||
|
||||
The logic is the following:
|
||||
- if the snapshot directory exists, the snapshot is restored from it
|
||||
- if there is no snapshot, the environment is initialized from scratch and stored in a snapshot
|
||||
- if the fixture is executed on CI (it has CI=true in the environment), the snapshot is not saved
|
||||
"""
|
||||
|
||||
save_snapshot = os.getenv("CI", "false") != "true"
|
||||
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
# create our template tenant
|
||||
tenant_config_mgmt_api = {
|
||||
"gc_period": "0s",
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "20 s",
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
|
||||
if test_snapshot_dir.is_initialized():
|
||||
save_snapshot = False
|
||||
env = neon_env_builder.from_repo_dir(test_snapshot_dir.path)
|
||||
ps_http = env.pageserver.http_client()
|
||||
tenants = list(
|
||||
{TenantId(t.name) for t in (test_snapshot_dir.path.glob("pageserver_*/tenants/*"))}
|
||||
)
|
||||
template_timeline = env.initial_timeline
|
||||
|
||||
neon_env_builder.start()
|
||||
else:
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
remote_storage = env.pageserver_remote_storage
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
# clean up the useless default tenant
|
||||
ps_http.tenant_delete(env.initial_tenant)
|
||||
|
||||
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
|
||||
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(
|
||||
conf=tenant_config_cli, set_default=True
|
||||
)
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s5", ep.connstr()])
|
||||
) -> fixtures.pageserver.many_tenants.SingleTimeline:
|
||||
def setup_template(env: NeonEnv):
|
||||
# create our template tenant
|
||||
config = {
|
||||
"gc_period": "0s",
|
||||
"checkpoint_timeout": "10 years",
|
||||
"compaction_period": "20 s",
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(set_default=True)
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
# with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
# pg_bin.run_capture(["pgbench", "-i", "-s5", ep.connstr()])
|
||||
# last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for i in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ps_http.tenant_detach(template_tenant)
|
||||
ep.stop_and_destroy()
|
||||
return (template_tenant, template_timeline, config)
|
||||
|
||||
# duplicate the template tenant 20 times in localfs storage
|
||||
tenants = fixtures.pageserver.remote_storage.duplicate_tenant(env, template_tenant, 20)
|
||||
|
||||
# In theory we could just attach all the tenants, force on-demand downloads via mgmt API, and be done.
|
||||
# However, on-demand downloads are quite slow ATM.
|
||||
# => do the on-demand downloads in Python.
|
||||
assert ps_http.tenant_list() == []
|
||||
# make the attach fail after it created enough on-disk state to retry loading
|
||||
# the tenant next startup, but before it can start background loops that would start download
|
||||
ps_http.configure_failpoints(("attach-before-activate", "return"))
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*attach failed, setting tenant state to Broken: attach-before-activate.*"
|
||||
)
|
||||
for tenant in tenants:
|
||||
env.pageserver.tenant_attach(
|
||||
tenant,
|
||||
config=tenant_config_mgmt_api.copy(),
|
||||
)
|
||||
wait_until_tenant_state(ps_http, tenant, "Broken", 3)
|
||||
env.pageserver.stop() # clears the failpoint as a side-effect
|
||||
tenant_timelines = list(map(lambda tenant: (tenant, template_timeline), tenants))
|
||||
fixtures.pageserver.remote_storage.copy_all_remote_layer_files_to_local_tenant_dir(
|
||||
env, tenant_timelines
|
||||
)
|
||||
env.pageserver.start()
|
||||
|
||||
for tenant in tenants:
|
||||
wait_until_tenant_active(ps_http, tenant)
|
||||
|
||||
# ensure all layers are resident for predictable performance
|
||||
for tenant in tenants:
|
||||
for timeline in ps_http.tenant_status(tenant)["timelines"]:
|
||||
info = ps_http.layer_map_info(tenant, timeline)
|
||||
for layer in info.historic_layers:
|
||||
assert not layer.remote
|
||||
|
||||
# take snapshot after download all layers so tenant dir restoration is fast
|
||||
if save_snapshot:
|
||||
shutil.copytree(env.repo_dir, test_snapshot_dir.path)
|
||||
test_snapshot_dir.set_initialized()
|
||||
|
||||
return env, template_timeline, tenants
|
||||
return fixtures.pageserver.many_tenants.single_timeline(
|
||||
neon_env_builder,
|
||||
test_snapshot_dir,
|
||||
setup_template,
|
||||
20_000,
|
||||
)
|
||||
|
||||
|
||||
def test_getpage_throughput(
|
||||
snapshotting_env: Tuple[NeonEnv, TimelineId, List[TenantId]],
|
||||
getpage_throughput_fixture: fixtures.pageserver.many_tenants.SingleTimeline,
|
||||
zenbenchmark: NeonBenchmarker,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
env, template_timeline, tenants = snapshotting_env
|
||||
env, template_timeline, tenants = (
|
||||
getpage_throughput_fixture.env,
|
||||
getpage_throughput_fixture.timeline_id,
|
||||
getpage_throughput_fixture.tenants,
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# run the benchmark with one client per timeline, each doing 10k requests to random keys.
|
||||
|
||||
Reference in New Issue
Block a user