diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 3183608862..5a6f9a590f 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -1033,7 +1033,23 @@ impl WalIngest {
         // Copy content
         debug!("copying rel {} to {}, {} blocks", src_rel, dst_rel, nblocks);
         for blknum in 0..nblocks {
-            debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel);
+            // Sharding:
+            //  - src and dst are always on the same shard, because they differ only by dbNode, and
+            //    dbNode is not included in the hash inputs for sharding.
+            //  - This WAL command is replayed on all shards, but each shard only copies the blocks
+            //    that belong to it.
+            let src_key = rel_block_to_key(src_rel, blknum);
+            if !self.shard.is_key_local(&src_key) {
+                debug!(
+                    "Skipping non-local key {} during XLOG_DBASE_CREATE",
+                    src_key
+                );
+                continue;
+            }
+            debug!(
+                "copying block {} from {} ({}) to {}",
+                blknum, src_rel, src_key, dst_rel
+            );
 
             let content = modification
                 .tline
diff --git a/test_runner/fixtures/workload.py b/test_runner/fixtures/workload.py
index 30def1194d..f29a6cbf3c 100644
--- a/test_runner/fixtures/workload.py
+++ b/test_runner/fixtures/workload.py
@@ -21,12 +21,21 @@ class Workload:
     - reads, checking we get the right data (`validate`)
     """
 
-    def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
+    def __init__(
+        self,
+        env: NeonEnv,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        branch_name: Optional[str] = None,
+    ):
         self.env = env
         self.tenant_id = tenant_id
         self.timeline_id = timeline_id
         self.table = "foo"
+        # By default, use the default branch name for initial tenant in NeonEnv
+        self.branch_name = branch_name or "main"
+
         self.expect_rows = 0
         self.churn_cursor = 0
 
@@ -35,7 +44,7 @@ class Workload:
     def endpoint(self, pageserver_id: Optional[int] = None) -> Endpoint:
         if self._endpoint is None:
             self._endpoint = self.env.endpoints.create(
-                "main",
+                self.branch_name,
                 tenant_id=self.tenant_id,
                 pageserver_id=pageserver_id,
                 endpoint_id="ep-workload",
diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py
index c4499196b5..753898f747 100644
--- a/test_runner/regress/test_pageserver_restart.py
+++ b/test_runner/regress/test_pageserver_restart.py
@@ -1,4 +1,6 @@
+import random
 from contextlib import closing
+from typing import Optional
 
 import pytest
 from fixtures.log_helper import log
@@ -141,18 +143,24 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
 # Test that repeatedly kills and restarts the page server, while the
 # safekeeper and compute node keep running.
 @pytest.mark.timeout(540)
-def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
+@pytest.mark.parametrize("shard_count", [None, 4])
+def test_pageserver_chaos(
+    neon_env_builder: NeonEnvBuilder, build_type: str, shard_count: Optional[int]
+):
     if build_type == "debug":
         pytest.skip("times out in debug builds")
 
     neon_env_builder.enable_pageserver_remote_storage(s3_storage())
     neon_env_builder.enable_scrub_on_exit()
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
 
-    env = neon_env_builder.init_start()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
     # these can happen, if we shutdown at a good time. to be fixed as part of #5172.
     message = ".*duplicated L1 layer layer=.*"
-    env.pageserver.allowed_errors.append(message)
+    for ps in env.pageservers:
+        ps.allowed_errors.append(message)
 
     # Use a tiny checkpoint distance, to create a lot of layers quickly.
     # That allows us to stress the compaction and layer flushing logic more.
@@ -192,13 +200,19 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
             log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
             assert int(row[0]) < int(row[1])
 
+    # We run "random" kills using a fixed seed, to improve reproducibility if a test
+    # failure is related to a particular order of operations.
+    seed = 0xDEADBEEF
+    rng = random.Random(seed)
+
     # Update the whole table, then immediately kill and restart the pageserver
     for i in range(1, 15):
         endpoint.safe_psql("UPDATE foo set updates = updates + 1")
 
         # This kills the pageserver immediately, to simulate a crash
-        env.pageserver.stop(immediate=True)
-        env.pageserver.start()
+        to_kill = rng.choice(env.pageservers)
+        to_kill.stop(immediate=True)
+        to_kill.start()
 
     # Check that all the updates are visible
     num_updates = endpoint.safe_psql("SELECT sum(updates) FROM foo")[0][0]
diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py
index f26d04e2f3..e4219ec7a6 100644
--- a/test_runner/regress/test_pg_regress.py
+++ b/test_runner/regress/test_pg_regress.py
@@ -2,25 +2,40 @@
 # This file runs pg_regress-based tests.
 #
 from pathlib import Path
+from typing import Optional
 
-from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
+import pytest
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+    check_restored_datadir_content,
+)
+from fixtures.remote_storage import s3_storage
 
 
 # Run the main PostgreSQL regression tests, in src/test/regress.
 #
+@pytest.mark.parametrize("shard_count", [None, 4])
 def test_pg_regress(
-    neon_simple_env: NeonEnv,
+    neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     pg_bin,
     capsys,
     base_dir: Path,
     pg_distrib_dir: Path,
+    shard_count: Optional[int],
 ):
-    env = neon_simple_env
+    """
+    :param shard_count: if None, create an unsharded tenant. Otherwise create a tenant with this
+    many shards.
+    """
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
-    env.neon_cli.create_branch("test_pg_regress", "empty")
     # Connect to postgres and create a database called "regression".
-    endpoint = env.endpoints.create_start("test_pg_regress")
+    endpoint = env.endpoints.create_start("main")
     endpoint.safe_psql("CREATE DATABASE regression")
 
     # Create some local directories for pg_regress to run in.
@@ -61,22 +76,25 @@ def test_pg_regress(
 
 # Run the PostgreSQL "isolation" tests, in src/test/isolation.
 #
+@pytest.mark.parametrize("shard_count", [None, 4])
 def test_isolation(
-    neon_simple_env: NeonEnv,
+    neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     pg_bin,
     capsys,
     base_dir: Path,
     pg_distrib_dir: Path,
+    shard_count: Optional[int],
 ):
-    env = neon_simple_env
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
-    env.neon_cli.create_branch("test_isolation", "empty")
     # Connect to postgres and create a database called "regression".
     # isolation tests use prepared transactions, so enable them
-    endpoint = env.endpoints.create_start(
-        "test_isolation", config_lines=["max_prepared_transactions=100"]
-    )
+    endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=100"])
     endpoint.safe_psql("CREATE DATABASE isolation_regression")
 
     # Create some local directories for pg_isolation_regress to run in.
@@ -114,19 +132,24 @@ def test_isolation(
 
 # Run extra Neon-specific pg_regress-based tests. The tests and their
 # schedule file are in the sql_regress/ directory.
+@pytest.mark.parametrize("shard_count", [None, 4])
 def test_sql_regress(
-    neon_simple_env: NeonEnv,
+    neon_env_builder: NeonEnvBuilder,
     test_output_dir: Path,
     pg_bin,
     capsys,
     base_dir: Path,
     pg_distrib_dir: Path,
+    shard_count: Optional[int],
 ):
-    env = neon_simple_env
+    if shard_count is not None:
+        neon_env_builder.num_pageservers = shard_count
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+    env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
 
-    env.neon_cli.create_branch("test_sql_regress", "empty")
     # Connect to postgres and create a database called "regression".
-    endpoint = env.endpoints.create_start("test_sql_regress")
+    endpoint = env.endpoints.create_start("main")
     endpoint.safe_psql("CREATE DATABASE regression")
 
     # Create some local directories for pg_regress to run in.
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
new file mode 100644
index 0000000000..c16bfc2ec6
--- /dev/null
+++ b/test_runner/regress/test_sharding.py
@@ -0,0 +1,85 @@
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import (
+    NeonEnvBuilder,
+)
+from fixtures.remote_storage import s3_storage
+from fixtures.types import TimelineId
+from fixtures.workload import Workload
+
+
+def test_sharding_smoke(
+    neon_env_builder: NeonEnvBuilder,
+):
+    """
+    Test the basic lifecycle of a sharded tenant:
+    - ingested data gets split up
+    - page service reads
+    - timeline creation and deletion
+    - splits
+    """
+
+    shard_count = 4
+    neon_env_builder.num_pageservers = shard_count
+
+    # 1MiB stripes: enable getting some meaningful data distribution without
+    # writing large quantities of data in this test. The stripe size is given
+    # in number of 8KiB pages.
+    stripe_size = 128
+
+    # Use S3-compatible remote storage so that we can scrub: this test validates
+    # that the scrubber doesn't barf when it sees a sharded tenant.
+    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+    neon_env_builder.enable_scrub_on_exit()
+
+    neon_env_builder.preserve_database_files = True
+
+    env = neon_env_builder.init_start(
+        initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
+    )
+    tenant_id = env.initial_tenant
+
+    pageservers = dict((int(p.id), p) for p in env.pageservers)
+    shards = env.attachment_service.locate(tenant_id)
+
+    def get_sizes():
+        sizes = {}
+        for shard in shards:
+            node_id = int(shard["node_id"])
+            pageserver = pageservers[node_id]
+            sizes[node_id] = pageserver.http_client().tenant_status(shard["shard_id"])[
+                "current_physical_size"
+            ]
+        log.info(f"sizes = {sizes}")
+        return sizes
+
+    # Test that timeline creation works on a sharded tenant
+    timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)
+
+    # Test that we can write data to a sharded tenant
+    workload = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
+    workload.init()
+
+    sizes_before = get_sizes()
+    workload.write_rows(256)
+
+    # Test that we can read data back from a sharded tenant
+    workload.validate()
+
+    # Validate that the data is spread across pageservers
+    sizes_after = get_sizes()
+    # Our sizes increased when we wrote data
+    assert sum(sizes_after.values()) > sum(sizes_before.values())
+    # That increase is present on all shards
+    assert all(sizes_after[ps.id] > sizes_before[ps.id] for ps in env.pageservers)
+
+    # Validate that timeline list API works properly on all shards
+    for shard in shards:
+        node_id = int(shard["node_id"])
+        pageserver = pageservers[node_id]
+        timelines = set(
+            TimelineId(tl["timeline_id"])
+            for tl in pageserver.http_client().timeline_list(shard["shard_id"])
+        )
+        assert timelines == {env.initial_timeline, timeline_b}
+
+    # TODO: test timeline deletion and tenant deletion (depends on change in attachment_service)
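
For orientation, the sketch below is a toy model of the block-to-shard ownership that the `is_key_local()` check in walingest.rs relies on: blocks are grouped into stripes of `stripe_size` pages (128 pages of 8KiB = 1MiB in `test_sharding_smoke`), and stripes are spread across shards by a hash that excludes dbNode, so the src and dst relations of XLOG_DBASE_CREATE land on the same shard. The class name, fields, and the crc32-based hash are illustrative assumptions only, not the pageserver's actual key-mapping code.

from dataclasses import dataclass
from zlib import crc32


@dataclass
class ToyShardIdentity:
    # Hypothetical stand-ins for the real shard parameters.
    shard_number: int  # which shard this pageserver holds
    shard_count: int  # how many shards the tenant is split into
    stripe_size: int = 128  # stripe size in 8KiB pages (128 == 1MiB)

    def is_block_local(self, rel_ident: bytes, blknum: int) -> bool:
        # Group blocks into stripes, then hash (relation, stripe index) to pick
        # a shard. The caller leaves dbNode out of rel_ident, mirroring the
        # comment in the walingest.rs hunk above.
        if self.shard_count <= 1:
            return True
        stripe = blknum // self.stripe_size
        digest = crc32(rel_ident + stripe.to_bytes(8, "little"))
        return digest % self.shard_count == self.shard_number


# With 4 shards, each shard owns roughly a quarter of a relation's stripes,
# which is why every shard replays XLOG_DBASE_CREATE but copies only its own
# blocks, and why test_sharding_smoke expects all shards to grow after writes.
shard0 = ToyShardIdentity(shard_number=0, shard_count=4)
owned = sum(1 for blk in range(1024) if shard0.is_block_local(b"rel-A", blk))
print(f"shard 0 owns {owned} of 1024 blocks")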