mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 22:50:38 +00:00
## Problem

To test sharding, we need something to control it. We could write python code for doing this from the test runner, but this wouldn't be usable with neon_local run directly, and when we want to write tests with a large number of shards/tenants, Rust is a better fit for efficiently handling all the required state.

This service enables automated tests to easily get a system with sharding/HA without the test itself having to set this all up by hand: existing tests can be run against sharded tenants just by setting a shard count when creating the tenant.

## Summary of changes

Attachment service was previously a map of TenantId->TenantState, where the principal state stored for each tenant was the generation and the last attached pageserver. This enabled it to serve the re-attach and validate requests that the pageserver requires.

In this PR, the scope of the service is extended substantially to do overall management of tenants in the pageserver, including tenant/timeline creation, live migration, evacuation of offline pageservers etc. This is done using synchronous code to make declarative changes to the tenant's intended state (`TenantState.policy` and `TenantState.intent`), which are then translated into calls into the pageserver by the `Reconciler`.

Top level summary of modules within `control_plane/attachment_service/src`:

- `tenant_state`: structure that represents one tenant shard.
- `service`: implements the main high-level operations, such as tenant/timeline creation, marking a node offline, etc.
- `scheduler`: for operations that need to pick a pageserver for a tenant, construct a scheduler and call into it.
- `compute_hook`: receive notifications when a tenant shard is attached somewhere new. Once we have locations for all the shards in a tenant, emit an update to postgres configuration via the neon_local `LocalEnv`.
- `http`: HTTP stubs. These mostly map to methods on `Service`, but are separated for readability and so that it'll be easier to adapt if/when we switch to another RPC layer.
- `node`: structure that describes a pageserver node. The most important attribute of a node is its availability: marking a node offline causes tenant shards to reschedule away from it.

This PR is a precursor to implementing the full sharding service for prod (#6342). What's the difference between this and a production-ready controller for pageservers?

- JSON file persistence to be replaced with a database.
- Limited observability.
- No concurrency limits. Marking a pageserver offline will try to migrate every tenant to a new pageserver concurrently, even if there are thousands.
- Very simple scheduler that only knows to pick the pageserver with fewest tenants, and place secondary locations on a different pageserver than attached locations: it does not try to place shards for the same tenant on different pageservers. This matters little in tests, because picking the least-used pageserver usually results in round-robin placement.
- Scheduler state is rebuilt exhaustively for each operation that requires a scheduler.
- Relies on neon_local mechanisms for updating postgres: in production this would be something that flows through the real control plane.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
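The "fewest tenants" placement rule described in the commit message can be illustrated with a small sketch. This is not the code in the `scheduler` module: the `Scheduler` type, the `schedule_shard` method, the `ScheduleError` enum, the `u64` alias for `NodeId`, and the lowest-id tie-break are all hypothetical choices made for the example.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a pageserver node id (an assumption for this sketch).
type NodeId = u64;

/// Hypothetical error type: returned when no eligible node exists.
#[derive(Debug, PartialEq)]
enum ScheduleError {
    NoNodes,
}

/// Hypothetical scheduler: tracks how many shards are placed on each node.
struct Scheduler {
    shard_counts: BTreeMap<NodeId, usize>,
}

impl Scheduler {
    /// Build a scheduler over the given nodes, all starting empty.
    fn new(nodes: impl IntoIterator<Item = NodeId>) -> Self {
        Self {
            shard_counts: nodes.into_iter().map(|n| (n, 0)).collect(),
        }
    }

    /// Pick the node holding the fewest shards, skipping any node in `avoid`.
    /// Ties are broken by lowest node id (an assumption of this sketch).
    fn schedule_shard(&mut self, avoid: &[NodeId]) -> Result<NodeId, ScheduleError> {
        let node = self
            .shard_counts
            .iter()
            .filter(|(id, _)| !avoid.contains(*id))
            .min_by_key(|(id, count)| (**count, **id))
            .map(|(id, _)| *id)
            .ok_or(ScheduleError::NoNodes)?;

        // Record the placement so that later calls see the updated load.
        *self
            .shard_counts
            .get_mut(&node)
            .expect("node was just selected from the map") += 1;
        Ok(node)
    }
}
```

To place a secondary location on a different pageserver than the attached one, a caller would pass the attached node in `avoid`. Starting from empty nodes, repeated calls with an empty `avoid` list cycle through the nodes in order, which matches the commit message's remark that least-used placement usually behaves like round-robin in tests.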
219 lines
7.3 KiB
Rust
use crate::reconciler::ReconcileError;
use crate::service::Service;
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest};
use pageserver_api::shard::TenantShardId;
use std::sync::Arc;
use utils::auth::SwappableJwtAuth;
use utils::http::endpoint::{auth_middleware, request_span};
use utils::http::request::parse_request_param;
use utils::id::TenantId;

use utils::{
    http::{
        endpoint::{self},
        error::ApiError,
        json::{json_request, json_response},
        RequestExt, RouterBuilder,
    },
    id::NodeId,
};

use pageserver_api::control_api::{ReAttachRequest, ValidateRequest};

use control_plane::attachment_service::{
    AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest,
    TenantShardMigrateRequest,
};

/// State available to HTTP request handlers
#[derive(Clone)]
pub struct HttpState {
    service: Arc<crate::service::Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
    allowlist_routes: Vec<Uri>,
}

impl HttpState {
    pub fn new(service: Arc<crate::service::Service>, auth: Option<Arc<SwappableJwtAuth>>) -> Self {
        let allowlist_routes = ["/status"]
            .iter()
            .map(|v| v.parse().unwrap())
            .collect::<Vec<_>>();
        Self {
            service,
            auth,
            allowlist_routes,
        }
    }
}

#[inline(always)]
fn get_state(request: &Request<Body>) -> &HttpState {
    request
        .data::<Arc<HttpState>>()
        .expect("unknown state type")
        .as_ref()
}

/// Pageserver calls into this on startup, to learn which tenants it should attach
async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(
        StatusCode::OK,
        state
            .service
            .re_attach(reattach_req)
            .await
            .map_err(ApiError::InternalServerError)?,
    )
}

/// Pageserver calls into this before doing deletions, to confirm that it still
/// holds the latest generation for the tenants with deletions enqueued
async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let validate_req = json_request::<ValidateRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(StatusCode::OK, state.service.validate(validate_req))
}

/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
/// (in the real control plane this is unnecessary, because the same program is managing
/// generation numbers and doing attachments).
async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
    let state = get_state(&req);

    json_response(
        StatusCode::OK,
        state
            .service
            .attach_hook(attach_req)
            .await
            .map_err(ApiError::InternalServerError)?,
    )
}

async fn handle_inspect(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let inspect_req = json_request::<InspectRequest>(&mut req).await?;

    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.inspect(inspect_req))
}

async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(
        StatusCode::OK,
        state.service.tenant_create(create_req).await?,
    )
}

async fn handle_tenant_timeline_create(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let create_req = json_request::<TimelineCreateRequest>(&mut req).await?;

    let state = get_state(&req);
    json_response(
        StatusCode::OK,
        state
            .service
            .tenant_timeline_create(tenant_id, create_req)
            .await?,
    )
}

async fn handle_tenant_locate(req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.tenant_locate(tenant_id)?)
}

async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let register_req = json_request::<NodeRegisterRequest>(&mut req).await?;
    let state = get_state(&req);
    state.service.node_register(register_req).await?;
    json_response(StatusCode::OK, ())
}

async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let node_id: NodeId = parse_request_param(&req, "node_id")?;
    let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
    if node_id != config_req.node_id {
        return Err(ApiError::BadRequest(anyhow::anyhow!(
            "Path and body node_id differ"
        )));
    }
    let state = get_state(&req);

    json_response(StatusCode::OK, state.service.node_configure(config_req)?)
}

async fn handle_tenant_shard_migrate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
    let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
    let state = get_state(&req);
    json_response(
        StatusCode::OK,
        state
            .service
            .tenant_shard_migrate(tenant_shard_id, migrate_req)
            .await?,
    )
}

/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
    json_response(StatusCode::OK, ())
}

impl From<ReconcileError> for ApiError {
    fn from(value: ReconcileError) -> Self {
        ApiError::Conflict(format!("Reconciliation error: {}", value))
    }
}

pub fn make_router(
    service: Arc<Service>,
    auth: Option<Arc<SwappableJwtAuth>>,
) -> RouterBuilder<hyper::Body, ApiError> {
    let mut router = endpoint::make_router();
    if auth.is_some() {
        router = router.middleware(auth_middleware(|request| {
            let state = get_state(request);
            if state.allowlist_routes.contains(request.uri()) {
                None
            } else {
                state.auth.as_deref()
            }
        }))
    }

    router
        .data(Arc::new(HttpState::new(service, auth)))
        .get("/status", |r| request_span(r, handle_status))
        .post("/re-attach", |r| request_span(r, handle_re_attach))
        .post("/validate", |r| request_span(r, handle_validate))
        .post("/attach-hook", |r| request_span(r, handle_attach_hook))
        .post("/inspect", |r| request_span(r, handle_inspect))
        .post("/node", |r| request_span(r, handle_node_register))
        .put("/node/:node_id/config", |r| {
            request_span(r, handle_node_configure)
        })
        .post("/tenant", |r| request_span(r, handle_tenant_create))
        .post("/tenant/:tenant_id/timeline", |r| {
            request_span(r, handle_tenant_timeline_create)
        })
        .get("/tenant/:tenant_id/locate", |r| {
            request_span(r, handle_tenant_locate)
        })
        .put("/tenant/:tenant_shard_id/migrate", |r| {
            request_span(r, handle_tenant_shard_migrate)
        })
}