## Summary

- Currently we can set the stripe size at tenant creation, but it doesn't mean anything until the tenant has multiple shards.
- When onboarding an existing tenant, it always gets the default shard stripe size, so we would like to be able to pick the actual stripe size at the point we split.

## Why do this inline with a split?

The alternative to this change would be a separate endpoint on the storage controller for setting the stripe size on a tenant, which only permits writes while the tenant has a single shard. That would work, but it would be a little more work for a client and not appreciably simpler: instead of a special argument to the split functions, we'd have a special separate endpoint, plus a requirement that the controller sync its config down to the pageserver before calling the split API.

Either approach would work, but this one feels a bit more robust end-to-end: the split API is the _very last moment_ that the stripe size is mutable, so if we aim to set it before splitting, it makes sense to do it as part of the same operation.
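As an illustration of the resulting client flow, here is a minimal sketch based on `test_sharding_split_stripe_size` below. It uses the test-fixture helper `env.storage_controller.tenant_shard_split`; `env` and `tenant_id` are assumed to come from the usual `NeonEnvBuilder` setup and are not defined here.

```python
# Sketch only, not a standalone script: `env` and `tenant_id` come from the
# NeonEnvBuilder-based setup used by the tests in this file.

# Stripe size is expressed in 8KiB pages, so 2048 pages == 16MiB stripes.
new_stripe_size = 2048

# Split the single-shard tenant into two shards and set the stripe size in
# the same call: the split is the last moment at which it is still mutable.
env.storage_controller.tenant_shard_split(
    tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
)
```

The compute notification emitted after the split then carries the new `stripe_size`, which is what `test_sharding_split_stripe_size` asserts.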
498 lines · 18 KiB · Python
import os
from typing import Dict, List, Optional, Union

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    tenant_get_shards,
)
from fixtures.remote_storage import s3_storage
from fixtures.types import Lsn, TenantShardId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response


def test_sharding_smoke(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Test the basic lifecycle of a sharded tenant:
    - ingested data gets split up
    - page service reads
    - timeline creation and deletion
    - splits
    """

    shard_count = 4
    neon_env_builder.num_pageservers = shard_count

    # 1MiB stripes: enable getting some meaningful data distribution without
    # writing large quantities of data in this test. The stripe size is given
    # in number of 8KiB pages.
    stripe_size = 128
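    # (128 pages * 8KiB/page = 1MiB per stripe.)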

    # Use S3-compatible remote storage so that we can scrub: this test validates
    # that the scrubber doesn't barf when it sees a sharded tenant.
    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    neon_env_builder.enable_scrub_on_exit()

    neon_env_builder.preserve_database_files = True

    env = neon_env_builder.init_start(
        initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
    )
    tenant_id = env.initial_tenant

    pageservers = dict((int(p.id), p) for p in env.pageservers)
    shards = env.storage_controller.locate(tenant_id)

    def get_sizes():
        sizes = {}
        for shard in shards:
            node_id = int(shard["node_id"])
            pageserver = pageservers[node_id]
            sizes[node_id] = pageserver.http_client().tenant_status(shard["shard_id"])[
                "current_physical_size"
            ]
        log.info(f"sizes = {sizes}")
        return sizes

    # Test that timeline creation works on a sharded tenant
    timeline_b = env.neon_cli.create_branch("branch_b", tenant_id=tenant_id)

    # Test that we can write data to a sharded tenant
    workload = Workload(env, tenant_id, timeline_b, branch_name="branch_b")
    workload.init()

    sizes_before = get_sizes()
    workload.write_rows(256)

    # Test that we can read data back from a sharded tenant
    workload.validate()

    # Validate that the data is spread across pageservers
    sizes_after = get_sizes()
    # Our sizes increased when we wrote data
    assert sum(sizes_after.values()) > sum(sizes_before.values())
    # That increase is present on all shards
    assert all(sizes_after[ps.id] > sizes_before[ps.id] for ps in env.pageservers)

    # Validate that timeline list API works properly on all shards
    for shard in shards:
        node_id = int(shard["node_id"])
        pageserver = pageservers[node_id]
        timelines = set(
            TimelineId(tl["timeline_id"])
            for tl in pageserver.http_client().timeline_list(shard["shard_id"])
        )
        assert timelines == {env.initial_timeline, timeline_b}

    env.storage_controller.consistency_check()


def test_sharding_split_unsharded(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Test that shard splitting works on a tenant created as unsharded (i.e. with
    ShardCount(0)).
    """
    env = neon_env_builder.init_start()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    # Check that we created with an unsharded TenantShardId: this is the default,
    # but check it in case we change the default in future
    assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 0)) is not None

    workload = Workload(env, tenant_id, timeline_id, branch_name="main")
    workload.init()
    workload.write_rows(256)
    workload.validate()

    # Split one shard into two
    env.storage_controller.tenant_shard_split(tenant_id, shard_count=2)

    # Check we got the shard IDs we expected
    assert env.storage_controller.inspect(TenantShardId(tenant_id, 0, 2)) is not None
    assert env.storage_controller.inspect(TenantShardId(tenant_id, 1, 2)) is not None

    workload.validate()

    env.storage_controller.consistency_check()


def test_sharding_split_smoke(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Test the basics of shard splitting:
    - The API results in more shards than we started with
    - The tenant's data remains readable
    """

    # We will start with 4 shards and split into 8, then migrate all those
    # 8 shards onto separate pageservers
    shard_count = 4
    split_shard_count = 8
    neon_env_builder.num_pageservers = split_shard_count

    # 1MiB stripes: enable getting some meaningful data distribution without
    # writing large quantities of data in this test. The stripe size is given
    # in number of 8KiB pages.
    stripe_size = 128
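    # (128 pages * 8KiB/page = 1MiB per stripe.)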

    # Use S3-compatible remote storage so that we can scrub: this test validates
    # that the scrubber doesn't barf when it sees a sharded tenant.
    neon_env_builder.enable_pageserver_remote_storage(s3_storage())
    neon_env_builder.enable_scrub_on_exit()

    neon_env_builder.preserve_database_files = True

    env = neon_env_builder.init_start(
        initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
    )
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    workload = Workload(env, tenant_id, timeline_id, branch_name="main")
    workload.init()

    # Initial data
    workload.write_rows(256)

    # Note which pageservers initially hold a shard after tenant creation
    pre_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]

    # For pageservers holding a shard, validate their ingest statistics
    # reflect a proper splitting of the WAL.
    for pageserver in env.pageservers:
        if pageserver.id not in pre_split_pageserver_ids:
            continue

        metrics = pageserver.http_client().get_metrics_values(
            [
                "pageserver_wal_ingest_records_received_total",
                "pageserver_wal_ingest_records_committed_total",
                "pageserver_wal_ingest_records_filtered_total",
            ]
        )

        log.info(f"Pageserver {pageserver.id} metrics: {metrics}")

        # Not everything received was committed
        assert (
            metrics["pageserver_wal_ingest_records_received_total"]
            > metrics["pageserver_wal_ingest_records_committed_total"]
        )

        # Something was committed
        assert metrics["pageserver_wal_ingest_records_committed_total"] > 0

        # Counts are self consistent
        assert (
            metrics["pageserver_wal_ingest_records_received_total"]
            == metrics["pageserver_wal_ingest_records_committed_total"]
            + metrics["pageserver_wal_ingest_records_filtered_total"]
        )

    # TODO: validate that shards have different sizes

    workload.validate()

    assert len(pre_split_pageserver_ids) == 4

    def shards_on_disk(shard_ids):
        for pageserver in env.pageservers:
            for shard_id in shard_ids:
                if pageserver.tenant_dir(shard_id).exists():
                    return True

        return False

    old_shard_ids = [TenantShardId(tenant_id, i, shard_count) for i in range(0, shard_count)]
    # Before split, old shards exist
    assert shards_on_disk(old_shard_ids)

    env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)

    post_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
    # We should have split into 8 shards, on the same 4 pageservers we started on.
    assert len(post_split_pageserver_ids) == split_shard_count
    assert len(set(post_split_pageserver_ids)) == shard_count
    assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)

    # The old parent shards should no longer exist on disk
    assert not shards_on_disk(old_shard_ids)

    workload.validate()

    workload.churn_rows(256)

    workload.validate()

    # Run GC on all new shards, to check they don't barf or delete anything that breaks reads
    # (compaction was already run as part of churn_rows)
    all_shards = tenant_get_shards(env, tenant_id)
    for tenant_shard_id, pageserver in all_shards:
        pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
    workload.validate()

    migrate_to_pageserver_ids = list(
        set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
    )
    assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count

    # Migrate shards away from the node where the split happened
    for ps_id in pre_split_pageserver_ids:
        shards_here = [
            tenant_shard_id
            for (tenant_shard_id, pageserver) in all_shards
            if pageserver.id == ps_id
        ]
        assert len(shards_here) == 2
        migrate_shard = shards_here[0]
        destination = migrate_to_pageserver_ids.pop()

        log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
        env.neon_cli.tenant_migrate(migrate_shard, destination, timeout_secs=10)

    workload.validate()

    # Check that we didn't do any spurious reconciliations.
    # Total number of reconciles should have been one per original shard, plus
    # one for each shard that was migrated.
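    # (With shard_count=4 and split_shard_count=8 that works out to 4 + 8 // 2 = 8 reconciles.)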
    reconcile_ok = env.storage_controller.get_metric_value(
        "storage_controller_reconcile_complete_total", filter={"status": "ok"}
    )
    assert reconcile_ok == shard_count + split_shard_count // 2

    # Check that no cancelled or errored reconciliations occurred: this test does no
    # failure injection and should run clean.
    assert (
        env.storage_controller.get_metric_value(
            "storage_controller_reconcile_complete_total", filter={"status": "cancel"}
        )
        is None
    )
    assert (
        env.storage_controller.get_metric_value(
            "storage_controller_reconcile_complete_total", filter={"status": "error"}
        )
        is None
    )

    env.storage_controller.consistency_check()

    # Validate pageserver state
    shards_exist: list[TenantShardId] = []
    for pageserver in env.pageservers:
        locations = pageserver.http_client().tenant_list_locations()
        shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])

    log.info(f"Shards after split: {shards_exist}")
    assert len(shards_exist) == split_shard_count

    # Ensure post-split pageserver locations survive a restart (i.e. the child shards
    # correctly wrote config to disk, and the storage controller responds correctly
    # to /re-attach)
    for pageserver in env.pageservers:
        pageserver.stop()
        pageserver.start()

    shards_exist = []
    for pageserver in env.pageservers:
        locations = pageserver.http_client().tenant_list_locations()
        shards_exist.extend(TenantShardId.parse(s[0]) for s in locations["tenant_shards"])

    log.info(f"Shards after restart: {shards_exist}")
    assert len(shards_exist) == split_shard_count

    workload.validate()


@pytest.mark.parametrize("initial_stripe_size", [None, 65536])
def test_sharding_split_stripe_size(
    neon_env_builder: NeonEnvBuilder,
    httpserver: HTTPServer,
    httpserver_listen_address,
    initial_stripe_size: Optional[int],
):
    """
    Check that modifying stripe size inline with a shard split works as expected
    """
    (host, port) = httpserver_listen_address
    neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
    neon_env_builder.num_pageservers = 1

    # Set up fake HTTP notify endpoint: we will use this to validate that we receive
    # the correct stripe size after split.
    notifications = []

    def handler(request: Request):
        log.info(f"Notify request: {request}")
        notifications.append(request.json)
        return Response(status=200)

    httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)

    env = neon_env_builder.init_start(
        initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
    )
    tenant_id = env.initial_tenant

    assert len(notifications) == 1
    expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
        "tenant_id": str(env.initial_tenant),
        "stripe_size": None,
        "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
    }
    assert notifications[0] == expect

    new_stripe_size = 2048
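    # As elsewhere, stripe size is given in 8KiB pages: 2048 pages == 16MiB stripes.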
    env.storage_controller.tenant_shard_split(
        tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
    )

    # Check that we ended up with the stripe size that we expected, both on the pageserver
    # and in the notifications to compute
    assert len(notifications) == 2
    expect_after: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
        "tenant_id": str(env.initial_tenant),
        "stripe_size": new_stripe_size,
        "shards": [
            {"node_id": int(env.pageservers[0].id), "shard_number": 0},
            {"node_id": int(env.pageservers[0].id), "shard_number": 1},
        ],
    }
    log.info(f"Got notification: {notifications[1]}")
    assert notifications[1] == expect_after

    # Inspect the stripe size on the pageserver
    shard_0_loc = (
        env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
    )
    assert shard_0_loc["shard_stripe_size"] == new_stripe_size
    shard_1_loc = (
        env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
    )
    assert shard_1_loc["shard_stripe_size"] == new_stripe_size

    # Ensure stripe size survives a pageserver restart
    env.pageservers[0].stop()
    env.pageservers[0].start()
    shard_0_loc = (
        env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
    )
    assert shard_0_loc["shard_stripe_size"] == new_stripe_size
    shard_1_loc = (
        env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
    )
    assert shard_1_loc["shard_stripe_size"] == new_stripe_size

    # Ensure stripe size survives a storage controller restart
    env.storage_controller.stop()
    env.storage_controller.start()

    def assert_restart_notification():
        assert len(notifications) == 3
        assert notifications[2] == expect_after

    wait_until(10, 1, assert_restart_notification)


@pytest.mark.skipif(
    # The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
    # validating in this test don't benefit much from debug assertions.
    os.getenv("BUILD_TYPE") == "debug",
    reason="Avoid running bulkier ingest tests in debug mode",
)
def test_sharding_ingest(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Check behaviors related to ingest:
    - That we generate properly sized layers
    - TODO: that updates to remote_consistent_lsn are made correctly via safekeepers
    """

    # Set a small stripe size and checkpoint distance, so that we can exercise rolling logic
    # without writing a lot of data.
    expect_layer_size = 131072
    TENANT_CONF = {
        # small checkpointing and compaction targets to ensure we generate many upload operations
        "checkpoint_distance": f"{expect_layer_size}",
        "compaction_target_size": f"{expect_layer_size}",
    }
    shard_count = 4
    neon_env_builder.num_pageservers = shard_count
    env = neon_env_builder.init_start(
        initial_tenant_conf=TENANT_CONF,
        initial_tenant_shard_count=shard_count,
        # A stripe size the same order of magnitude as layer size: this ensures that
        # within checkpoint_distance some shards will have no data to ingest, if LSN
        # contains sequential page writes. This test checks that this kind of
        # scenario doesn't result in some shards emitting empty/tiny layers.
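        # (With expect_layer_size = 131072 this is 131072 // 8192 = 16 pages, i.e. a 128KiB stripe.)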
        initial_tenant_shard_stripe_size=expect_layer_size // 8192,
    )
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    workload = Workload(env, tenant_id, timeline_id)
    workload.init()
    workload.write_rows(4096, upload=False)
    workload.write_rows(4096, upload=False)
    workload.write_rows(4096, upload=False)
    workload.write_rows(4096, upload=False)
    workload.validate()

    small_layer_count = 0
    ok_layer_count = 0
    huge_layer_count = 0

    # Inspect the resulting layer map, count how many layers are undersized.
    for shard in env.storage_controller.locate(tenant_id):
        pageserver = env.get_pageserver(shard["node_id"])
        shard_id = shard["shard_id"]
        layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)

        for layer in layer_map.historic_layers:
            assert layer.layer_file_size is not None
            if layer.layer_file_size < expect_layer_size // 2:
                classification = "Small"
                small_layer_count += 1
            elif layer.layer_file_size > expect_layer_size * 2:
                classification = "Huge "
                huge_layer_count += 1
            else:
                classification = "OK   "
                ok_layer_count += 1

            if layer.kind == "Delta":
                assert layer.lsn_end is not None
                lsn_size = Lsn(layer.lsn_end) - Lsn(layer.lsn_start)
            else:
                lsn_size = 0

            log.info(
                f"{classification} layer[{pageserver.id}]: {layer.layer_file_name} (size {layer.layer_file_size}, LSN distance {lsn_size})"
            )

    # Why an inexact check?
    # - Because we roll layers on checkpoint_distance * shard_count, we expect to obey the target
    #   layer size on average, but it is still possible to write some tiny layers.
    log.info(f"Totals: {small_layer_count} small layers, {ok_layer_count} ok layers")
    if small_layer_count <= shard_count:
        # If each shard has <= 1 small layer
        pass
    else:
        # General case:
        assert float(small_layer_count) / float(ok_layer_count) < 0.25

    # Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
    assert huge_layer_count <= shard_count