storage controller: pagination for tenant listing API (#10365)

## Problem

For large deployments, the `control/v1/tenant` listing API can time out
transmitting a monolithic serialized response.

## Summary of changes

- Add `limit` and `start_after` parameters to listing API
- Update storcon_cli to use these parameters and limit requests to 1000 items at a time (see the usage sketch below)
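
For illustration, paging through the new API from any HTTP client looks like the following minimal Python sketch. The base URL, admin token, and use of `requests` are assumptions for the example; only the `control/v1/tenant` path and the `limit`/`start_after` query parameters come from this change.

```python
import requests

# Assumed endpoint and credentials -- adjust for your deployment.
BASE_URL = "http://127.0.0.1:1234"
HEADERS = {"Authorization": "Bearer <admin-jwt>"}
PAGE_SIZE = 1000


def list_all_tenants() -> list[dict]:
    """Walk control/v1/tenant page by page using limit/start_after."""
    tenants: list[dict] = []
    start_after = None
    while True:
        params = {"limit": PAGE_SIZE}
        if start_after is not None:
            params["start_after"] = start_after
        page = requests.get(
            f"{BASE_URL}/control/v1/tenant", headers=HEADERS, params=params
        ).json()
        if not page:
            break  # an empty page means we have seen everything
        tenants.extend(page)
        # Tenants come back in ascending tenant_id order, so the last id on
        # this page is the cursor for the next request.
        start_after = page[-1]["tenant_id"]
    return tenants
```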
Commit 47c1640acc (parent 6debb49b87) by John Spray, 2025-01-14 21:37:32 +00:00, committed via GitHub.
5 changed files with 117 additions and 34 deletions.


@@ -477,16 +477,7 @@ async fn main() -> anyhow::Result<()> {
println!("{table}");
}
Command::Tenants { node_id: None } => {
let mut resp = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(
Method::GET,
"control/v1/tenant".to_string(),
None,
)
.await?;
resp.sort_by(|a, b| a.tenant_id.cmp(&b.tenant_id));
// Set up output formatting
let mut table = comfy_table::Table::new();
table.set_header([
"TenantId",
@@ -496,20 +487,55 @@ async fn main() -> anyhow::Result<()> {
"Placement",
"Scheduling",
]);
for tenant in resp {
let shard_zero = tenant.shards.into_iter().next().unwrap();
table.add_row([
format!("{}", tenant.tenant_id),
shard_zero
.preferred_az_id
.as_ref()
.cloned()
.unwrap_or("".to_string()),
format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
format!("{:?}", tenant.stripe_size),
format!("{:?}", tenant.policy),
format!("{:?}", shard_zero.scheduling_policy),
]);
// Pagination loop over listing API
let mut start_after = None;
const LIMIT: usize = 1000;
loop {
let path = match start_after {
None => format!("control/v1/tenant?limit={LIMIT}"),
Some(start_after) => {
format!("control/v1/tenant?limit={LIMIT}&start_after={start_after}")
}
};
let resp = storcon_client
.dispatch::<(), Vec<TenantDescribeResponse>>(Method::GET, path, None)
.await?;
if resp.is_empty() {
// End of data reached
break;
}
// Give some visual feedback while we're building up the table (comfy_table doesn't have
// streaming output)
if resp.len() >= LIMIT {
eprint!(".");
}
start_after = Some(resp.last().unwrap().tenant_id);
for tenant in resp {
let shard_zero = tenant.shards.into_iter().next().unwrap();
table.add_row([
format!("{}", tenant.tenant_id),
shard_zero
.preferred_az_id
.as_ref()
.cloned()
.unwrap_or("".to_string()),
format!("{}", shard_zero.tenant_shard_id.shard_count.literal()),
format!("{:?}", tenant.stripe_size),
format!("{:?}", tenant.policy),
format!("{:?}", shard_zero.scheduling_policy),
]);
}
}
// Terminate progress dots with a newline so the table starts on a fresh line
if table.row_count() > LIMIT {
eprintln!();
}
}
println!("{table}");


@@ -653,6 +653,10 @@ async fn handle_tenant_list(
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let limit: Option<usize> = parse_query_param(&req, "limit")?;
let start_after: Option<TenantId> = parse_query_param(&req, "start_after")?;
tracing::info!("start_after: {:?}", start_after);
match maybe_forward(req).await {
ForwardOutcome::Forwarded(res) => {
return res;
@@ -660,7 +664,7 @@ async fn handle_tenant_list(
ForwardOutcome::NotForwarded(_req) => {}
};
json_response(StatusCode::OK, service.tenant_list())
json_response(StatusCode::OK, service.tenant_list(limit, start_after))
}
async fn handle_node_register(req: Request<Body>) -> Result<Response<Body>, ApiError> {


@@ -4158,17 +4158,42 @@ impl Service {
.ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into()))
}
pub(crate) fn tenant_list(&self) -> Vec<TenantDescribeResponse> {
/// `limit` and `start_after` are pagination parameters. Since we are walking an in-memory map, `start_after` does
/// not avoid traversing data, it just avoids returning it. This is suitable for our purposes: the in-memory maps
/// are small enough to traverse quickly, and pagination exists only to avoid serializing huge JSON responses in
/// our external API.
pub(crate) fn tenant_list(
&self,
limit: Option<usize>,
start_after: Option<TenantId>,
) -> Vec<TenantDescribeResponse> {
let locked = self.inner.read().unwrap();
// Apply start_after parameter
let shard_range = match start_after {
None => locked.tenants.range(..),
Some(tenant_id) => locked.tenants.range(
// Start from a sentinel key that sorts after every real shard of `tenant_id`,
// so the range effectively begins at the next tenant (exclusive lower bound).
TenantShardId {
tenant_id,
shard_number: ShardNumber(u8::MAX),
shard_count: ShardCount(u8::MAX),
}..,
),
};
let mut result = Vec::new();
for (_tenant_id, tenant_shards) in
&locked.tenants.iter().group_by(|(id, _shard)| id.tenant_id)
{
for (_tenant_id, tenant_shards) in &shard_range.group_by(|(id, _shard)| id.tenant_id) {
result.push(
self.tenant_describe_impl(tenant_shards.map(|(_k, v)| v))
.expect("Groups are always non-empty"),
);
// Enforce `limit` parameter
if let Some(limit) = limit {
if result.len() >= limit {
break;
}
}
}
result
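
The `start_after` handling above seeks past a sentinel shard key that compares greater than every real shard of that tenant, making the lower bound effectively exclusive. A rough Python analogue of the same idea, assuming shards keyed by `(tenant_id, shard_number, shard_count)` tuples (the key layout and the 255 sentinel are illustrative, not the controller's actual types):

```python
from bisect import bisect_right
from itertools import groupby


def paginate_tenants(shards: dict, limit=None, start_after=None) -> list[dict]:
    """Sketch of tenant_list(): shards is keyed by (tenant_id, shard_number, shard_count)."""
    keys = sorted(shards)
    start = 0
    if start_after is not None:
        # Sentinel key: no real shard uses 255 for shard_number/shard_count, so
        # this lands just past the last shard of `start_after` (exclusive bound).
        start = bisect_right(keys, (start_after, 255, 255))
    result = []
    # Keys are sorted, so shards of the same tenant are contiguous and can be grouped.
    for tenant_id, tenant_keys in groupby(keys[start:], key=lambda k: k[0]):
        result.append({"tenant_id": tenant_id,
                       "shards": [shards[k] for k in tenant_keys]})
        # Enforce the limit on whole tenants, not individual shards.
        if limit is not None and len(result) >= limit:
            break
    return result
```

As in the Rust version, the limit counts tenants rather than shards, so a page never splits one tenant's shards across responses.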


@@ -1884,7 +1884,10 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tenant_list(self):
def tenant_shard_dump(self):
"""
Debug listing API: dumps the internal map of tenant shards
"""
response = self.request(
"GET",
f"{self.api}/debug/v1/tenant",
@@ -1892,6 +1895,18 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return response.json()
def tenant_list(self, **kwargs):
"""
Control API tenant listing: a vector of the same content returned by tenant_describe
"""
response = self.request(
"GET",
f"{self.api}/control/v1/tenant",
headers=self.headers(TokenScope.ADMIN),
params=kwargs,
)
return response.json()
def node_configure(self, node_id, body: dict[str, Any]):
log.info(f"node_configure({node_id}, {body})")
body["node_id"] = node_id
@@ -2238,7 +2253,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"""
Get the intent and observed placements of all tenants known to the storage controller.
"""
tenants = self.tenant_list()
tenants = self.tenant_shard_dump()
tenant_placement: defaultdict[str, dict[str, Any]] = defaultdict(
lambda: {


@@ -113,6 +113,19 @@ def test_storage_controller_smoke(neon_env_builder: NeonEnvBuilder, combination)
for tid in tenant_ids:
env.create_tenant(tid, shard_count=shards_per_tenant)
# Tenant listing API should work
listed_tenants = env.storage_controller.tenant_list()
log.info(f"listed_tenants: {listed_tenants}")
assert set(t["tenant_id"] for t in listed_tenants) == set(str(t) for t in tenant_ids)
paged = env.storage_controller.tenant_list(limit=2, start_after=listed_tenants[0]["tenant_id"])
assert len(paged) == 2
assert paged[0] == listed_tenants[1]
assert paged[1] == listed_tenants[2]
paged = env.storage_controller.tenant_list(
limit=1000, start_after="ffffffffffffffffffffffffffffffff"
)
assert paged == []
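
The spot checks above pin exact page contents; a stricter property is that paging tiles the whole listing with no gaps or duplicates. A hedged sketch of such a check, reusing the fixture's `tenant_list` (the helper name and page size are illustrative, and it assumes the tenant set does not change while it runs):

```python
def assert_paging_matches_full_listing(storage_controller, page_size: int = 2):
    """Paging with limit/start_after must reproduce the unpaginated listing exactly."""
    full = storage_controller.tenant_list()
    paged: list[dict] = []
    start_after = None
    while True:
        kwargs = {"limit": page_size}
        if start_after is not None:
            kwargs["start_after"] = start_after
        page = storage_controller.tenant_list(**kwargs)
        if not page:
            break
        paged.extend(page)
        start_after = page[-1]["tenant_id"]
    # Both listings come back sorted by tenant_id, so they should be identical.
    assert paged == full
```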
# Validate high level metrics
assert (
env.storage_controller.get_metric_value("storage_controller_tenant_shards")
@@ -1506,7 +1519,7 @@ class PageserverFailpoint(Failure):
def build_node_to_tenants_map(env: NeonEnv) -> dict[int, list[TenantId]]:
tenants = env.storage_controller.tenant_list()
tenants = env.storage_controller.tenant_shard_dump()
node_to_tenants: dict[int, list[TenantId]] = {}
for t in tenants:
@@ -2631,7 +2644,7 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
# Validate that the storcon attempts to forward the request, but stops.
# when it realises it is still the current leader.
with pytest.raises(StorageControllerApiException, match="Leader is stepped down instance"):
env.storage_controller.tenant_list()
env.storage_controller.tenant_shard_dump()
# Validate that we can step down multiple times and the observed state
# doesn't change.
@@ -2781,7 +2794,7 @@ def test_storage_controller_leadership_transfer(
# Check that the stepped down instance forwards requests
# to the new leader while it's still running.
storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}")
env.storage_controller.tenant_list()
env.storage_controller.tenant_shard_dump()
env.storage_controller.node_configure(env.pageservers[0].id, {"scheduling": "Pause"})
status = env.storage_controller.node_status(env.pageservers[0].id)
assert status["scheduling"] == "Pause"