tests: add basic coverage for sharding (#6380)

## Problem

Support for sharding in the pageserver was written before
https://github.com/neondatabase/neon/pull/6205 landed, so when the
sharding code merged there was no way to test it directly.

## Summary of changes

- Add `test_sharding_smoke`, which covers the basics: creating a sharded
  tenant, creating a timeline within it, and checking that data written to
  it is distributed across the shards.
- Add modes to the pg_regress tests for running with 4 shards as well as
  with 1 (see the parametrization sketch below).
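
Both additions use the same pytest parametrization pattern. A minimal sketch of that pattern — the builder calls are the ones appearing in the diffs below; `test_example` is a placeholder name:

```python
from typing import Optional

import pytest

from fixtures.neon_fixtures import NeonEnvBuilder


# Runs twice: once unsharded (None), once with a 4-shard tenant.
@pytest.mark.parametrize("shard_count", [None, 4])
def test_example(neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]):
    if shard_count is not None:
        # One pageserver per shard, so each shard attaches to its own node.
        neon_env_builder.num_pageservers = shard_count
    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
```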
Author: John Spray (committed by GitHub)
Date: 2024-01-26 14:40:47 +00:00
Parent: 5b34d5f561
Commit: 55b7cde665
5 changed files with 170 additions and 23 deletions


@@ -1033,7 +1033,23 @@ impl WalIngest {
         // Copy content
         debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
         for blknum in 0..nblocks {
-            debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
+            // Sharding:
+            //  - src and dst are always on the same shard, because they differ only by dbNode, and
+            //    dbNode is not included in the hash inputs for sharding.
+            //  - This WAL command is replayed on all shards, but each shard only copies the blocks
+            //    that belong to it.
+            let src_key = rel_block_to_key(src_rel, blknum);
+            if !self.shard.is_key_local(&src_key) {
+                debug!(
+                    "Skipping non-local key {} during XLOG_DBASE_CREATE",
+                    src_key
+                );
+                continue;
+            }
+            debug!(
+                "copying block {} from {} ({}) to {}",
+                blknum, src_rel, src_key, dst_rel
+            );
 
             let content = modification
                 .tline
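
A toy Python model (not Neon's real hash) of the shard-filtering rule described in the comment above: all shards replay the same WAL record, but each copies only the blocks whose keys map to it, and `dbNode` is deliberately left out of the hash inputs so a relation and its per-database copy co-locate:

```python
from typing import NamedTuple


class Key(NamedTuple):
    rel_node: int  # hashed
    blknum: int    # hashed (in reality, via its stripe number)
    db_node: int   # NOT hashed: src/dst of a database copy share a shard


STRIPE_SIZE = 128  # pages per stripe, as in test_sharding_smoke below


def shard_of(key: Key, shard_count: int) -> int:
    stripe = key.blknum // STRIPE_SIZE
    return hash((key.rel_node, stripe)) % shard_count


def is_key_local(key: Key, my_shard: int, shard_count: int) -> bool:
    return shard_of(key, shard_count) == my_shard


# Same rel/blknum under two different databases -> same shard:
a = Key(rel_node=16384, blknum=7, db_node=1)
b = Key(rel_node=16384, blknum=7, db_node=2)
assert shard_of(a, 4) == shard_of(b, 4)
```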


@@ -21,12 +21,21 @@ class Workload:
     - reads, checking we get the right data (`validate`)
     """
 
-    def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
+    def __init__(
+        self,
+        env: NeonEnv,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        branch_name: Optional[str] = None,
+    ):
         self.env = env
 
         self.tenant_id = tenant_id
         self.timeline_id = timeline_id
         self.table = "foo"
+        # By default, use the default branch name for initial tenant in NeonEnv
+        self.branch_name = branch_name or "main"
+
         self.expect_rows = 0
         self.churn_cursor = 0

@@ -35,7 +44,7 @@ class Workload:
     def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
         if self._endpoint is None:
             self._endpoint = self.env.endpoints.create(
-                "main",
+                self.branch_name,
                 tenant_id=self.tenant_id,
                 pageserver_id=pageserver_id,
                 endpoint_id="ep-workload",
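
For reference, the new parameter in use — a usage fragment matching how `test_sharding_smoke` below constructs its workload; the default keeps existing callers unchanged:

```python
# Existing callers: endpoint is created on the default "main" branch.
workload = Workload(env, tenant_id, timeline_id)

# New: target a non-default branch, as test_sharding_smoke does.
workload = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
```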


@@ -1,4 +1,6 @@
+import random
 from contextlib import closing
+from typing import Optional
 
 import pytest
 from fixtures.log_helper import log

@@ -141,18 +143,24 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
 
 # Test that repeatedly kills and restarts the page server, while the
 # safekeeper and compute node keep running.
 @pytest.mark.timeout(540)
-def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
+@pytest.mark.parametrize("shard_count", [None, 4])
+def test_pageserver_chaos(
+    neon_env_builder: NeonEnvBuilder, build_type: str, shard_count: Optional[int]
+):
     if build_type == "debug":
         pytest.skip("times out in debug builds")
 
     neon_env_builder.enable_pageserver_remote_storage(s3_storage())
     neon_env_builder.enable_scrub_on_exit()
-    env = neon_env_builder.init_start()
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
     # these can happen, if we shutdown at a good time. to be fixed as part of #5172.
     message = ".*duplicated L1 layer layer=.*"
-    env.pageserver.allowed_errors.append(message)
+    for ps in env.pageservers:
+        ps.allowed_errors.append(message)
 
     # Use a tiny checkpoint distance, to create a lot of layers quickly.
     # That allows us to stress the compaction and layer flushing logic more.

@@ -192,13 +200,19 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
     log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
     assert int(row[0]) < int(row[1])
 
+    # We run "random" kills using a fixed seed, to improve reproducibility if a test
+    # failure is related to a particular order of operations.
+    seed = 0xDEADBEEF
+    rng = random.Random(seed)
+
     # Update the whole table, then immediately kill and restart the pageserver
     for i in range(1, 15):
         endpoint.safe_psql("UPDATE foo set updates = updates + 1")
 
         # This kills the pageserver immediately, to simulate a crash
-        env.pageserver.stop(immediate=True)
-        env.pageserver.start()
+        to_kill = rng.choice(env.pageservers)
+        to_kill.stop(immediate=True)
+        to_kill.start()
 
         # Check that all the updates are visible
         num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]
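
The chaos test now kills a randomly chosen pageserver each iteration, but with a fixed seed so a failing run can be replayed. The property it relies on, as a standalone sketch:

```python
import random

# Two generators seeded identically yield the same choice sequence, so the
# kill order of a failing run is reproducible from the fixed seed.
nodes = ["ps-1", "ps-2", "ps-3", "ps-4"]
a = random.Random(0xDEADBEEF)
b = random.Random(0xDEADBEEF)
assert [a.choice(nodes) for _ in range(16)] == [b.choice(nodes) for _ in range(16)]
```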


@@ -2,25 +2,40 @@
 # This file runs pg_regress-based tests.
 #
 from pathlib import Path
+from typing import Optional
 
-from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
+import pytest
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    check_restored_datadir_content,
+)
+from fixtures.remote_storage import s3_storage
 
 
 # Run the main PostgreSQL regression tests, in src/test/regress.
 #
+@pytest.mark.parametrize("shard_count", [None, 4])
 def test_pg_regress(
-    neon_simple_env: NeonEnv,
+    neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     pg_bin,
     capsys,
     base_dir: Path,
     pg_distrib_dir: Path,
+    shard_count: Optional[int],
 ):
-    env = neon_simple_env
+    """
+    :param shard_count: if None, create an unsharded tenant. Otherwise create a tenant with this
+    many shards.
+    """
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
-    env.neon_cli.create_branch("test_pg_regress", "empty")
     # Connect to postgres and create a database called "regression".
-    endpoint = env.endpoints.create_start("test_pg_regress")
+    endpoint = env.endpoints.create_start("main")
     endpoint.safe_psql("CREATE DATABASE regression")
 
     # Create some local directories for pg_regress to run in.

@@ -61,22 +76,25 @@ def test_pg_regress(
 # Run the PostgreSQL "isolation" tests, in src/test/isolation.
 #
+@pytest.mark.parametrize("shard_count", [None, 4])
 def test_isolation(
-    neon_simple_env: NeonEnv,
+    neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     pg_bin,
     capsys,
     base_dir: Path,
     pg_distrib_dir: Path,
+    shard_count: Optional[int],
 ):
-    env = neon_simple_env
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
-    env.neon_cli.create_branch("test_isolation", "empty")
     # Connect to postgres and create a database called "regression".
     # isolation tests use prepared transactions, so enable them
-    endpoint = env.endpoints.create_start(
-        "test_isolation", config_lines=["max_prepared_transactions=100"]
-    )
+    endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=100"])
     endpoint.safe_psql("CREATE DATABASE isolation_regression")
 
     # Create some local directories for pg_isolation_regress to run in.

@@ -114,19 +132,24 @@ def test_isolation(
 # Run extra Neon-specific pg_regress-based tests. The tests and their
 # schedule file are in the sql_regress/ directory.
+@pytest.mark.parametrize("shard_count", [None, 4])
 def test_sql_regress(
-    neon_simple_env: NeonEnv,
+    neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     pg_bin,
     capsys,
     base_dir: Path,
     pg_distrib_dir: Path,
+    shard_count: Optional[int],
 ):
-    env = neon_simple_env
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
-    env.neon_cli.create_branch("test_sql_regress", "empty")
     # Connect to postgres and create a database called "regression".
-    endpoint = env.endpoints.create_start("test_sql_regress")
+    endpoint = env.endpoints.create_start("main")
     endpoint.safe_psql("CREATE DATABASE regression")
 
     # Create some local directories for pg_regress to run in.
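
All three tests now repeat the same five lines of environment setup. If that repetition grew, one could imagine factoring it into a helper like this — a hypothetical sketch, not part of this commit:

```python
from typing import Optional

from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.remote_storage import s3_storage


def init_pg_regress_env(
    neon_env_builder: NeonEnvBuilder, shard_count: Optional[int]
) -> NeonEnv:
    """Hypothetical helper mirroring the setup repeated in all three tests."""
    if shard_count is not None:
        neon_env_builder.num_pageservers = shard_count
    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    neon_env_builder.enable_scrub_on_exit()
    return neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
```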


@@ -0,0 +1,85 @@
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+)
+from fixtures.remote_storage import s3_storage
+from fixtures.types import TimelineId
+from fixtures.workload import Workload
+
+
+def test_sharding_smoke(
+    neon_env_builder: NeonEnvBuilder,
+):
+    """
+    Test the basic lifecycle of a sharded tenant:
+    - ingested data gets split up
+    - page service reads
+    - timeline creation and deletion
+    - splits
+    """
+
+    shard_count = 4
+    neon_env_builder.num_pageservers = shard_count
+
+    # 1MiB stripes: enable getting some meaningful data distribution without
+    # writing large quantities of data in this test. The stripe size is given
+    # in number of 8KiB pages.
+    stripe_size = 128
+
+    # Use S3-compatible remote storage so that we can scrub: this test validates
+    # that the scrubber doesn't barf when it sees a sharded tenant.
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    neon_env_builder.preserve_database_files = True
+
+    env = neon_env_builder.init_start(
+        initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
+    )
+    tenant_id = env.initial_tenant
+
+    pageservers = dict((int(p.id), p) for p in env.pageservers)
+    shards = env.attachment_service.locate(tenant_id)
+
+    def get_sizes():
+        sizes = {}
+        for shard in shards:
+            node_id = int(shard["node_id"])
+            pageserver = pageservers[node_id]
+            sizes[node_id] = pageserver.http_client().tenant_status(shard["shard_id"])[
+                "current_physical_size"
+            ]
+        log.info(f"sizes = {sizes}")
+        return sizes
+
+    # Test that timeline creation works on a sharded tenant
+    timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)
+
+    # Test that we can write data to a sharded tenant
+    workload = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
+    workload.init()
+
+    sizes_before = get_sizes()
+    workload.write_rows(256)
+
+    # Test that we can read data back from a sharded tenant
+    workload.validate()
+
+    # Validate that the data is spread across pageservers
+    sizes_after = get_sizes()
+
+    # Our sizes increased when we wrote data
+    assert sum(sizes_after.values()) > sum(sizes_before.values())
+
+    # That increase is present on all shards
+    assert all(sizes_after[ps.id] > sizes_before[ps.id] for ps in env.pageservers)
+
+    # Validate that timeline list API works properly on all shards
+    for shard in shards:
+        node_id = int(shard["node_id"])
+        pageserver = pageservers[node_id]
+        timelines = set(
+            TimelineId(tl["timeline_id"])
+            for tl in pageserver.http_client().timeline_list(shard["shard_id"])
+        )
+        assert timelines == {env.initial_timeline, timeline_b}
+
+    # TODO: test timeline deletion and tenant deletion (depends on change in attachment_service)
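
A quick check of the stripe-size comment in the test. The 8KiB page size is standard Postgres; treating stripes as spread evenly over shards is a simplification of the real stripe-to-shard mapping:

```python
PAGE_SIZE = 8 * 1024      # bytes per page (standard Postgres)
stripe_size_pages = 128   # value passed as initial_tenant_shard_stripe_size
stripe_bytes = stripe_size_pages * PAGE_SIZE
assert stripe_bytes == 1024 * 1024  # 1MiB stripes, as the comment says

# With stripes this small, a modest write load crosses many stripe
# boundaries, so every one of the 4 shards should grow -- which is exactly
# what the sizes_before/sizes_after assertions verify.
```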