mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
control_plane/attachment_service: complete APIs (#6394)
Depends on: https://github.com/neondatabase/neon/pull/6468 ## Problem The sharding service will be used as a "virtual pageserver" by the control plane -- so it needs the set of pageserver APIs that the control plane uses, and to present them under identical URLs, including prefix (/v1). ## Summary of changes - Add missing APIs: - Tenant deletion - Timeline deletion - Node list (used in test now, later in tools) - `/location_config` API (for migrating tenants into the sharding service) - Rework attachment service URLs: - `/v1` prefix is used for pageserver-compatible APIs - `/upcall/v1` prefix is used for APIs that are called by the pageserver (re-attach and validate) - `/debug/v1` prefix is used for endpoints that are for testing - `/control/v1` prefix is used for new sharding service APIs that do not mimic a pageserver API, such as registering and configuring nodes. - Add test_sharding_service. The sharding service already had some collateral coverage from its use in general tests, but this is the first dedicated testing for it.
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -285,7 +285,6 @@ dependencies = [
|
||||
"metrics",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"postgres_backend",
|
||||
"postgres_connection",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -21,10 +21,6 @@ tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
# TODO: remove this after DB persistence is added, it is only used for
|
||||
# a parsing function when loading pageservers from neon_local LocalEnv
|
||||
postgres_backend.workspace = true
|
||||
|
||||
diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
|
||||
|
||||
utils = { path = "../../libs/utils/" }
|
||||
|
||||
@@ -2,13 +2,17 @@ use crate::reconciler::ReconcileError;
|
||||
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use hyper::{Body, Request, Response};
|
||||
use hyper::{StatusCode, Uri};
|
||||
use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest};
|
||||
use pageserver_api::models::{
|
||||
TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
use utils::http::endpoint::{auth_middleware, request_span};
|
||||
use utils::http::request::parse_request_param;
|
||||
use utils::id::TenantId;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use utils::{
|
||||
http::{
|
||||
@@ -112,6 +116,78 @@ async fn handle_tenant_create(
|
||||
json_response(StatusCode::OK, service.tenant_create(create_req).await?)
|
||||
}
|
||||
|
||||
// For tenant and timeline deletions, which both implement an "initially return 202, then 404 once
|
||||
// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream. This avoids
|
||||
// needing to track a "deleting" state for tenants.
|
||||
async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
|
||||
where
|
||||
R: std::future::Future<Output = Result<StatusCode, ApiError>> + Send + 'static,
|
||||
F: Fn(Arc<Service>) -> R + Send + Sync + 'static,
|
||||
{
|
||||
let started_at = Instant::now();
|
||||
// To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion
|
||||
// completed.
|
||||
let mut retry_period = Duration::from_secs(1);
|
||||
// On subsequent retries, wait longer.
|
||||
let max_retry_period = Duration::from_secs(5);
|
||||
// Enable callers with a 30 second request timeout to reliably get a response
|
||||
let max_wait = Duration::from_secs(25);
|
||||
|
||||
loop {
|
||||
let status = f(service.clone()).await?;
|
||||
match status {
|
||||
StatusCode::ACCEPTED => {
|
||||
tracing::info!("Deletion accepted, waiting to try again...");
|
||||
tokio::time::sleep(retry_period).await;
|
||||
retry_period = max_retry_period;
|
||||
}
|
||||
StatusCode::NOT_FOUND => {
|
||||
tracing::info!("Deletion complete");
|
||||
return json_response(StatusCode::OK, ());
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!("Unexpected status {status}");
|
||||
return json_response(status, ());
|
||||
}
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
if now + retry_period > started_at + max_wait {
|
||||
tracing::info!("Deletion timed out waiting for 404");
|
||||
// REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of
|
||||
// the pageserver's swagger definition for this endpoint, and has the same desired
|
||||
// effect of causing the control plane to retry later.
|
||||
return json_response(StatusCode::CONFLICT, ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_tenant_location_config(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let config_req = json_request::<TenantLocationConfigRequest>(&mut req).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
service
|
||||
.tenant_location_config(tenant_id, config_req)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_delete(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
|
||||
deletion_wrapper(service, move |service| async move {
|
||||
service.tenant_delete(tenant_id).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_create(
|
||||
service: Arc<Service>,
|
||||
mut req: Request<Body>,
|
||||
@@ -126,6 +202,63 @@ async fn handle_tenant_timeline_create(
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_delete(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
deletion_wrapper(service, move |service| async move {
|
||||
service.tenant_timeline_delete(tenant_id, timeline_id).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_tenant_timeline_passthrough(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
|
||||
let Some(path) = req.uri().path_and_query() else {
|
||||
// This should never happen, our request router only calls us if there is a path
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path")));
|
||||
};
|
||||
|
||||
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
|
||||
|
||||
// Find the node that holds shard zero
|
||||
let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?;
|
||||
|
||||
// Callers will always pass an unsharded tenant ID. Before proxying, we must
|
||||
// rewrite this to a shard-aware shard zero ID.
|
||||
let path = format!("{}", path);
|
||||
let tenant_str = tenant_id.to_string();
|
||||
let tenant_shard_str = format!("{}", tenant_shard_id);
|
||||
let path = path.replace(&tenant_str, &tenant_shard_str);
|
||||
|
||||
let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref());
|
||||
let resp = client.get_raw(path).await.map_err(|_e|
|
||||
// FIXME: give APiError a proper Unavailable variant. We return 503 here because
|
||||
// if we can't successfully send a request to the pageserver, we aren't available.
|
||||
ApiError::ShuttingDown)?;
|
||||
|
||||
// We have a reqest::Response, would like a http::Response
|
||||
let mut builder = hyper::Response::builder()
|
||||
.status(resp.status())
|
||||
.version(resp.version());
|
||||
for (k, v) in resp.headers() {
|
||||
builder = builder.header(k, v);
|
||||
}
|
||||
|
||||
let response = builder
|
||||
.body(Body::wrap_stream(resp.bytes_stream()))
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn handle_tenant_locate(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -141,6 +274,11 @@ async fn handle_node_register(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let state = get_state(&req);
|
||||
json_response(StatusCode::OK, state.service.node_list().await?)
|
||||
}
|
||||
|
||||
async fn handle_node_configure(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
let config_req = json_request::<NodeConfigureRequest>(&mut req).await?;
|
||||
@@ -226,26 +364,64 @@ pub fn make_router(
|
||||
|
||||
router
|
||||
.data(Arc::new(HttpState::new(service, auth)))
|
||||
// Non-prefixed generic endpoints (status, metrics)
|
||||
.get("/status", |r| request_span(r, handle_status))
|
||||
.post("/re-attach", |r| request_span(r, handle_re_attach))
|
||||
.post("/validate", |r| request_span(r, handle_validate))
|
||||
.post("/attach-hook", |r| request_span(r, handle_attach_hook))
|
||||
.post("/inspect", |r| request_span(r, handle_inspect))
|
||||
.post("/node", |r| request_span(r, handle_node_register))
|
||||
.put("/node/:node_id/config", |r| {
|
||||
// Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix
|
||||
.post("/upcall/v1/re-attach", |r| {
|
||||
request_span(r, handle_re_attach)
|
||||
})
|
||||
.post("/upcall/v1/validate", |r| request_span(r, handle_validate))
|
||||
// Test/dev/debug endpoints
|
||||
.post("/debug/v1/attach-hook", |r| {
|
||||
request_span(r, handle_attach_hook)
|
||||
})
|
||||
.post("/debug/v1/inspect", |r| request_span(r, handle_inspect))
|
||||
.get("/control/v1/tenant/:tenant_id/locate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_locate)
|
||||
})
|
||||
// Node operations
|
||||
.post("/control/v1/node", |r| {
|
||||
request_span(r, handle_node_register)
|
||||
})
|
||||
.get("/control/v1/node", |r| request_span(r, handle_node_list))
|
||||
.put("/control/v1/node/:node_id/config", |r| {
|
||||
request_span(r, handle_node_configure)
|
||||
})
|
||||
// Tenant Shard operations
|
||||
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_migrate)
|
||||
})
|
||||
// Tenant operations
|
||||
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
|
||||
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
|
||||
.post("/v1/tenant", |r| {
|
||||
tenant_service_handler(r, handle_tenant_create)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id", |r| {
|
||||
tenant_service_handler(r, handle_tenant_delete)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_id/location_config", |r| {
|
||||
tenant_service_handler(r, handle_tenant_location_config)
|
||||
})
|
||||
// Tenant Shard operations (low level/maintenance)
|
||||
.put("/tenant/:tenant_shard_id/migrate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_migrate)
|
||||
})
|
||||
// Timeline operations
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
tenant_service_handler(r, handle_tenant_timeline_delete)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_id/timeline", |r| {
|
||||
tenant_service_handler(r, handle_tenant_timeline_create)
|
||||
})
|
||||
.get("/tenant/:tenant_id/locate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_locate)
|
||||
// Tenant detail GET passthrough to shard zero
|
||||
.get("/v1/tenant/:tenant_id*", |r| {
|
||||
tenant_service_handler(r, handle_tenant_timeline_passthrough)
|
||||
})
|
||||
.put("/tenant/:tenant_shard_id/migrate", |r| {
|
||||
tenant_service_handler(r, handle_tenant_shard_migrate)
|
||||
// Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
|
||||
// timeline GET APIs will be implicitly included.
|
||||
.get("/v1/tenant/:tenant_id/timeline*", |r| {
|
||||
tenant_service_handler(r, handle_tenant_timeline_passthrough)
|
||||
})
|
||||
// Path aliases for tests_forward_compatibility
|
||||
// TODO: remove these in future PR
|
||||
|
||||
@@ -9,7 +9,6 @@ use diesel::prelude::*;
|
||||
use diesel::Connection;
|
||||
use pageserver_api::models::TenantConfig;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::generation::Generation;
|
||||
use utils::id::{NodeId, TenantId};
|
||||
@@ -129,51 +128,11 @@ impl Persistence {
|
||||
})
|
||||
.await?;
|
||||
|
||||
if nodes.is_empty() {
|
||||
return self.list_nodes_local_env().await;
|
||||
}
|
||||
|
||||
tracing::info!("list_nodes: loaded {} nodes", nodes.len());
|
||||
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
/// Shim for automated compatibility tests: load nodes from LocalEnv instead of database
|
||||
pub(crate) async fn list_nodes_local_env(&self) -> DatabaseResult<Vec<Node>> {
|
||||
// Enable test_backward_compatibility to work by populating our list of
|
||||
// nodes from LocalEnv when it is not present in persistent storage. Otherwise at
|
||||
// first startup in the compat test, we may have shards but no nodes.
|
||||
use control_plane::local_env::LocalEnv;
|
||||
let env = LocalEnv::load_config().map_err(|e| DatabaseError::Logical(format!("{e}")))?;
|
||||
tracing::info!(
|
||||
"Loading {} pageserver nodes from LocalEnv",
|
||||
env.pageservers.len()
|
||||
);
|
||||
let mut nodes = Vec::new();
|
||||
for ps_conf in env.pageservers {
|
||||
let (pg_host, pg_port) =
|
||||
parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr");
|
||||
let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr)
|
||||
.expect("Unable to parse listen_http_addr");
|
||||
let node = Node {
|
||||
id: ps_conf.id,
|
||||
listen_pg_addr: pg_host.to_string(),
|
||||
listen_pg_port: pg_port.unwrap_or(5432),
|
||||
listen_http_addr: http_host.to_string(),
|
||||
listen_http_port: http_port.unwrap_or(80),
|
||||
availability: NodeAvailability::Active,
|
||||
scheduling: NodeSchedulingPolicy::Active,
|
||||
};
|
||||
|
||||
// Synchronize database with what we learn from LocalEnv
|
||||
self.insert_node(&node).await?;
|
||||
|
||||
nodes.push(node);
|
||||
}
|
||||
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
/// At startup, load the high level state for shards, such as their config + policy. This will
|
||||
/// be enriched at runtime with state discovered on pageservers.
|
||||
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult<Vec<TenantShardPersistence>> {
|
||||
|
||||
@@ -21,6 +21,7 @@ use pageserver_api::{
|
||||
models,
|
||||
models::{
|
||||
LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
|
||||
TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
|
||||
TimelineCreateRequest, TimelineInfo,
|
||||
},
|
||||
shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
|
||||
@@ -30,14 +31,14 @@ use utils::{
|
||||
completion::Barrier,
|
||||
generation::Generation,
|
||||
http::error::ApiError,
|
||||
id::{NodeId, TenantId},
|
||||
id::{NodeId, TenantId, TimelineId},
|
||||
seqwait::SeqWait,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
compute_hook::ComputeHook,
|
||||
node::Node,
|
||||
persistence::{DatabaseError, Persistence, TenantShardPersistence},
|
||||
persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence},
|
||||
scheduler::Scheduler,
|
||||
tenant_state::{
|
||||
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
|
||||
@@ -635,7 +636,7 @@ impl Service {
|
||||
shard_number: tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: tenant_shard_id.shard_count.0 as i32,
|
||||
shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
|
||||
generation: 0,
|
||||
generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
|
||||
generation_pageserver: i64::MAX,
|
||||
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
|
||||
config: serde_json::to_string(&create_req.config).unwrap(),
|
||||
@@ -677,6 +678,7 @@ impl Service {
|
||||
})?;
|
||||
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: entry
|
||||
.get()
|
||||
.intent
|
||||
@@ -709,6 +711,7 @@ impl Service {
|
||||
})?;
|
||||
|
||||
response_shards.push(TenantCreateResponseShard {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: state
|
||||
.intent
|
||||
.attached
|
||||
@@ -742,14 +745,257 @@ impl Service {
|
||||
(waiters, response_shards)
|
||||
};
|
||||
|
||||
let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
|
||||
self.await_waiters(waiters).await?;
|
||||
|
||||
Ok(TenantCreateResponse {
|
||||
shards: response_shards,
|
||||
})
|
||||
}
|
||||
|
||||
/// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
|
||||
/// wait for reconciliation to complete before responding.
|
||||
async fn await_waiters(
|
||||
&self,
|
||||
waiters: Vec<ReconcilerWaiter>,
|
||||
) -> Result<(), ReconcileWaitError> {
|
||||
let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
|
||||
for waiter in waiters {
|
||||
let timeout = deadline.duration_since(Instant::now());
|
||||
waiter.wait_timeout(timeout).await?;
|
||||
}
|
||||
Ok(TenantCreateResponse {
|
||||
shards: response_shards,
|
||||
})
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This API is used by the cloud control plane to do coarse-grained control of tenants:
|
||||
/// - Call with mode Attached* to upsert the tenant.
|
||||
/// - Call with mode Detached to switch to PolicyMode::Detached
|
||||
///
|
||||
/// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
|
||||
/// secondary locations.
|
||||
pub(crate) async fn tenant_location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
req: TenantLocationConfigRequest,
|
||||
) -> Result<TenantLocationConfigResponse, ApiError> {
|
||||
if req.tenant_id.shard_count.0 > 1 {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"This API is for importing single-sharded or unsharded tenants"
|
||||
)));
|
||||
}
|
||||
|
||||
let mut waiters = Vec::new();
|
||||
let mut result = TenantLocationConfigResponse { shards: Vec::new() };
|
||||
let maybe_create = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let result_tx = locked.result_tx.clone();
|
||||
let compute_hook = locked.compute_hook.clone();
|
||||
let pageservers = locked.nodes.clone();
|
||||
|
||||
let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
|
||||
|
||||
// Maybe we have existing shards
|
||||
let mut create = true;
|
||||
for (shard_id, shard) in locked
|
||||
.tenants
|
||||
.range_mut(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
// Saw an existing shard: this is not a creation
|
||||
create = false;
|
||||
|
||||
// Note that for existing tenants we do _not_ respect the generation in the request: this is likely
|
||||
// to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
|
||||
// callers' generations may be ignored. This represents a one-way migration of tenants from the outer
|
||||
// cloud control plane into this service.
|
||||
|
||||
// Use location config mode as an indicator of policy: if they ask for
|
||||
// attached we go to default HA attached mode. If they ask for secondary
|
||||
// we go to secondary-only mode. If they ask for detached we detach.
|
||||
match req.config.mode {
|
||||
LocationConfigMode::Detached => {
|
||||
shard.policy = PlacementPolicy::Detached;
|
||||
}
|
||||
LocationConfigMode::Secondary => {
|
||||
// TODO: implement secondary-only mode.
|
||||
todo!();
|
||||
}
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// TODO: persistence for changes in policy
|
||||
if pageservers.len() > 1 {
|
||||
shard.policy = PlacementPolicy::Double(1)
|
||||
} else {
|
||||
// Convenience for dev/test: if we just have one pageserver, import
|
||||
// tenants into Single mode so that scheduling will succeed.
|
||||
shard.policy = PlacementPolicy::Single
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shard.schedule(&mut scheduler)?;
|
||||
|
||||
let maybe_waiter = shard.maybe_reconcile(
|
||||
result_tx.clone(),
|
||||
&pageservers,
|
||||
&compute_hook,
|
||||
&self.config,
|
||||
&self.persistence,
|
||||
);
|
||||
if let Some(waiter) = maybe_waiter {
|
||||
waiters.push(waiter);
|
||||
}
|
||||
|
||||
if let Some(node_id) = shard.intent.attached {
|
||||
result.shards.push(TenantShardLocation {
|
||||
shard_id: *shard_id,
|
||||
node_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if create {
|
||||
// Validate request mode
|
||||
match req.config.mode {
|
||||
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
|
||||
// When using this API to onboard an existing tenant to this service, it must start in
|
||||
// an attached state, because we need the request to come with a generation
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Imported tenant must be in attached mode"
|
||||
)));
|
||||
}
|
||||
|
||||
LocationConfigMode::AttachedMulti
|
||||
| LocationConfigMode::AttachedSingle
|
||||
| LocationConfigMode::AttachedStale => {
|
||||
// Pass
|
||||
}
|
||||
}
|
||||
|
||||
// Validate request generation
|
||||
let Some(generation) = req.config.generation else {
|
||||
// We can only import attached tenants, because we need the request to come with a generation
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Generation is mandatory when importing tenant"
|
||||
)));
|
||||
};
|
||||
|
||||
// Synthesize a creation request
|
||||
Some(TenantCreateRequest {
|
||||
new_tenant_id: TenantShardId::unsharded(tenant_id),
|
||||
generation: Some(generation),
|
||||
shard_parameters: ShardParameters {
|
||||
// Must preserve the incoming shard_count do distinguish unsharded (0)
|
||||
// from single-sharded (1): this distinction appears in the S3 keys of the tenant.
|
||||
count: req.tenant_id.shard_count,
|
||||
// We only import un-sharded or single-sharded tenants, so stripe
|
||||
// size can be made up arbitrarily here.
|
||||
stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
|
||||
},
|
||||
config: req.config.tenant_conf,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(create_req) = maybe_create {
|
||||
let create_resp = self.tenant_create(create_req).await?;
|
||||
result.shards = create_resp
|
||||
.shards
|
||||
.into_iter()
|
||||
.map(|s| TenantShardLocation {
|
||||
node_id: s.node_id,
|
||||
shard_id: s.shard_id,
|
||||
})
|
||||
.collect();
|
||||
} else {
|
||||
// This was an update, wait for reconciliation
|
||||
self.await_waiters(waiters).await?;
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
|
||||
// TODO: refactor into helper
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.attached.ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
}
|
||||
targets
|
||||
};
|
||||
|
||||
// TODO: error out if the tenant is not attached anywhere.
|
||||
|
||||
// Phase 1: delete on the pageservers
|
||||
let mut any_pending = false;
|
||||
for (tenant_shard_id, node) in targets {
|
||||
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
|
||||
// TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
|
||||
// surface immediately as an error to our caller.
|
||||
let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Error deleting shard {tenant_shard_id} on node {}: {e}",
|
||||
node.id
|
||||
))
|
||||
})?;
|
||||
tracing::info!(
|
||||
"Shard {tenant_shard_id} on node {}, delete returned {}",
|
||||
node.id,
|
||||
status
|
||||
);
|
||||
if status == StatusCode::ACCEPTED {
|
||||
any_pending = true;
|
||||
}
|
||||
}
|
||||
|
||||
if any_pending {
|
||||
// Caller should call us again later. When we eventually see 404s from
|
||||
// all the shards, we may proceed to delete our records of the tenant.
|
||||
tracing::info!(
|
||||
"Tenant {} has some shards pending deletion, returning 202",
|
||||
tenant_id
|
||||
);
|
||||
return Ok(StatusCode::ACCEPTED);
|
||||
}
|
||||
|
||||
// Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
|
||||
// our in-memory state and database state.
|
||||
|
||||
// Ordering: we delete persistent state first: if we then
|
||||
// crash, we will drop the in-memory state.
|
||||
|
||||
// Drop persistent state.
|
||||
self.persistence.delete_tenant(tenant_id).await?;
|
||||
|
||||
// Drop in-memory state
|
||||
{
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked
|
||||
.tenants
|
||||
.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
|
||||
tracing::info!(
|
||||
"Deleted tenant {tenant_id}, now have {} tenants",
|
||||
locked.tenants.len()
|
||||
);
|
||||
};
|
||||
|
||||
// Success is represented as 404, to imitate the existing pageserver deletion API
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_create(
|
||||
@@ -759,25 +1005,15 @@ impl Service {
|
||||
) -> Result<TimelineInfo, ApiError> {
|
||||
let mut timeline_info = None;
|
||||
|
||||
let ensure_waiters = {
|
||||
let locked = self.inner.write().unwrap();
|
||||
tracing::info!(
|
||||
"Creating timeline {}/{}, have {} pageservers",
|
||||
tenant_id,
|
||||
create_req.new_timeline_id,
|
||||
locked.nodes.len()
|
||||
);
|
||||
tracing::info!(
|
||||
"Creating timeline {}/{}",
|
||||
tenant_id,
|
||||
create_req.new_timeline_id,
|
||||
);
|
||||
|
||||
self.ensure_attached(locked, tenant_id)
|
||||
.map_err(ApiError::InternalServerError)?
|
||||
};
|
||||
|
||||
let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
|
||||
for waiter in ensure_waiters {
|
||||
let timeout = deadline.duration_since(Instant::now());
|
||||
waiter.wait_timeout(timeout).await?;
|
||||
}
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
// TODO: refuse to do this if shard splitting is in progress
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -848,6 +1084,111 @@ impl Service {
|
||||
Ok(timeline_info.expect("targets cannot be empty"))
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_delete(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<StatusCode, ApiError> {
|
||||
tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
|
||||
|
||||
self.ensure_attached_wait(tenant_id).await?;
|
||||
|
||||
// TODO: refuse to do this if shard splitting is in progress
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
|
||||
for (tenant_shard_id, shard) in
|
||||
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
|
||||
{
|
||||
let node_id = shard.intent.attached.ok_or_else(|| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
|
||||
})?;
|
||||
let node = locked
|
||||
.nodes
|
||||
.get(&node_id)
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
}
|
||||
targets
|
||||
};
|
||||
|
||||
if targets.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO: call into shards concurrently
|
||||
let mut any_pending = false;
|
||||
for (tenant_shard_id, node) in targets {
|
||||
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
|
||||
|
||||
tracing::info!(
|
||||
"Deleting timeline on shard {}/{}, attached to node {}",
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node.id
|
||||
);
|
||||
|
||||
let status = client
|
||||
.timeline_delete(tenant_shard_id, timeline_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
|
||||
node.id
|
||||
))
|
||||
})?;
|
||||
|
||||
if status == StatusCode::ACCEPTED {
|
||||
any_pending = true;
|
||||
}
|
||||
}
|
||||
|
||||
if any_pending {
|
||||
Ok(StatusCode::ACCEPTED)
|
||||
} else {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
}
|
||||
}
|
||||
|
||||
/// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
|
||||
/// function looks it up and returns the url. If the tenant isn't found, returns Err(ApiError::NotFound)
|
||||
pub(crate) fn tenant_shard0_baseurl(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<(String, TenantShardId), ApiError> {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let Some((tenant_shard_id, shard)) = locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.next()
|
||||
else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant {tenant_id} not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
|
||||
// point to somewhere we haven't attached yet.
|
||||
let Some(node_id) = shard.intent.attached else {
|
||||
return Err(ApiError::Conflict(
|
||||
"Cannot call timeline API on non-attached tenant".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let Some(node) = locked.nodes.get(&node_id) else {
|
||||
// This should never happen
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Shard refers to nonexistent node"
|
||||
)));
|
||||
};
|
||||
|
||||
Ok((node.base_url(), *tenant_shard_id))
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_locate(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -993,6 +1334,20 @@ impl Service {
|
||||
Ok(TenantShardMigrateResponse {})
|
||||
}
|
||||
|
||||
pub(crate) async fn node_list(&self) -> Result<Vec<NodePersistence>, ApiError> {
|
||||
// It is convenient to avoid taking the big lock and converting Node to a serializable
|
||||
// structure, by fetching from storage instead of reading in-memory state.
|
||||
let nodes = self
|
||||
.persistence
|
||||
.list_nodes()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|n| n.to_persistent())
|
||||
.collect();
|
||||
|
||||
Ok(nodes)
|
||||
}
|
||||
|
||||
pub(crate) async fn node_register(
|
||||
&self,
|
||||
register_req: NodeRegisterRequest,
|
||||
@@ -1166,7 +1521,7 @@ impl Service {
|
||||
/// Helper for methods that will try and call pageserver APIs for
|
||||
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
|
||||
/// is attached somewhere.
|
||||
fn ensure_attached(
|
||||
fn ensure_attached_schedule(
|
||||
&self,
|
||||
mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
|
||||
tenant_id: TenantId,
|
||||
@@ -1196,6 +1551,23 @@ impl Service {
|
||||
Ok(waiters)
|
||||
}
|
||||
|
||||
async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
|
||||
let ensure_waiters = {
|
||||
let locked = self.inner.write().unwrap();
|
||||
|
||||
self.ensure_attached_schedule(locked, tenant_id)
|
||||
.map_err(ApiError::InternalServerError)?
|
||||
};
|
||||
|
||||
let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
|
||||
for waiter in ensure_waiters {
|
||||
let timeout = deadline.duration_since(Instant::now());
|
||||
waiter.wait_timeout(timeout).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check all tenants for pending reconciliation work, and reconcile those in need
|
||||
///
|
||||
/// Returns how many reconciliation tasks were started
|
||||
|
||||
@@ -17,6 +17,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use std::{env, str::FromStr};
|
||||
use tokio::process::Command;
|
||||
use tracing::instrument;
|
||||
use url::Url;
|
||||
use utils::{
|
||||
auth::{Claims, Scope},
|
||||
id::{NodeId, TenantId},
|
||||
@@ -59,6 +60,7 @@ pub struct InspectResponse {
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct TenantCreateResponseShard {
|
||||
pub shard_id: TenantShardId,
|
||||
pub node_id: NodeId,
|
||||
pub generation: u32,
|
||||
}
|
||||
@@ -523,13 +525,15 @@ impl AttachmentService {
|
||||
RQ: Serialize + Sized,
|
||||
RS: DeserializeOwned + Sized,
|
||||
{
|
||||
let url = self
|
||||
.env
|
||||
.control_plane_api
|
||||
.clone()
|
||||
.unwrap()
|
||||
.join(&path)
|
||||
.unwrap();
|
||||
// The configured URL has the /upcall path prefix for pageservers to use: we will strip that out
|
||||
// for general purpose API access.
|
||||
let listen_url = self.env.control_plane_api.clone().unwrap();
|
||||
let url = Url::from_str(&format!(
|
||||
"http://{}:{}/{path}",
|
||||
listen_url.host_str().unwrap(),
|
||||
listen_url.port().unwrap()
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut builder = self.client.request(method, url);
|
||||
if let Some(body) = body {
|
||||
@@ -566,7 +570,7 @@ impl AttachmentService {
|
||||
let response = self
|
||||
.dispatch::<_, AttachHookResponse>(
|
||||
Method::POST,
|
||||
"attach-hook".to_string(),
|
||||
"debug/v1/attach-hook".to_string(),
|
||||
Some(request),
|
||||
)
|
||||
.await?;
|
||||
@@ -582,7 +586,11 @@ impl AttachmentService {
|
||||
let request = InspectRequest { tenant_shard_id };
|
||||
|
||||
let response = self
|
||||
.dispatch::<_, InspectResponse>(Method::POST, "inspect".to_string(), Some(request))
|
||||
.dispatch::<_, InspectResponse>(
|
||||
Method::POST,
|
||||
"debug/v1/inspect".to_string(),
|
||||
Some(request),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(response.attachment)
|
||||
@@ -599,8 +607,12 @@ impl AttachmentService {
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result<TenantLocateResponse> {
|
||||
self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None)
|
||||
.await
|
||||
self.dispatch::<(), _>(
|
||||
Method::GET,
|
||||
format!("control/v1/tenant/{tenant_id}/locate"),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
@@ -622,7 +634,7 @@ impl AttachmentService {
|
||||
|
||||
#[instrument(skip_all, fields(node_id=%req.node_id))]
|
||||
pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> {
|
||||
self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req))
|
||||
self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -630,7 +642,7 @@ impl AttachmentService {
|
||||
pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> {
|
||||
self.dispatch::<_, ()>(
|
||||
Method::PUT,
|
||||
format!("node/{}/config", req.node_id),
|
||||
format!("control/v1/node/{}/config", req.node_id),
|
||||
Some(req),
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -51,7 +51,7 @@ project_git_version!(GIT_VERSION);
|
||||
|
||||
const DEFAULT_PG_VERSION: &str = "15";
|
||||
|
||||
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
|
||||
const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/";
|
||||
|
||||
fn default_conf(num_pageservers: u16) -> String {
|
||||
let mut template = format!(
|
||||
|
||||
@@ -364,6 +364,19 @@ pub struct TenantLocationConfigRequest {
|
||||
pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantShardLocation {
|
||||
pub shard_id: TenantShardId,
|
||||
pub node_id: NodeId,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantLocationConfigResponse {
|
||||
pub shards: Vec<TenantShardLocation>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct TenantConfigRequest {
|
||||
|
||||
@@ -69,6 +69,25 @@ impl Client {
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// Get an arbitrary path and returning a streaming Response. This function is suitable
|
||||
/// for pass-through/proxy use cases where we don't care what the response content looks
|
||||
/// like.
|
||||
///
|
||||
/// Use/add one of the properly typed methods below if you know aren't proxying, and
|
||||
/// know what kind of response you expect.
|
||||
pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
|
||||
debug_assert!(path.starts_with('/'));
|
||||
let uri = format!("{}{}", self.mgmt_api_endpoint, path);
|
||||
|
||||
let req = self.client.request(Method::GET, uri);
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value)
|
||||
} else {
|
||||
req
|
||||
};
|
||||
req.send().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn tenant_details(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -171,6 +190,25 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// The tenant deletion API can return 202 if deletion is incomplete, or
|
||||
/// 404 if it is complete. Callers are responsible for checking the status
|
||||
/// code and retrying. Error codes other than 404 will return Err().
|
||||
pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
|
||||
let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
|
||||
|
||||
match self.request(Method::DELETE, &uri, ()).await {
|
||||
Err(Error::ApiError(status_code, msg)) => {
|
||||
if status_code == StatusCode::NOT_FOUND {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
} else {
|
||||
Err(Error::ApiError(status_code, msg))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
Ok(response) => Ok(response.status()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
|
||||
let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, &uri, req).await?;
|
||||
@@ -234,6 +272,32 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
/// The timeline deletion API can return 201 if deletion is incomplete, or
|
||||
/// 403 if it is complete. Callers are responsible for checking the status
|
||||
/// code and retrying. Error codes other than 403 will return Err().
|
||||
pub async fn timeline_delete(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<StatusCode> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
|
||||
match self.request(Method::DELETE, &uri, ()).await {
|
||||
Err(Error::ApiError(status_code, msg)) => {
|
||||
if status_code == StatusCode::NOT_FOUND {
|
||||
Ok(StatusCode::NOT_FOUND)
|
||||
} else {
|
||||
Err(Error::ApiError(status_code, msg))
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
Ok(response) => Ok(response.status()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/reset",
|
||||
|
||||
@@ -674,6 +674,10 @@ paths:
|
||||
responses:
|
||||
"200":
|
||||
description: Tenant is now in requested state
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/TenantLocationConfigResponse"
|
||||
"503":
|
||||
description: Tenant's state cannot be changed right now. Wait a few seconds and retry.
|
||||
content:
|
||||
@@ -1426,6 +1430,27 @@ components:
|
||||
$ref: '#/components/schemas/SecondaryConfig'
|
||||
tenant_conf:
|
||||
$ref: '#/components/schemas/TenantConfig'
|
||||
TenantLocationConfigResponse:
|
||||
type: object
|
||||
required:
|
||||
- shards
|
||||
properties:
|
||||
shards:
|
||||
description: Pageservers where this tenant's shards are attached. Not populated for secondary locations.
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/TenantShardLocation"
|
||||
TenantShardLocation:
|
||||
type: object
|
||||
required:
|
||||
- node_id
|
||||
- shard_id
|
||||
properties:
|
||||
node_id:
|
||||
description: Pageserver node ID where this shard is attached
|
||||
type: integer
|
||||
shard_id: Tenant shard ID of the shard
|
||||
type: string
|
||||
SecondaryConfig:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@@ -17,6 +17,8 @@ use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::models::LocationConfigListResponse;
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::models::TenantDetails;
|
||||
use pageserver_api::models::TenantLocationConfigResponse;
|
||||
use pageserver_api::models::TenantShardLocation;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
|
||||
@@ -1356,7 +1358,7 @@ async fn put_tenant_location_config_handler(
|
||||
let location_conf =
|
||||
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
|
||||
|
||||
state
|
||||
let attached = state
|
||||
.tenant_manager
|
||||
.upsert_location(
|
||||
tenant_shard_id,
|
||||
@@ -1365,7 +1367,8 @@ async fn put_tenant_location_config_handler(
|
||||
tenant::SpawnMode::Normal,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
.is_some();
|
||||
|
||||
if let Some(_flush_ms) = flush {
|
||||
match state
|
||||
@@ -1384,7 +1387,18 @@ async fn put_tenant_location_config_handler(
|
||||
tracing::info!("No flush requested when configuring");
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
// This API returns a vector of pageservers where the tenant is attached: this is
|
||||
// primarily for use in the sharding service. For compatibilty, we also return this
|
||||
// when called directly on a pageserver, but the payload is always zero or one shards.
|
||||
let mut response = TenantLocationConfigResponse { shards: Vec::new() };
|
||||
if attached {
|
||||
response.shards.push(TenantShardLocation {
|
||||
shard_id: tenant_shard_id,
|
||||
node_id: state.conf.id,
|
||||
})
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
async fn list_location_config_handler(
|
||||
|
||||
@@ -993,13 +993,20 @@ class NeonEnv:
|
||||
self.initial_tenant = config.initial_tenant
|
||||
self.initial_timeline = config.initial_timeline
|
||||
|
||||
attachment_service_port = self.port_distributor.get_port()
|
||||
# Reserve the next port after attachment service for use by its postgres: this
|
||||
# will assert out if the next port wasn't free.
|
||||
attachment_service_pg_port = self.port_distributor.get_port()
|
||||
assert attachment_service_pg_port == attachment_service_port + 1
|
||||
# Find two adjacent ports for attachment service and its postgres DB. This
|
||||
# loop would eventually throw from get_port() if we run out of ports (extremely
|
||||
# unlikely): usually we find two adjacent free ports on the first iteration.
|
||||
while True:
|
||||
self.attachment_service_port = self.port_distributor.get_port()
|
||||
attachment_service_pg_port = self.port_distributor.get_port()
|
||||
if attachment_service_pg_port == self.attachment_service_port + 1:
|
||||
break
|
||||
|
||||
# The URL for the pageserver to use as its control_plane_api config
|
||||
self.control_plane_api: str = f"http://127.0.0.1:{self.attachment_service_port}/upcall/v1"
|
||||
# The base URL of the attachment service
|
||||
self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}"
|
||||
|
||||
self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}"
|
||||
self.attachment_service: NeonAttachmentService = NeonAttachmentService(
|
||||
self, config.auth_enabled
|
||||
)
|
||||
@@ -1914,6 +1921,14 @@ class NeonAttachmentService:
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
def pageserver_api(self) -> PageserverHttpClient:
|
||||
"""
|
||||
The attachment service implements a subset of the pageserver REST API, for mapping
|
||||
per-tenant actions into per-shard actions (e.g. timeline creation). Tests should invoke those
|
||||
functions via the HttpClient, as an implicit check that these APIs remain compatible.
|
||||
"""
|
||||
return PageserverHttpClient(self.env.attachment_service_port, lambda: True)
|
||||
|
||||
def request(self, method, *args, **kwargs) -> requests.Response:
|
||||
kwargs["headers"] = self.headers()
|
||||
return requests.request(method, *args, **kwargs)
|
||||
@@ -1931,7 +1946,7 @@ class NeonAttachmentService:
|
||||
) -> int:
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.control_plane_api}/attach-hook",
|
||||
f"{self.env.attachment_service_api}/debug/v1/attach-hook",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id},
|
||||
headers=self.headers(),
|
||||
)
|
||||
@@ -1943,7 +1958,7 @@ class NeonAttachmentService:
|
||||
def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]):
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.control_plane_api}/attach-hook",
|
||||
f"{self.env.attachment_service_api}/debug/v1/attach-hook",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": None},
|
||||
headers=self.headers(),
|
||||
)
|
||||
@@ -1955,7 +1970,7 @@ class NeonAttachmentService:
|
||||
"""
|
||||
response = self.request(
|
||||
"POST",
|
||||
f"{self.env.control_plane_api}/inspect",
|
||||
f"{self.env.attachment_service_api}/debug/v1/inspect",
|
||||
json={"tenant_shard_id": str(tenant_shard_id)},
|
||||
headers=self.headers(),
|
||||
)
|
||||
@@ -1976,7 +1991,27 @@ class NeonAttachmentService:
|
||||
}
|
||||
log.info(f"node_register({body})")
|
||||
self.request(
|
||||
"POST", f"{self.env.control_plane_api}/node", json=body, headers=self.headers()
|
||||
"POST",
|
||||
f"{self.env.attachment_service_api}/control/v1/node",
|
||||
json=body,
|
||||
headers=self.headers(),
|
||||
).raise_for_status()
|
||||
|
||||
def node_list(self):
|
||||
response = self.request(
|
||||
"GET", f"{self.env.attachment_service_api}/control/v1/node", headers=self.headers()
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def node_configure(self, node_id, body: dict[str, Any]):
|
||||
log.info(f"node_configure({node_id}, {body})")
|
||||
body["node_id"] = node_id
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.env.attachment_service_api}/control/v1/node/{node_id}/config",
|
||||
json=body,
|
||||
headers=self.headers(),
|
||||
).raise_for_status()
|
||||
|
||||
def tenant_create(
|
||||
@@ -1986,6 +2021,9 @@ class NeonAttachmentService:
|
||||
shard_stripe_size: Optional[int] = None,
|
||||
tenant_config: Optional[Dict[Any, Any]] = None,
|
||||
):
|
||||
"""
|
||||
Use this rather than pageserver_api() when you need to include shard parameters
|
||||
"""
|
||||
body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)}
|
||||
|
||||
if shard_count is not None:
|
||||
@@ -1999,21 +2037,17 @@ class NeonAttachmentService:
|
||||
for k, v in tenant_config.items():
|
||||
body[k] = v
|
||||
|
||||
response = self.request("POST", f"{self.env.control_plane_api}/tenant", json=body)
|
||||
response = self.request("POST", f"{self.env.attachment_service_api}/v1/tenant", json=body)
|
||||
response.raise_for_status()
|
||||
log.info(f"tenant_create success: {response.json()}")
|
||||
|
||||
def tenant_timeline_create(self, tenant_id: TenantId, timeline_id: TimelineId):
|
||||
body: Dict[str, Any] = {"new_timeline_id": str(timeline_id)}
|
||||
|
||||
response = self.request(
|
||||
"POST", f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body
|
||||
)
|
||||
response.raise_for_status()
|
||||
log.info(f"tenant_timeline_create success: {response.json()}")
|
||||
|
||||
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:
|
||||
response = self.request("GET", f"{self.env.control_plane_api}/tenant/{tenant_id}/locate")
|
||||
"""
|
||||
:return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int}
|
||||
"""
|
||||
response = self.request(
|
||||
"GET", f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/locate"
|
||||
)
|
||||
response.raise_for_status()
|
||||
body = response.json()
|
||||
shards: list[dict[str, Any]] = body["shards"]
|
||||
@@ -2022,7 +2056,7 @@ class NeonAttachmentService:
|
||||
def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]:
|
||||
response = self.request(
|
||||
"PUT",
|
||||
f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split",
|
||||
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/shard_split",
|
||||
json={"new_shard_count": shard_count},
|
||||
)
|
||||
response.raise_for_status()
|
||||
@@ -2034,7 +2068,7 @@ class NeonAttachmentService:
|
||||
def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int):
|
||||
response = self.request(
|
||||
"PUT",
|
||||
f"{self.env.control_plane_api}/tenant/{tenant_shard_id}/migrate",
|
||||
f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_shard_id}/migrate",
|
||||
json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
272
test_runner/regress/test_sharding_service.py
Normal file
272
test_runner/regress/test_sharding_service.py
Normal file
@@ -0,0 +1,272 @@
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
def test_sharding_service_smoke(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
Test the basic lifecycle of a sharding service:
|
||||
- Restarting
|
||||
- Restarting a pageserver
|
||||
- Creating and deleting tenants and timelines
|
||||
- Marking a pageserver offline
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 3
|
||||
env = neon_env_builder.init_configs()
|
||||
|
||||
# Start services by hand so that we can skip a pageserver (this will start + register later)
|
||||
env.broker.try_start()
|
||||
env.attachment_service.start()
|
||||
env.pageservers[0].start()
|
||||
env.pageservers[1].start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
|
||||
# The pageservers we started should have registered with the sharding service on startup
|
||||
nodes = env.attachment_service.node_list()
|
||||
assert len(nodes) == 2
|
||||
assert set(n["node_id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id}
|
||||
|
||||
# Starting an additional pageserver should register successfully
|
||||
env.pageservers[2].start()
|
||||
nodes = env.attachment_service.node_list()
|
||||
assert len(nodes) == 3
|
||||
assert set(n["node_id"] for n in nodes) == {ps.id for ps in env.pageservers}
|
||||
|
||||
# Use a multiple of pageservers to get nice even number of shards on each one
|
||||
tenant_shard_count = len(env.pageservers) * 4
|
||||
tenant_count = len(env.pageservers) * 2
|
||||
shards_per_tenant = tenant_shard_count // tenant_count
|
||||
tenant_ids = set(TenantId.generate() for i in range(0, tenant_count))
|
||||
|
||||
# Creating several tenants should spread out across the pageservers
|
||||
for tid in tenant_ids:
|
||||
env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
|
||||
|
||||
def get_node_shard_counts():
|
||||
counts: defaultdict[str, int] = defaultdict(int)
|
||||
for tid in tenant_ids:
|
||||
for shard in env.attachment_service.locate(tid):
|
||||
counts[shard["node_id"]] += 1
|
||||
return counts
|
||||
|
||||
for node_id, count in get_node_shard_counts().items():
|
||||
# we used a multiple of pagservers for the total shard count,
|
||||
# so expect equal number on all pageservers
|
||||
assert count == tenant_shard_count / len(
|
||||
env.pageservers
|
||||
), f"Node {node_id} has bad count {count}"
|
||||
|
||||
# Creating and deleting timelines should work, using identical API to pageserver
|
||||
timeline_crud_tenant = next(iter(tenant_ids))
|
||||
timeline_id = TimelineId.generate()
|
||||
env.attachment_service.pageserver_api().timeline_create(
|
||||
pg_version=PgVersion.NOT_SET, tenant_id=timeline_crud_tenant, new_timeline_id=timeline_id
|
||||
)
|
||||
timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
|
||||
assert len(timelines) == 2
|
||||
assert timeline_id in set(TimelineId(t["timeline_id"]) for t in timelines)
|
||||
# virtual_ps_http.timeline_delete(tenant_id=timeline_crud_tenant, timeline_id=timeline_id)
|
||||
timeline_delete_wait_completed(
|
||||
env.attachment_service.pageserver_api(), timeline_crud_tenant, timeline_id
|
||||
)
|
||||
timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
|
||||
assert len(timelines) == 1
|
||||
assert timeline_id not in set(TimelineId(t["timeline_id"]) for t in timelines)
|
||||
|
||||
# Marking a pageserver offline should migrate tenants away from it.
|
||||
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
|
||||
|
||||
def node_evacuated(node_id: int):
|
||||
counts = get_node_shard_counts()
|
||||
assert counts[node_id] == 0
|
||||
|
||||
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
|
||||
|
||||
# Marking pageserver active should not migrate anything to it
|
||||
# immediately
|
||||
env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"})
|
||||
time.sleep(1)
|
||||
assert get_node_shard_counts()[env.pageservers[0].id] == 0
|
||||
|
||||
# Delete all the tenants
|
||||
for tid in tenant_ids:
|
||||
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
|
||||
|
||||
# Set a scheduling policy on one node, create all the tenants, observe
|
||||
# that the scheduling policy is respected.
|
||||
env.attachment_service.node_configure(env.pageservers[1].id, {"scheduling": "Draining"})
|
||||
|
||||
# Create some fresh tenants
|
||||
tenant_ids = set(TenantId.generate() for i in range(0, tenant_count))
|
||||
for tid in tenant_ids:
|
||||
env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
|
||||
|
||||
counts = get_node_shard_counts()
|
||||
# Nothing should have been scheduled on the node in Draining
|
||||
assert counts[env.pageservers[1].id] == 0
|
||||
assert counts[env.pageservers[0].id] == tenant_shard_count // 2
|
||||
assert counts[env.pageservers[2].id] == tenant_shard_count // 2
|
||||
|
||||
|
||||
def test_sharding_service_passthrough(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
For simple timeline/tenant GET APIs that don't require coordination across
|
||||
shards, the sharding service implements a proxy to shard zero. This test
|
||||
calls those APIs.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
# We will talk to attachment service as if it was a pageserver, using the pageserver
|
||||
# HTTP client
|
||||
client = PageserverHttpClient(env.attachment_service_port, lambda: True)
|
||||
timelines = client.timeline_list(tenant_id=env.initial_tenant)
|
||||
assert len(timelines) == 1
|
||||
|
||||
|
||||
def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
tenant_a = env.initial_tenant
|
||||
tenant_b = TenantId.generate()
|
||||
env.attachment_service.tenant_create(tenant_b)
|
||||
env.pageserver.tenant_detach(tenant_a)
|
||||
|
||||
# TODO: extend this test to use multiple pageservers, and check that locations don't move around
|
||||
# on restart.
|
||||
|
||||
# Attachment service restart
|
||||
env.attachment_service.stop()
|
||||
env.attachment_service.start()
|
||||
|
||||
observed = set(TenantId(tenant["id"]) for tenant in env.pageserver.http_client().tenant_list())
|
||||
|
||||
# Tenant A should still be attached
|
||||
assert tenant_a not in observed
|
||||
|
||||
# Tenant B should remain detached
|
||||
assert tenant_b in observed
|
||||
|
||||
# Pageserver restart
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
# Same assertions as above: restarting either service should not perturb things
|
||||
observed = set(TenantId(tenant["id"]) for tenant in env.pageserver.http_client().tenant_list())
|
||||
assert tenant_a not in observed
|
||||
assert tenant_b in observed
|
||||
|
||||
|
||||
def test_sharding_service_onboarding(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
):
|
||||
"""
|
||||
We onboard tenants to the sharding service by treating it as a 'virtual pageserver'
|
||||
which provides the /location_config API. This is similar to creating a tenant,
|
||||
but imports the generation number.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
|
||||
# Start services by hand so that we can skip registration on one of the pageservers
|
||||
env = neon_env_builder.init_configs()
|
||||
env.broker.try_start()
|
||||
env.attachment_service.start()
|
||||
|
||||
# This is the pageserver where we'll initially create the tenant
|
||||
env.pageservers[0].start(register=False)
|
||||
origin_ps = env.pageservers[0]
|
||||
|
||||
# This is the pageserver managed by the sharding service, where the tenant
|
||||
# will be attached after onboarding
|
||||
env.pageservers[1].start(register=True)
|
||||
dest_ps = env.pageservers[1]
|
||||
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
|
||||
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
|
||||
# Create a tenant directly via pageserver HTTP API, skipping the attachment service
|
||||
tenant_id = TenantId.generate()
|
||||
generation = 123
|
||||
origin_ps.http_client().tenant_create(tenant_id, generation=generation)
|
||||
|
||||
# As if doing a live migration, first configure origin into stale mode
|
||||
origin_ps.http_client().tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "AttachedStale",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": generation,
|
||||
},
|
||||
)
|
||||
|
||||
# Call into attachment service to onboard the tenant
|
||||
generation += 1
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "AttachedMulti",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": generation,
|
||||
},
|
||||
)
|
||||
|
||||
# As if doing a live migration, detach the original pageserver
|
||||
origin_ps.http_client().tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": None,
|
||||
},
|
||||
)
|
||||
|
||||
# As if doing a live migration, call into the attachment service to
|
||||
# set it to AttachedSingle: this is a no-op, but we test it because the
|
||||
# cloud control plane may call this for symmetry with live migration to
|
||||
# an individual pageserver
|
||||
virtual_ps_http.tenant_location_conf(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "AttachedSingle",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": generation,
|
||||
},
|
||||
)
|
||||
|
||||
# We should see the tenant is now attached to the pageserver managed
|
||||
# by the sharding service
|
||||
origin_tenants = origin_ps.http_client().tenant_list()
|
||||
assert len(origin_tenants) == 0
|
||||
dest_tenants = dest_ps.http_client().tenant_list()
|
||||
assert len(dest_tenants) == 1
|
||||
assert TenantId(dest_tenants[0]["id"]) == tenant_id
|
||||
|
||||
# sharding service advances generation by 1 when it first attaches
|
||||
assert dest_tenants[0]["generation"] == generation + 1
|
||||
|
||||
# The onboarded tenant should survive a restart of sharding service
|
||||
env.attachment_service.stop()
|
||||
env.attachment_service.start()
|
||||
|
||||
# The onboarded tenant should surviev a restart of pageserver
|
||||
dest_ps.stop()
|
||||
dest_ps.start()
|
||||
Reference in New Issue
Block a user