mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 04:20:39 +00:00
pageserver: enable setting stripe size inline with split request. (#7093)
## Summary - Currently we can set stripe size at tenant creation, but it doesn't mean anything until we have multiple shards - When onboarding an existing tenant, it will always get a default shard stripe size, so we would like to be able to pick the actual stripe size at the point we split. ## Why do this inline with a split? The alternative to this change would be to have a separate endpoint on the storage controller for setting the stripe size on a tenant, and only permit writes to that endpoint when the tenant has only a single shard. That would work, but be a little bit more work for a client, and not appreciably simpler (instead of having a special argument to the split functions, we'd have a special separate endpoint, and a requirement that the controller must sync its config down to the pageserver before calling the split API). Either approach would work, but this one feels a bit more robust end-to-end: the split API is the _very last moment_ that the stripe size is mutable, so if we aim to set it before splitting, it makes sense to do it as part of the same operation.
This commit is contained in:
@@ -2222,7 +2222,18 @@ impl Service {
|
||||
|
||||
// unwrap safety: we would have returned above if we didn't find at least one shard to split
|
||||
let old_shard_count = old_shard_count.unwrap();
|
||||
let shard_ident = shard_ident.unwrap();
|
||||
let shard_ident = if let Some(new_stripe_size) = split_req.new_stripe_size {
|
||||
// This ShardIdentity will be used as the template for all children, so this implicitly
|
||||
// applies the new stripe size to the children.
|
||||
let mut shard_ident = shard_ident.unwrap();
|
||||
if shard_ident.count.count() > 1 && shard_ident.stripe_size != new_stripe_size {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("Attempted to change stripe size ({:?}->{new_stripe_size:?}) on a tenant with multiple shards", shard_ident.stripe_size)));
|
||||
}
|
||||
shard_ident.stripe_size = new_stripe_size;
|
||||
shard_ident
|
||||
} else {
|
||||
shard_ident.unwrap()
|
||||
};
|
||||
let policy = policy.unwrap();
|
||||
|
||||
// FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
|
||||
@@ -2314,6 +2325,7 @@ impl Service {
|
||||
*parent_id,
|
||||
TenantShardSplitRequest {
|
||||
new_shard_count: split_req.new_shard_count,
|
||||
new_stripe_size: split_req.new_stripe_size,
|
||||
},
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -585,10 +585,14 @@ async fn handle_tenant(
|
||||
Some(("shard-split", matches)) => {
|
||||
let tenant_id = get_tenant_id(matches, env)?;
|
||||
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
|
||||
let shard_stripe_size: Option<ShardStripeSize> = matches
|
||||
.get_one::<Option<ShardStripeSize>>("shard-stripe-size")
|
||||
.cloned()
|
||||
.unwrap();
|
||||
|
||||
let storage_controller = StorageController::from_env(env);
|
||||
let result = storage_controller
|
||||
.tenant_split(tenant_id, shard_count)
|
||||
.tenant_split(tenant_id, shard_count, shard_stripe_size)
|
||||
.await?;
|
||||
println!(
|
||||
"Split tenant {} into shards {}",
|
||||
@@ -1585,6 +1589,7 @@ fn cli() -> Command {
|
||||
.about("Increase the number of shards in the tenant")
|
||||
.arg(tenant_id_arg.clone())
|
||||
.arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)"))
|
||||
.arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages"))
|
||||
)
|
||||
)
|
||||
.subcommand(
|
||||
|
||||
@@ -10,7 +10,7 @@ use pageserver_api::{
|
||||
TenantCreateRequest, TenantShardSplitRequest, TenantShardSplitResponse,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::TenantShardId,
|
||||
shard::{ShardStripeSize, TenantShardId},
|
||||
};
|
||||
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
|
||||
use postgres_backend::AuthType;
|
||||
@@ -496,11 +496,15 @@ impl StorageController {
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
new_shard_count: u8,
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
) -> anyhow::Result<TenantShardSplitResponse> {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_id}/shard_split"),
|
||||
Some(TenantShardSplitRequest { new_shard_count }),
|
||||
Some(TenantShardSplitRequest {
|
||||
new_shard_count,
|
||||
new_stripe_size,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -198,6 +198,13 @@ pub struct TimelineCreateRequest {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantShardSplitRequest {
|
||||
pub new_shard_count: u8,
|
||||
|
||||
// A tenant's stripe size is only meaningful the first time their shard count goes
|
||||
// above 1: therefore during a split from 1->N shards, we may modify the stripe size.
|
||||
//
|
||||
// If this is set while the stripe count is being increased from an already >1 value,
|
||||
// then the request will fail with 400.
|
||||
pub new_stripe_size: Option<ShardStripeSize>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
||||
@@ -1151,7 +1151,12 @@ async fn tenant_shard_split_handler(
|
||||
|
||||
let new_shards = state
|
||||
.tenant_manager
|
||||
.shard_split(tenant_shard_id, ShardCount::new(req.new_shard_count), &ctx)
|
||||
.shard_split(
|
||||
tenant_shard_id,
|
||||
ShardCount::new(req.new_shard_count),
|
||||
req.new_stripe_size,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
@@ -2247,7 +2252,7 @@ pub fn make_router(
|
||||
.get("/v1/location_config", |r| {
|
||||
api_handler(r, list_location_config_handler)
|
||||
})
|
||||
.get("/v1/location_config/:tenant_id", |r| {
|
||||
.get("/v1/location_config/:tenant_shard_id", |r| {
|
||||
api_handler(r, get_location_config_handler)
|
||||
})
|
||||
.put(
|
||||
|
||||
@@ -6,7 +6,9 @@ use futures::stream::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
|
||||
use pageserver_api::shard::{
|
||||
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
|
||||
};
|
||||
use rand::{distributions::Alphanumeric, Rng};
|
||||
use std::borrow::Cow;
|
||||
use std::cmp::Ordering;
|
||||
@@ -1439,11 +1441,12 @@ impl TenantManager {
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
new_shard_count: ShardCount,
|
||||
new_stripe_size: Option<ShardStripeSize>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<TenantShardId>> {
|
||||
let tenant = get_tenant(tenant_shard_id, true)?;
|
||||
|
||||
// Plan: identify what the new child shards will be
|
||||
// Validate the incoming request
|
||||
if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
|
||||
anyhow::bail!("Requested shard count is not an increase");
|
||||
}
|
||||
@@ -1452,10 +1455,18 @@ impl TenantManager {
|
||||
anyhow::bail!("Requested split is not a power of two");
|
||||
}
|
||||
|
||||
let parent_shard_identity = tenant.shard_identity;
|
||||
let parent_tenant_conf = tenant.get_tenant_conf();
|
||||
let parent_generation = tenant.generation;
|
||||
if let Some(new_stripe_size) = new_stripe_size {
|
||||
if tenant.get_shard_stripe_size() != new_stripe_size
|
||||
&& tenant_shard_id.shard_count.count() > 1
|
||||
{
|
||||
// This tenant already has multiple shards, it is illegal to try and change its stripe size
|
||||
anyhow::bail!(
|
||||
"Shard stripe size may not be modified once tenant has multiple shards"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Plan: identify what the new child shards will be
|
||||
let child_shards = tenant_shard_id.split(new_shard_count);
|
||||
tracing::info!(
|
||||
"Shard {} splits into: {}",
|
||||
@@ -1466,6 +1477,10 @@ impl TenantManager {
|
||||
.join(",")
|
||||
);
|
||||
|
||||
let parent_shard_identity = tenant.shard_identity;
|
||||
let parent_tenant_conf = tenant.get_tenant_conf();
|
||||
let parent_generation = tenant.generation;
|
||||
|
||||
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
|
||||
if let Err(e) = tenant.split_prepare(&child_shards).await {
|
||||
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
|
||||
@@ -1515,6 +1530,9 @@ impl TenantManager {
|
||||
// Phase 3: Spawn the child shards
|
||||
for child_shard in &child_shards {
|
||||
let mut child_shard_identity = parent_shard_identity;
|
||||
if let Some(new_stripe_size) = new_stripe_size {
|
||||
child_shard_identity.stripe_size = new_stripe_size;
|
||||
}
|
||||
child_shard_identity.count = child_shard.shard_count;
|
||||
child_shard_identity.number = child_shard.shard_number;
|
||||
|
||||
|
||||
@@ -2142,11 +2142,13 @@ class NeonStorageController(MetricsGetter):
|
||||
shards: list[dict[str, Any]] = body["shards"]
|
||||
return shards
|
||||
|
||||
def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
|
||||
def tenant_shard_split(
|
||||
self, tenant_id: TenantId, shard_count: int, shard_stripe_size: Optional[int] = None
|
||||
) -> list[TenantShardId]:
|
||||
response = self.request(
|
||||
"PUT",
|
||||
f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split",
|
||||
json={"new_shard_count": shard_count},
|
||||
json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size},
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
body = response.json()
|
||||
|
||||
@@ -318,6 +318,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
assert isinstance(res_json["tenant_shards"], list)
|
||||
return res_json
|
||||
|
||||
def tenant_get_location(self, tenant_id: TenantShardId):
|
||||
res = self.get(
|
||||
f"http://localhost:{self.port}/v1/location_config/{tenant_id}",
|
||||
)
|
||||
self.verbose_error(res)
|
||||
return res.json()
|
||||
|
||||
def tenant_delete(self, tenant_id: Union[TenantId, TenantShardId]):
|
||||
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
|
||||
self.verbose_error(res)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from typing import Dict, List, Union
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -8,7 +9,11 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.remote_storage import s3_storage
|
||||
from fixtures.types import Lsn, TenantShardId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
from fixtures.workload import Workload
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def test_sharding_smoke(
|
||||
@@ -310,6 +315,96 @@ def test_sharding_split_smoke(
|
||||
workload.validate()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("initial_stripe_size", [None, 65536])
|
||||
def test_sharding_split_stripe_size(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver: HTTPServer,
|
||||
httpserver_listen_address,
|
||||
initial_stripe_size: int,
|
||||
):
|
||||
"""
|
||||
Check that modifying stripe size inline with a shard split works as expected
|
||||
"""
|
||||
(host, port) = httpserver_listen_address
|
||||
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
|
||||
neon_env_builder.num_pageservers = 1
|
||||
|
||||
# Set up fake HTTP notify endpoint: we will use this to validate that we receive
|
||||
# the correct stripe size after split.
|
||||
notifications = []
|
||||
|
||||
def handler(request: Request):
|
||||
log.info(f"Notify request: {request}")
|
||||
notifications.append(request.json)
|
||||
return Response(status=200)
|
||||
|
||||
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
|
||||
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_shard_count=1, initial_tenant_shard_stripe_size=initial_stripe_size
|
||||
)
|
||||
tenant_id = env.initial_tenant
|
||||
|
||||
assert len(notifications) == 1
|
||||
expect: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"stripe_size": None,
|
||||
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
|
||||
}
|
||||
assert notifications[0] == expect
|
||||
|
||||
new_stripe_size = 2048
|
||||
env.storage_controller.tenant_shard_split(
|
||||
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
|
||||
)
|
||||
|
||||
# Check that we ended up with the stripe size that we expected, both on the pageserver
|
||||
# and in the notifications to compute
|
||||
assert len(notifications) == 2
|
||||
expect_after: Dict[str, Union[List[Dict[str, int]], str, None, int]] = {
|
||||
"tenant_id": str(env.initial_tenant),
|
||||
"stripe_size": new_stripe_size,
|
||||
"shards": [
|
||||
{"node_id": int(env.pageservers[0].id), "shard_number": 0},
|
||||
{"node_id": int(env.pageservers[0].id), "shard_number": 1},
|
||||
],
|
||||
}
|
||||
log.info(f"Got notification: {notifications[1]}")
|
||||
assert notifications[1] == expect_after
|
||||
|
||||
# Inspect the stripe size on the pageserver
|
||||
shard_0_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
|
||||
)
|
||||
assert shard_0_loc["shard_stripe_size"] == new_stripe_size
|
||||
shard_1_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
|
||||
)
|
||||
assert shard_1_loc["shard_stripe_size"] == new_stripe_size
|
||||
|
||||
# Ensure stripe size survives a pageserver restart
|
||||
env.pageservers[0].stop()
|
||||
env.pageservers[0].start()
|
||||
shard_0_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 0, 2))
|
||||
)
|
||||
assert shard_0_loc["shard_stripe_size"] == new_stripe_size
|
||||
shard_1_loc = (
|
||||
env.pageservers[0].http_client().tenant_get_location(TenantShardId(tenant_id, 1, 2))
|
||||
)
|
||||
assert shard_1_loc["shard_stripe_size"] == new_stripe_size
|
||||
|
||||
# Ensure stripe size survives a storage controller restart
|
||||
env.storage_controller.stop()
|
||||
env.storage_controller.start()
|
||||
|
||||
def assert_restart_notification():
|
||||
assert len(notifications) == 3
|
||||
assert notifications[2] == expect_after
|
||||
|
||||
wait_until(10, 1, assert_restart_notification)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
# The quantity of data isn't huge, but debug can be _very_ slow, and the things we're
|
||||
# validating in this test don't benefit much from debug assertions.
|
||||
|
||||
Reference in New Issue
Block a user