pageserver: adjust checkpoint distance for sharded tenants (#6852)

## Problem

Where the stripe size is the same order of magnitude as the checkpoint
distance (such as with default settings), tenant shards can easily pass
through `checkpoint_distance` bytes of LSN without actually ingesting
anything. This results in emitting many tiny L0 delta layers.
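
As a rough illustration of the failure mode, here is a simplified model of key striping (round-robin assignment of stripes to shards; the real mapping hashes the key, but the effect on a sequential write pattern is similar). The constants are made up for the example and are not the pageserver defaults:

```python
PAGE_SIZE = 8192
CHECKPOINT_DISTANCE = 128 * 1024  # roll the open layer after this much WAL
STRIPE_SIZE_PAGES = 8             # stripe size of the same order of magnitude
SHARD_COUNT = 4


def shard_for_page(page_no: int) -> int:
    # Simplified model: consecutive stripes are assigned to shards round-robin.
    return (page_no // STRIPE_SIZE_PAGES) % SHARD_COUNT


# Sequential page writes covering roughly one checkpoint_distance of WAL touch
# only two stripes, so shards 2 and 3 ingest nothing -- yet the LSN advances by
# checkpoint_distance on every shard, and each of them rolls a near-empty L0 layer.
touched = {shard_for_page(p) for p in range(CHECKPOINT_DISTANCE // PAGE_SIZE)}
print(f"shards that ingested data: {sorted(touched)} of {SHARD_COUNT}")  # [0, 1] of 4
```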

## Summary of changes

- Multiply checkpoint distance by shard count before comparing with LSN
distance (see the sketch after this list). This is a heuristic and does not
guarantee that we won't emit small layers, but it fixes the issue for the
typical case where writes in a `checkpoint_distance * shard_count` range of
LSN bytes are somewhat distributed across shards.
- Add a test that checks the size of layers after ingesting to a sharded
tenant; this fails before the fix.
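
A minimal sketch of the adjusted roll condition, assuming writes are spread roughly evenly across shards (the function and parameter names are illustrative, not the pageserver's actual API):

```python
def should_roll_for_lsn_distance(
    lsn_distance: int, checkpoint_distance: int, shard_count: int
) -> bool:
    # Each shard sees the full LSN advance but ingests only ~1/shard_count of
    # the bytes, so rolling every checkpoint_distance * shard_count bytes of LSN
    # keeps the data in each L0 layer close to checkpoint_distance. An unsharded
    # tenant (shard_count == 1) behaves exactly as before.
    return lsn_distance >= checkpoint_distance * max(shard_count, 1)
```

With four shards, a shard rolls only after roughly `4 * checkpoint_distance` bytes of LSN, by which point it has typically ingested about `checkpoint_distance` bytes of its own data, i.e. one properly sized L0 layer instead of four tiny ones.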

---------

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Author: John Spray
Date: 2024-02-21 14:12:35 +00:00
Commit: 84f027357d (parent 428d9fe69e)
3 changed files with 95 additions and 7 deletions


@@ -5192,11 +5192,15 @@ impl<'a> TimelineWriter<'a> {
         // Rolling the open layer can be triggered by:
         // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
-        //    the safekeepers need to store.
+        //    the safekeepers need to store. For sharded tenants, we multiply by shard count to
+        //    account for how writes are distributed across shards: we expect each node to consume
+        //    1/count of the LSN on average.
         // 2. The size of the currently open layer.
         // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
         //    up and suspend activity.
-        if distance >= self.get_checkpoint_distance().into() {
+        if distance
+            >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
+        {
             info!(
                 "Will roll layer at {} with layer size {} due to LSN distance ({})",
                 lsn, state.current_size, distance


@@ -73,7 +73,7 @@ class Workload:
             self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
         )

-    def write_rows(self, n, pageserver_id: Optional[int] = None):
+    def write_rows(self, n, pageserver_id: Optional[int] = None, upload: bool = True):
         endpoint = self.endpoint(pageserver_id)
         start = self.expect_rows
         end = start + n - 1
@@ -87,9 +87,12 @@ class Workload:
             """
         )

-        return last_flush_lsn_upload(
-            self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
-        )
+        if upload:
+            return last_flush_lsn_upload(
+                self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
+            )
+        else:
+            return False

     def churn_rows(self, n, pageserver_id: Optional[int] = None, upload=True):
         assert self.expect_rows >= n


@@ -4,7 +4,7 @@ from fixtures.neon_fixtures import (
     tenant_get_shards,
 )
 from fixtures.remote_storage import s3_storage
-from fixtures.types import TenantShardId, TimelineId
+from fixtures.types import Lsn, TenantShardId, TimelineId
 from fixtures.workload import Workload
@@ -284,3 +284,84 @@ def test_sharding_split_smoke(
     )

     env.attachment_service.consistency_check()
+
+
+def test_sharding_ingest(
+    neon_env_builder: NeonEnvBuilder,
+):
+    """
+    Check behaviors related to ingest:
+    - That we generate properly sized layers
+    - TODO: that updates to remote_consistent_lsn are made correctly via safekeepers
+    """
+
+    # Set a small stripe size and checkpoint distance, so that we can exercise rolling logic
+    # without writing a lot of data.
+    expect_layer_size = 131072
+    TENANT_CONF = {
+        # small checkpointing and compaction targets to ensure we generate many upload operations
+        "checkpoint_distance": f"{expect_layer_size}",
+        "compaction_target_size": f"{expect_layer_size}",
+    }
+    shard_count = 4
+    neon_env_builder.num_pageservers = shard_count
+    env = neon_env_builder.init_start(
+        initial_tenant_conf=TENANT_CONF,
+        initial_tenant_shard_count=shard_count,
+        # A stripe size the same order of magnitude as layer size: this ensures that
+        # within checkpoint_distance some shards will have no data to ingest, if LSN
+        # contains sequential page writes. This test checks that this kind of
+        # scenario doesn't result in some shards emitting empty/tiny layers.
+        initial_tenant_shard_stripe_size=expect_layer_size // 8192,
+    )
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    workload = Workload(env, tenant_id, timeline_id)
+    workload.init()
+    workload.write_rows(512, upload=False)
+    workload.write_rows(512, upload=False)
+    workload.write_rows(512, upload=False)
+    workload.write_rows(512, upload=False)
+    workload.validate()
+
+    small_layer_count = 0
+    ok_layer_count = 0
+    huge_layer_count = 0
+
+    # Inspect the resulting layer map, count how many layers are undersized.
+    for shard in env.attachment_service.locate(tenant_id):
+        pageserver = env.get_pageserver(shard["node_id"])
+        shard_id = shard["shard_id"]
+        layer_map = pageserver.http_client().layer_map_info(shard_id, timeline_id)
+        for layer in layer_map.historic_layers:
+            assert layer.layer_file_size is not None
+            if layer.layer_file_size < expect_layer_size // 2:
+                classification = "Small"
+                small_layer_count += 1
+            elif layer.layer_file_size > expect_layer_size * 2:
+                classification = "Huge "
+                huge_layer_count += 1
+            else:
+                classification = "OK   "
+                ok_layer_count += 1

+            if layer.kind == "Delta":
+                assert layer.lsn_end is not None
+                lsn_size = Lsn(layer.lsn_end) - Lsn(layer.lsn_start)
+            else:
+                lsn_size = 0
+
+            log.info(
+                f"{classification} layer[{pageserver.id}]: {layer.layer_file_name} (size {layer.layer_file_size}, LSN distance {lsn_size})"
+            )
+
+    # Why an inexact check?
+    # - Because we roll layers on checkpoint_distance * shard_count, we expect to obey the target
+    #   layer size on average, but it is still possible to write some tiny layers.
+    log.info(f"Totals: {small_layer_count} small layers, {ok_layer_count} ok layers")
+    assert float(small_layer_count) / float(ok_layer_count) < 0.25
+
+    # Each shard may emit up to one huge layer, because initdb ingest doesn't respect checkpoint_distance.
+    assert huge_layer_count <= shard_count