From 1b41db8bddfc1a89569346e1036df74f34454a4c Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 12 Mar 2024 20:41:08 +0000 Subject: [PATCH] pageserver: enable setting stripe size inline with split request. (#7093) ## Summary - Currently we can set stripe size at tenant creation, but it doesn't mean anything until we have multiple shards - When onboarding an existing tenant, it will always get a default shard stripe size, so we would like to be able to pick the actual stripe size at the point we split. ## Why do this inline with a split? The alternative to this change would be to have a separate endpoint on the storage controller for setting the stripe size on a tenant, and only permit writes to that endpoint when the tenant has only a single shard. That would work, but be a little bit more work for a client, and not appreciably simpler (instead of having a special argument to the split functions, we'd have a special separate endpoint, and a requirement that the controller must sync its config down to the pageserver before calling the split API). Either approach would work, but this one feels a bit more robust end-to-end: the split API is the _very last moment_ that the stripe size is mutable, so if we aim to set it before splitting, it makes sense to do it as part of the same operation. --- .../attachment_service/src/service.rs | 14 ++- control_plane/src/bin/neon_local.rs | 7 +- control_plane/src/storage_controller.rs | 8 +- libs/pageserver_api/src/models.rs | 7 ++ pageserver/src/http/routes.rs | 9 +- pageserver/src/tenant/mgr.rs | 28 +++++- test_runner/fixtures/neon_fixtures.py | 6 +- test_runner/fixtures/pageserver/http.py | 7 ++ test_runner/regress/test_sharding.py | 95 +++++++++++++++++++ 9 files changed, 168 insertions(+), 13 deletions(-) diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index a8498a39b5..ea301d0372 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -2222,7 +2222,18 @@ impl Service { // unwrap safety: we would have returned above if we didn't find at least one shard to split let old_shard_count = old_shard_count.unwrap(); - let shard_ident = shard_ident.unwrap(); + let shard_ident = if let Some(new_stripe_size) = split_req.new_stripe_size { + // This ShardIdentity will be used as the template for all children, so this implicitly + // applies the new stripe size to the children. + let mut shard_ident = shard_ident.unwrap(); + if shard_ident.count.count() > 1 && shard_ident.stripe_size != new_stripe_size { + return Err(ApiError::BadRequest(anyhow::anyhow!("Attempted to change stripe size ({:?}->{new_stripe_size:?}) on a tenant with multiple shards", shard_ident.stripe_size))); + } + shard_ident.stripe_size = new_stripe_size; + shard_ident + } else { + shard_ident.unwrap() + }; let policy = policy.unwrap(); // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another @@ -2314,6 +2325,7 @@ impl Service { *parent_id, TenantShardSplitRequest { new_shard_count: split_req.new_shard_count, + new_stripe_size: split_req.new_stripe_size, }, ) .await diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 952229c4b7..6c722f36b4 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -585,10 +585,14 @@ async fn handle_tenant( Some(("shard-split", matches)) => { let tenant_id = get_tenant_id(matches, env)?; let shard_count: u8 = matches.get_one::("shard-count").cloned().unwrap_or(0); + let shard_stripe_size: Option = matches + .get_one::>("shard-stripe-size") + .cloned() + .unwrap(); let storage_controller = StorageController::from_env(env); let result = storage_controller - .tenant_split(tenant_id, shard_count) + .tenant_split(tenant_id, shard_count, shard_stripe_size) .await?; println!( "Split tenant {} into shards {}", @@ -1585,6 +1589,7 @@ fn cli() -> Command { .about("Increase the number of shards in the tenant") .arg(tenant_id_arg.clone()) .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)")) + .arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages")) ) ) .subcommand( diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index c505e67770..d7673f1b26 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -10,7 +10,7 @@ use pageserver_api::{ TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo, }, - shard::TenantShardId, + shard::{ShardStripeSize, TenantShardId}, }; use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; @@ -496,11 +496,15 @@ impl StorageController { &self, tenant_id: TenantId, new_shard_count: u8, + new_stripe_size: Option, ) -> anyhow::Result { self.dispatch( Method::PUT, format!("control/v1/tenant/{tenant_id}/shard_split"), - Some(TenantShardSplitRequest { new_shard_count }), + Some(TenantShardSplitRequest { + new_shard_count, + new_stripe_size, + }), ) .await } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index fe5bbd1c06..a96cc09158 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -198,6 +198,13 @@ pub struct TimelineCreateRequest { #[derive(Serialize, Deserialize)] pub struct TenantShardSplitRequest { pub new_shard_count: u8, + + // A tenant's stripe size is only meaningful the first time their shard count goes + // above 1: therefore during a split from 1->N shards, we may modify the stripe size. + // + // If this is set while the stripe count is being increased from an already >1 value, + // then the request will fail with 400. + pub new_stripe_size: Option, } #[derive(Serialize, Deserialize)] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index eafad9ab73..bb8b1bb7e5 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1151,7 +1151,12 @@ async fn tenant_shard_split_handler( let new_shards = state .tenant_manager - .shard_split(tenant_shard_id, ShardCount::new(req.new_shard_count), &ctx) + .shard_split( + tenant_shard_id, + ShardCount::new(req.new_shard_count), + req.new_stripe_size, + &ctx, + ) .await .map_err(ApiError::InternalServerError)?; @@ -2247,7 +2252,7 @@ pub fn make_router( .get("/v1/location_config", |r| { api_handler(r, list_location_config_handler) }) - .get("/v1/location_config/:tenant_id", |r| { + .get("/v1/location_config/:tenant_shard_id", |r| { api_handler(r, get_location_config_handler) }) .put( diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 38274448b3..26fcce1f38 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -6,7 +6,9 @@ use futures::stream::StreamExt; use itertools::Itertools; use pageserver_api::key::Key; use pageserver_api::models::ShardParameters; -use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId}; +use pageserver_api::shard::{ + ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId, +}; use rand::{distributions::Alphanumeric, Rng}; use std::borrow::Cow; use std::cmp::Ordering; @@ -1439,11 +1441,12 @@ impl TenantManager { &self, tenant_shard_id: TenantShardId, new_shard_count: ShardCount, + new_stripe_size: Option, ctx: &RequestContext, ) -> anyhow::Result> { let tenant = get_tenant(tenant_shard_id, true)?; - // Plan: identify what the new child shards will be + // Validate the incoming request if new_shard_count.count() <= tenant_shard_id.shard_count.count() { anyhow::bail!("Requested shard count is not an increase"); } @@ -1452,10 +1455,18 @@ impl TenantManager { anyhow::bail!("Requested split is not a power of two"); } - let parent_shard_identity = tenant.shard_identity; - let parent_tenant_conf = tenant.get_tenant_conf(); - let parent_generation = tenant.generation; + if let Some(new_stripe_size) = new_stripe_size { + if tenant.get_shard_stripe_size() != new_stripe_size + && tenant_shard_id.shard_count.count() > 1 + { + // This tenant already has multiple shards, it is illegal to try and change its stripe size + anyhow::bail!( + "Shard stripe size may not be modified once tenant has multiple shards" + ); + } + } + // Plan: identify what the new child shards will be let child_shards = tenant_shard_id.split(new_shard_count); tracing::info!( "Shard {} splits into: {}", @@ -1466,6 +1477,10 @@ impl TenantManager { .join(",") ); + let parent_shard_identity = tenant.shard_identity; + let parent_tenant_conf = tenant.get_tenant_conf(); + let parent_generation = tenant.generation; + // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation if let Err(e) = tenant.split_prepare(&child_shards).await { // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might @@ -1515,6 +1530,9 @@ impl TenantManager { // Phase 3: Spawn the child shards for child_shard in &child_shards { let mut child_shard_identity = parent_shard_identity; + if let Some(new_stripe_size) = new_stripe_size { + child_shard_identity.stripe_size = new_stripe_size; + } child_shard_identity.count = child_shard.shard_count; child_shard_identity.number = child_shard.shard_number; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 975c6d865b..b3f460c7fe 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2142,11 +2142,13 @@ class NeonStorageController(MetricsGetter): shards: list[dict[str, Any]] = body["shards"] return shards - def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]: + def tenant_shard_split( + self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None + ) -> list[TenantShardId]: response = self.request( "PUT", f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split", - json={"new_shard_count": shard_count}, + json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size}, headers=self.headers(TokenScope.ADMIN), ) body = response.json() diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index b8e20c451f..6e082374d7 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -318,6 +318,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter): assert isinstance(res_json["tenant_shards"], list) return res_json + def tenant_get_location(self, tenant_id: TenantShardId): + res = self.get( + f"http://localhost:{self.port}/v1/location_config/{tenant_id}", + ) + self.verbose_error(res) + return res.json() + def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]): res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") self.verbose_error(res) diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 1b96cd6a80..9309af066b 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -1,4 +1,5 @@ import os +from typing import Dict, List, Union import pytest from fixtures.log_helper import log @@ -8,7 +9,11 @@ from fixtures.neon_fixtures import ( ) from fixtures.remote_storage import s3_storage from fixtures.types import Lsn, TenantShardId, TimelineId +from fixtures.utils import wait_until from fixtures.workload import Workload +from pytest_httpserver import HTTPServer +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response def test_sharding_smoke( @@ -310,6 +315,96 @@ def test_sharding_split_smoke( workload.validate() +@pytest.mark.parametrize("initial_stripe_size", [None, 65536]) +def test_sharding_split_stripe_size( + neon_env_builder: NeonEnvBuilder, + httpserver: HTTPServer, + httpserver_listen_address, + initial_stripe_size: int, +): + """ + Check that modifying stripe size inline with a shard split works as expected + """ + (host, port) = httpserver_listen_address + neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify" + neon_env_builder.num_pageservers = 1 + + # Set up fake HTTP notify endpoint: we will use this to validate that we receive + # the correct stripe size after split. + notifications = [] + + def handler(request: Request): + log.info(f"Notify request: {request}") + notifications.append(request.json) + return Response(status=200) + + httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler) + + env = neon_env_builder.init_start( + initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size + ) + tenant_id = env.initial_tenant + + assert len(notifications) == 1 + expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = { + "tenant_id": str(env.initial_tenant), + "stripe_size": None, + "shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}], + } + assert notifications[0] == expect + + new_stripe_size = 2048 + env.storage_controller.tenant_shard_split( + tenant_id, shard_count=2, shard_stripe_size=new_stripe_size + ) + + # Check that we ended up with the stripe size that we expected, both on the pageserver + # and in the notifications to compute + assert len(notifications) == 2 + expect_after: Dict[str, Union[List[Dict[str, int]], str, None, int]] = { + "tenant_id": str(env.initial_tenant), + "stripe_size": new_stripe_size, + "shards": [ + {"node_id": int(env.pageservers[0].id), "shard_number": 0}, + {"node_id": int(env.pageservers[0].id), "shard_number": 1}, + ], + } + log.info(f"Got notification: {notifications[1]}") + assert notifications[1] == expect_after + + # Inspect the stripe size on the pageserver + shard_0_loc = ( + env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2)) + ) + assert shard_0_loc["shard_stripe_size"] == new_stripe_size + shard_1_loc = ( + env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2)) + ) + assert shard_1_loc["shard_stripe_size"] == new_stripe_size + + # Ensure stripe size survives a pageserver restart + env.pageservers[0].stop() + env.pageservers[0].start() + shard_0_loc = ( + env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2)) + ) + assert shard_0_loc["shard_stripe_size"] == new_stripe_size + shard_1_loc = ( + env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2)) + ) + assert shard_1_loc["shard_stripe_size"] == new_stripe_size + + # Ensure stripe size survives a storage controller restart + env.storage_controller.stop() + env.storage_controller.start() + + def assert_restart_notification(): + assert len(notifications) == 3 + assert notifications[2] == expect_after + + wait_until(10, 1, assert_restart_notification) + + @pytest.mark.skipif( # The quantity of data isn't huge, but debug can be _very_ slow, and the things we're # validating in this test don't benefit much from debug assertions.