From 4010adf653252306a4ce9227b87bf9a23e9d155c Mon Sep 17 00:00:00 2001 From: John Spray Date: Wed, 31 Jan 2024 12:23:06 +0000 Subject: [PATCH] control_plane/attachment_service: complete APIs (#6394) Depends on: https://github.com/neondatabase/neon/pull/6468 ## Problem The sharding service will be used as a "virtual pageserver" by the control plane -- so it needs the set of pageserver APIs that the control plane uses, and to present them under identical URLs, including prefix (/v1). ## Summary of changes - Add missing APIs: - Tenant deletion - Timeline deletion - Node list (used in test now, later in tools) - `/location_config` API (for migrating tenants into the sharding service) - Rework attachment service URLs: - `/v1` prefix is used for pageserver-compatible APIs - `/upcall/v1` prefix is used for APIs that are called by the pageserver (re-attach and validate) - `/debug/v1` prefix is used for endpoints that are for testing - `/control/v1` prefix is used for new sharding service APIs that do not mimic a pageserver API, such as registering and configuring nodes. - Add test_sharding_service. The sharding service already had some collateral coverage from its use in general tests, but this is the first dedicated testing for it. --- Cargo.lock | 1 - control_plane/attachment_service/Cargo.toml | 4 - control_plane/attachment_service/src/http.rs | 200 ++++++++- .../attachment_service/src/persistence.rs | 41 -- .../attachment_service/src/service.rs | 422 ++++++++++++++++-- control_plane/src/attachment_service.rs | 38 +- control_plane/src/bin/neon_local.rs | 2 +- libs/pageserver_api/src/models.rs | 13 + pageserver/client/src/mgmt_api.rs | 64 +++ pageserver/src/http/openapi_spec.yml | 25 ++ pageserver/src/http/routes.rs | 20 +- test_runner/fixtures/neon_fixtures.py | 80 +++- test_runner/regress/test_sharding_service.py | 272 +++++++++++ 13 files changed, 1059 insertions(+), 123 deletions(-) create mode 100644 test_runner/regress/test_sharding_service.py diff --git a/Cargo.lock b/Cargo.lock index a669fef314..e14196350b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -285,7 +285,6 @@ dependencies = [ "metrics", "pageserver_api", "pageserver_client", - "postgres_backend", "postgres_connection", "serde", "serde_json", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml index 6fc21810bc..210a898747 100644 --- a/control_plane/attachment_service/Cargo.toml +++ b/control_plane/attachment_service/Cargo.toml @@ -21,10 +21,6 @@ tokio.workspace = true tokio-util.workspace = true tracing.workspace = true -# TODO: remove this after DB persistence is added, it is only used for -# a parsing function when loading pageservers from neon_local LocalEnv -postgres_backend.workspace = true - diesel = { version = "2.1.4", features = ["serde_json", "postgres"] } utils = { path = "../../libs/utils/" } diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index 81f21a8e7a..aa8c73c493 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -2,13 +2,17 @@ use crate::reconciler::ReconcileError; use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT}; use hyper::{Body, Request, Response}; use hyper::{StatusCode, Uri}; -use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest}; +use pageserver_api::models::{ + TenantCreateRequest, TenantLocationConfigRequest, TimelineCreateRequest, +}; use pageserver_api::shard::TenantShardId; +use pageserver_client::mgmt_api; use 
std::sync::Arc; +use std::time::{Duration, Instant}; use utils::auth::SwappableJwtAuth; use utils::http::endpoint::{auth_middleware, request_span}; use utils::http::request::parse_request_param; -use utils::id::TenantId; +use utils::id::{TenantId, TimelineId}; use utils::{ http::{ @@ -112,6 +116,78 @@ async fn handle_tenant_create( json_response(StatusCode::OK, service.tenant_create(create_req).await?) } +// For tenant and timeline deletions, which both implement an "initially return 202, then 404 once +// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream. This avoids +// needing to track a "deleting" state for tenants. +async fn deletion_wrapper(service: Arc, f: F) -> Result, ApiError> +where + R: std::future::Future> + Send + 'static, + F: Fn(Arc) -> R + Send + Sync + 'static, +{ + let started_at = Instant::now(); + // To keep deletion reasonably snappy for small tenants, initially check after 1 second if deletion + // completed. + let mut retry_period = Duration::from_secs(1); + // On subsequent retries, wait longer. + let max_retry_period = Duration::from_secs(5); + // Enable callers with a 30 second request timeout to reliably get a response + let max_wait = Duration::from_secs(25); + + loop { + let status = f(service.clone()).await?; + match status { + StatusCode::ACCEPTED => { + tracing::info!("Deletion accepted, waiting to try again..."); + tokio::time::sleep(retry_period).await; + retry_period = max_retry_period; + } + StatusCode::NOT_FOUND => { + tracing::info!("Deletion complete"); + return json_response(StatusCode::OK, ()); + } + _ => { + tracing::warn!("Unexpected status {status}"); + return json_response(status, ()); + } + } + + let now = Instant::now(); + if now + retry_period > started_at + max_wait { + tracing::info!("Deletion timed out waiting for 404"); + // REQUEST_TIMEOUT would be more appropriate, but CONFLICT is already part of + // the pageserver's swagger definition for this endpoint, and has the same desired + // effect of causing the control plane to retry later. 
+ return json_response(StatusCode::CONFLICT, ()); + } + } +} + +async fn handle_tenant_location_config( + service: Arc, + mut req: Request, +) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let config_req = json_request::(&mut req).await?; + json_response( + StatusCode::OK, + service + .tenant_location_config(tenant_id, config_req) + .await?, + ) +} + +async fn handle_tenant_delete( + service: Arc, + req: Request, +) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + + deletion_wrapper(service, move |service| async move { + service.tenant_delete(tenant_id).await + }) + .await +} + async fn handle_tenant_timeline_create( service: Arc, mut req: Request, @@ -126,6 +202,63 @@ async fn handle_tenant_timeline_create( ) } +async fn handle_tenant_timeline_delete( + service: Arc, + req: Request, +) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?; + + deletion_wrapper(service, move |service| async move { + service.tenant_timeline_delete(tenant_id, timeline_id).await + }) + .await +} + +async fn handle_tenant_timeline_passthrough( + service: Arc, + req: Request, +) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + + let Some(path) = req.uri().path_and_query() else { + // This should never happen, our request router only calls us if there is a path + return Err(ApiError::BadRequest(anyhow::anyhow!("Missing path"))); + }; + + tracing::info!("Proxying request for tenant {} ({})", tenant_id, path); + + // Find the node that holds shard zero + let (base_url, tenant_shard_id) = service.tenant_shard0_baseurl(tenant_id)?; + + // Callers will always pass an unsharded tenant ID. Before proxying, we must + // rewrite this to a shard-aware shard zero ID. + let path = format!("{}", path); + let tenant_str = tenant_id.to_string(); + let tenant_shard_str = format!("{}", tenant_shard_id); + let path = path.replace(&tenant_str, &tenant_shard_str); + + let client = mgmt_api::Client::new(base_url, service.get_config().jwt_token.as_deref()); + let resp = client.get_raw(path).await.map_err(|_e| + // FIXME: give APiError a proper Unavailable variant. We return 503 here because + // if we can't successfully send a request to the pageserver, we aren't available. + ApiError::ShuttingDown)?; + + // We have a reqest::Response, would like a http::Response + let mut builder = hyper::Response::builder() + .status(resp.status()) + .version(resp.version()); + for (k, v) in resp.headers() { + builder = builder.header(k, v); + } + + let response = builder + .body(Body::wrap_stream(resp.bytes_stream())) + .map_err(|e| ApiError::InternalServerError(e.into()))?; + + Ok(response) +} + async fn handle_tenant_locate( service: Arc, req: Request, @@ -141,6 +274,11 @@ async fn handle_node_register(mut req: Request) -> Result, json_response(StatusCode::OK, ()) } +async fn handle_node_list(req: Request) -> Result, ApiError> { + let state = get_state(&req); + json_response(StatusCode::OK, state.service.node_list().await?) 
+} + async fn handle_node_configure(mut req: Request) -> Result, ApiError> { let node_id: NodeId = parse_request_param(&req, "node_id")?; let config_req = json_request::(&mut req).await?; @@ -226,26 +364,64 @@ pub fn make_router( router .data(Arc::new(HttpState::new(service, auth))) + // Non-prefixed generic endpoints (status, metrics) .get("/status", |r| request_span(r, handle_status)) - .post("/re-attach", |r| request_span(r, handle_re_attach)) - .post("/validate", |r| request_span(r, handle_validate)) - .post("/attach-hook", |r| request_span(r, handle_attach_hook)) - .post("/inspect", |r| request_span(r, handle_inspect)) - .post("/node", |r| request_span(r, handle_node_register)) - .put("/node/:node_id/config", |r| { + // Upcalls for the pageserver: point the pageserver's `control_plane_api` config to this prefix + .post("/upcall/v1/re-attach", |r| { + request_span(r, handle_re_attach) + }) + .post("/upcall/v1/validate", |r| request_span(r, handle_validate)) + // Test/dev/debug endpoints + .post("/debug/v1/attach-hook", |r| { + request_span(r, handle_attach_hook) + }) + .post("/debug/v1/inspect", |r| request_span(r, handle_inspect)) + .get("/control/v1/tenant/:tenant_id/locate", |r| { + tenant_service_handler(r, handle_tenant_locate) + }) + // Node operations + .post("/control/v1/node", |r| { + request_span(r, handle_node_register) + }) + .get("/control/v1/node", |r| request_span(r, handle_node_list)) + .put("/control/v1/node/:node_id/config", |r| { request_span(r, handle_node_configure) }) + // Tenant Shard operations + .put("/control/v1/tenant/:tenant_shard_id/migrate", |r| { + tenant_service_handler(r, handle_tenant_shard_migrate) + }) + // Tenant operations + // The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into + // this service to manage tenants that actually consist of many tenant shards, as if they are a single entity. .post("/v1/tenant", |r| { tenant_service_handler(r, handle_tenant_create) }) + .delete("/v1/tenant/:tenant_id", |r| { + tenant_service_handler(r, handle_tenant_delete) + }) + .put("/v1/tenant/:tenant_id/location_config", |r| { + tenant_service_handler(r, handle_tenant_location_config) + }) + // Tenant Shard operations (low level/maintenance) + .put("/tenant/:tenant_shard_id/migrate", |r| { + tenant_service_handler(r, handle_tenant_shard_migrate) + }) + // Timeline operations + .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { + tenant_service_handler(r, handle_tenant_timeline_delete) + }) .post("/v1/tenant/:tenant_id/timeline", |r| { tenant_service_handler(r, handle_tenant_timeline_create) }) - .get("/tenant/:tenant_id/locate", |r| { - tenant_service_handler(r, handle_tenant_locate) + // Tenant detail GET passthrough to shard zero + .get("/v1/tenant/:tenant_id*", |r| { + tenant_service_handler(r, handle_tenant_timeline_passthrough) }) - .put("/tenant/:tenant_shard_id/migrate", |r| { - tenant_service_handler(r, handle_tenant_shard_migrate) + // Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future + // timeline GET APIs will be implicitly included. 
+ .get("/v1/tenant/:tenant_id/timeline*", |r| { + tenant_service_handler(r, handle_tenant_timeline_passthrough) }) // Path aliases for tests_forward_compatibility // TODO: remove these in future PR diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs index b27bd2bf2e..574441c409 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/control_plane/attachment_service/src/persistence.rs @@ -9,7 +9,6 @@ use diesel::prelude::*; use diesel::Connection; use pageserver_api::models::TenantConfig; use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId}; -use postgres_connection::parse_host_port; use serde::{Deserialize, Serialize}; use utils::generation::Generation; use utils::id::{NodeId, TenantId}; @@ -129,51 +128,11 @@ impl Persistence { }) .await?; - if nodes.is_empty() { - return self.list_nodes_local_env().await; - } - tracing::info!("list_nodes: loaded {} nodes", nodes.len()); Ok(nodes) } - /// Shim for automated compatibility tests: load nodes from LocalEnv instead of database - pub(crate) async fn list_nodes_local_env(&self) -> DatabaseResult> { - // Enable test_backward_compatibility to work by populating our list of - // nodes from LocalEnv when it is not present in persistent storage. Otherwise at - // first startup in the compat test, we may have shards but no nodes. - use control_plane::local_env::LocalEnv; - let env = LocalEnv::load_config().map_err(|e| DatabaseError::Logical(format!("{e}")))?; - tracing::info!( - "Loading {} pageserver nodes from LocalEnv", - env.pageservers.len() - ); - let mut nodes = Vec::new(); - for ps_conf in env.pageservers { - let (pg_host, pg_port) = - parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); - let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr) - .expect("Unable to parse listen_http_addr"); - let node = Node { - id: ps_conf.id, - listen_pg_addr: pg_host.to_string(), - listen_pg_port: pg_port.unwrap_or(5432), - listen_http_addr: http_host.to_string(), - listen_http_port: http_port.unwrap_or(80), - availability: NodeAvailability::Active, - scheduling: NodeSchedulingPolicy::Active, - }; - - // Synchronize database with what we learn from LocalEnv - self.insert_node(&node).await?; - - nodes.push(node); - } - - Ok(nodes) - } - /// At startup, load the high level state for shards, such as their config + policy. This will /// be enriched at runtime with state discovered on pageservers. 
pub(crate) async fn list_tenant_shards(&self) -> DatabaseResult> { diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index ec56dc8ad4..8c6a348515 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -21,6 +21,7 @@ use pageserver_api::{ models, models::{ LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest, + TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation, TimelineCreateRequest, TimelineInfo, }, shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId}, @@ -30,14 +31,14 @@ use utils::{ completion::Barrier, generation::Generation, http::error::ApiError, - id::{NodeId, TenantId}, + id::{NodeId, TenantId, TimelineId}, seqwait::SeqWait, }; use crate::{ compute_hook::ComputeHook, node::Node, - persistence::{DatabaseError, Persistence, TenantShardPersistence}, + persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence}, scheduler::Scheduler, tenant_state::{ IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError, @@ -635,7 +636,7 @@ impl Service { shard_number: tenant_shard_id.shard_number.0 as i32, shard_count: tenant_shard_id.shard_count.0 as i32, shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32, - generation: 0, + generation: create_req.generation.map(|g| g as i32).unwrap_or(0), generation_pageserver: i64::MAX, placement_policy: serde_json::to_string(&placement_policy).unwrap(), config: serde_json::to_string(&create_req.config).unwrap(), @@ -677,6 +678,7 @@ impl Service { })?; response_shards.push(TenantCreateResponseShard { + shard_id: tenant_shard_id, node_id: entry .get() .intent @@ -709,6 +711,7 @@ impl Service { })?; response_shards.push(TenantCreateResponseShard { + shard_id: tenant_shard_id, node_id: state .intent .attached @@ -742,14 +745,257 @@ impl Service { (waiters, response_shards) }; - let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap(); + self.await_waiters(waiters).await?; + + Ok(TenantCreateResponse { + shards: response_shards, + }) + } + + /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded + /// wait for reconciliation to complete before responding. + async fn await_waiters( + &self, + waiters: Vec, + ) -> Result<(), ReconcileWaitError> { + let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap(); for waiter in waiters { let timeout = deadline.duration_since(Instant::now()); waiter.wait_timeout(timeout).await?; } - Ok(TenantCreateResponse { - shards: response_shards, - }) + + Ok(()) + } + + /// This API is used by the cloud control plane to do coarse-grained control of tenants: + /// - Call with mode Attached* to upsert the tenant. + /// - Call with mode Detached to switch to PolicyMode::Detached + /// + /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has + /// secondary locations. 
+ pub(crate) async fn tenant_location_config( + &self, + tenant_id: TenantId, + req: TenantLocationConfigRequest, + ) -> Result { + if req.tenant_id.shard_count.0 > 1 { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "This API is for importing single-sharded or unsharded tenants" + ))); + } + + let mut waiters = Vec::new(); + let mut result = TenantLocationConfigResponse { shards: Vec::new() }; + let maybe_create = { + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let pageservers = locked.nodes.clone(); + + let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes); + + // Maybe we have existing shards + let mut create = true; + for (shard_id, shard) in locked + .tenants + .range_mut(TenantShardId::tenant_range(tenant_id)) + { + // Saw an existing shard: this is not a creation + create = false; + + // Note that for existing tenants we do _not_ respect the generation in the request: this is likely + // to be stale. Once a tenant is created in this service, our view of generation is authoritative, and + // callers' generations may be ignored. This represents a one-way migration of tenants from the outer + // cloud control plane into this service. + + // Use location config mode as an indicator of policy: if they ask for + // attached we go to default HA attached mode. If they ask for secondary + // we go to secondary-only mode. If they ask for detached we detach. + match req.config.mode { + LocationConfigMode::Detached => { + shard.policy = PlacementPolicy::Detached; + } + LocationConfigMode::Secondary => { + // TODO: implement secondary-only mode. + todo!(); + } + LocationConfigMode::AttachedMulti + | LocationConfigMode::AttachedSingle + | LocationConfigMode::AttachedStale => { + // TODO: persistence for changes in policy + if pageservers.len() > 1 { + shard.policy = PlacementPolicy::Double(1) + } else { + // Convenience for dev/test: if we just have one pageserver, import + // tenants into Single mode so that scheduling will succeed. 
+ shard.policy = PlacementPolicy::Single + } + } + } + + shard.schedule(&mut scheduler)?; + + let maybe_waiter = shard.maybe_reconcile( + result_tx.clone(), + &pageservers, + &compute_hook, + &self.config, + &self.persistence, + ); + if let Some(waiter) = maybe_waiter { + waiters.push(waiter); + } + + if let Some(node_id) = shard.intent.attached { + result.shards.push(TenantShardLocation { + shard_id: *shard_id, + node_id, + }) + } + } + + if create { + // Validate request mode + match req.config.mode { + LocationConfigMode::Detached | LocationConfigMode::Secondary => { + // When using this API to onboard an existing tenant to this service, it must start in + // an attached state, because we need the request to come with a generation + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Imported tenant must be in attached mode" + ))); + } + + LocationConfigMode::AttachedMulti + | LocationConfigMode::AttachedSingle + | LocationConfigMode::AttachedStale => { + // Pass + } + } + + // Validate request generation + let Some(generation) = req.config.generation else { + // We can only import attached tenants, because we need the request to come with a generation + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Generation is mandatory when importing tenant" + ))); + }; + + // Synthesize a creation request + Some(TenantCreateRequest { + new_tenant_id: TenantShardId::unsharded(tenant_id), + generation: Some(generation), + shard_parameters: ShardParameters { + // Must preserve the incoming shard_count do distinguish unsharded (0) + // from single-sharded (1): this distinction appears in the S3 keys of the tenant. + count: req.tenant_id.shard_count, + // We only import un-sharded or single-sharded tenants, so stripe + // size can be made up arbitrarily here. + stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE, + }, + config: req.config.tenant_conf, + }) + } else { + None + } + }; + + if let Some(create_req) = maybe_create { + let create_resp = self.tenant_create(create_req).await?; + result.shards = create_resp + .shards + .into_iter() + .map(|s| TenantShardLocation { + node_id: s.node_id, + shard_id: s.shard_id, + }) + .collect(); + } else { + // This was an update, wait for reconciliation + self.await_waiters(waiters).await?; + } + + Ok(result) + } + + pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result { + // TODO: refactor into helper + let targets = { + let locked = self.inner.read().unwrap(); + let mut targets = Vec::new(); + + for (tenant_shard_id, shard) in + locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + let node_id = shard.intent.attached.ok_or_else(|| { + ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled")) + })?; + let node = locked + .nodes + .get(&node_id) + .expect("Pageservers may not be deleted while referenced"); + + targets.push((*tenant_shard_id, node.clone())); + } + targets + }; + + // TODO: error out if the tenant is not attached anywhere. + + // Phase 1: delete on the pageservers + let mut any_pending = false; + for (tenant_shard_id, node) in targets { + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); + // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not + // surface immediately as an error to our caller. 
+ let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| { + ApiError::InternalServerError(anyhow::anyhow!( + "Error deleting shard {tenant_shard_id} on node {}: {e}", + node.id + )) + })?; + tracing::info!( + "Shard {tenant_shard_id} on node {}, delete returned {}", + node.id, + status + ); + if status == StatusCode::ACCEPTED { + any_pending = true; + } + } + + if any_pending { + // Caller should call us again later. When we eventually see 404s from + // all the shards, we may proceed to delete our records of the tenant. + tracing::info!( + "Tenant {} has some shards pending deletion, returning 202", + tenant_id + ); + return Ok(StatusCode::ACCEPTED); + } + + // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop + // our in-memory state and database state. + + // Ordering: we delete persistent state first: if we then + // crash, we will drop the in-memory state. + + // Drop persistent state. + self.persistence.delete_tenant(tenant_id).await?; + + // Drop in-memory state + { + let mut locked = self.inner.write().unwrap(); + locked + .tenants + .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id); + tracing::info!( + "Deleted tenant {tenant_id}, now have {} tenants", + locked.tenants.len() + ); + }; + + // Success is represented as 404, to imitate the existing pageserver deletion API + Ok(StatusCode::NOT_FOUND) } pub(crate) async fn tenant_timeline_create( @@ -759,25 +1005,15 @@ impl Service { ) -> Result { let mut timeline_info = None; - let ensure_waiters = { - let locked = self.inner.write().unwrap(); - tracing::info!( - "Creating timeline {}/{}, have {} pageservers", - tenant_id, - create_req.new_timeline_id, - locked.nodes.len() - ); + tracing::info!( + "Creating timeline {}/{}", + tenant_id, + create_req.new_timeline_id, + ); - self.ensure_attached(locked, tenant_id) - .map_err(ApiError::InternalServerError)? 
- }; - - let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap(); - for waiter in ensure_waiters { - let timeout = deadline.duration_since(Instant::now()); - waiter.wait_timeout(timeout).await?; - } + self.ensure_attached_wait(tenant_id).await?; + // TODO: refuse to do this if shard splitting is in progress let targets = { let locked = self.inner.read().unwrap(); let mut targets = Vec::new(); @@ -848,6 +1084,111 @@ impl Service { Ok(timeline_info.expect("targets cannot be empty")) } + pub(crate) async fn tenant_timeline_delete( + &self, + tenant_id: TenantId, + timeline_id: TimelineId, + ) -> Result { + tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,); + + self.ensure_attached_wait(tenant_id).await?; + + // TODO: refuse to do this if shard splitting is in progress + let targets = { + let locked = self.inner.read().unwrap(); + let mut targets = Vec::new(); + + for (tenant_shard_id, shard) in + locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + let node_id = shard.intent.attached.ok_or_else(|| { + ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled")) + })?; + let node = locked + .nodes + .get(&node_id) + .expect("Pageservers may not be deleted while referenced"); + + targets.push((*tenant_shard_id, node.clone())); + } + targets + }; + + if targets.is_empty() { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant not found").into(), + )); + } + + // TODO: call into shards concurrently + let mut any_pending = false; + for (tenant_shard_id, node) in targets { + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); + + tracing::info!( + "Deleting timeline on shard {}/{}, attached to node {}", + tenant_shard_id, + timeline_id, + node.id + ); + + let status = client + .timeline_delete(tenant_shard_id, timeline_id) + .await + .map_err(|e| { + ApiError::InternalServerError(anyhow::anyhow!( + "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}", + node.id + )) + })?; + + if status == StatusCode::ACCEPTED { + any_pending = true; + } + } + + if any_pending { + Ok(StatusCode::ACCEPTED) + } else { + Ok(StatusCode::NOT_FOUND) + } + } + + /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this + /// function looks it up and returns the url. If the tenant isn't found, returns Err(ApiError::NotFound) + pub(crate) fn tenant_shard0_baseurl( + &self, + tenant_id: TenantId, + ) -> Result<(String, TenantShardId), ApiError> { + let locked = self.inner.read().unwrap(); + let Some((tenant_shard_id, shard)) = locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .next() + else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant {tenant_id} not found").into(), + )); + }; + + // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might + // point to somewhere we haven't attached yet. 
+ let Some(node_id) = shard.intent.attached else { + return Err(ApiError::Conflict( + "Cannot call timeline API on non-attached tenant".to_string(), + )); + }; + + let Some(node) = locked.nodes.get(&node_id) else { + // This should never happen + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Shard refers to nonexistent node" + ))); + }; + + Ok((node.base_url(), *tenant_shard_id)) + } + pub(crate) fn tenant_locate( &self, tenant_id: TenantId, @@ -993,6 +1334,20 @@ impl Service { Ok(TenantShardMigrateResponse {}) } + pub(crate) async fn node_list(&self) -> Result, ApiError> { + // It is convenient to avoid taking the big lock and converting Node to a serializable + // structure, by fetching from storage instead of reading in-memory state. + let nodes = self + .persistence + .list_nodes() + .await? + .into_iter() + .map(|n| n.to_persistent()) + .collect(); + + Ok(nodes) + } + pub(crate) async fn node_register( &self, register_req: NodeRegisterRequest, @@ -1166,7 +1521,7 @@ impl Service { /// Helper for methods that will try and call pageserver APIs for /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant /// is attached somewhere. - fn ensure_attached( + fn ensure_attached_schedule( &self, mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>, tenant_id: TenantId, @@ -1196,6 +1551,23 @@ impl Service { Ok(waiters) } + async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> { + let ensure_waiters = { + let locked = self.inner.write().unwrap(); + + self.ensure_attached_schedule(locked, tenant_id) + .map_err(ApiError::InternalServerError)? + }; + + let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap(); + for waiter in ensure_waiters { + let timeout = deadline.duration_since(Instant::now()); + waiter.wait_timeout(timeout).await?; + } + + Ok(()) + } + /// Check all tenants for pending reconciliation work, and reconcile those in need /// /// Returns how many reconciliation tasks were started diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 6602aa9a73..7816d0953b 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -17,6 +17,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::{env, str::FromStr}; use tokio::process::Command; use tracing::instrument; +use url::Url; use utils::{ auth::{Claims, Scope}, id::{NodeId, TenantId}, @@ -59,6 +60,7 @@ pub struct InspectResponse { #[derive(Serialize, Deserialize)] pub struct TenantCreateResponseShard { + pub shard_id: TenantShardId, pub node_id: NodeId, pub generation: u32, } @@ -523,13 +525,15 @@ impl AttachmentService { RQ: Serialize + Sized, RS: DeserializeOwned + Sized, { - let url = self - .env - .control_plane_api - .clone() - .unwrap() - .join(&path) - .unwrap(); + // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out + // for general purpose API access. 
+ let listen_url = self.env.control_plane_api.clone().unwrap(); + let url = Url::from_str(&format!( + "http://{}:{}/{path}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + )) + .unwrap(); let mut builder = self.client.request(method, url); if let Some(body) = body { @@ -566,7 +570,7 @@ impl AttachmentService { let response = self .dispatch::<_, AttachHookResponse>( Method::POST, - "attach-hook".to_string(), + "debug/v1/attach-hook".to_string(), Some(request), ) .await?; @@ -582,7 +586,11 @@ impl AttachmentService { let request = InspectRequest { tenant_shard_id }; let response = self - .dispatch::<_, InspectResponse>(Method::POST, "inspect".to_string(), Some(request)) + .dispatch::<_, InspectResponse>( + Method::POST, + "debug/v1/inspect".to_string(), + Some(request), + ) .await?; Ok(response.attachment) @@ -599,8 +607,12 @@ impl AttachmentService { #[instrument(skip(self))] pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result { - self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None) - .await + self.dispatch::<(), _>( + Method::GET, + format!("control/v1/tenant/{tenant_id}/locate"), + None, + ) + .await } #[instrument(skip(self))] @@ -622,7 +634,7 @@ impl AttachmentService { #[instrument(skip_all, fields(node_id=%req.node_id))] pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { - self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req)) + self.dispatch::<_, ()>(Method::POST, "control/v1/node".to_string(), Some(req)) .await } @@ -630,7 +642,7 @@ impl AttachmentService { pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> { self.dispatch::<_, ()>( Method::PUT, - format!("node/{}/config", req.node_id), + format!("control/v1/node/{}/config", req.node_id), Some(req), ) .await diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index a5242e3dc7..d5abda729f 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -51,7 +51,7 @@ project_git_version!(GIT_VERSION); const DEFAULT_PG_VERSION: &str = "15"; -const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/"; +const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/upcall/v1/"; fn default_conf(num_pageservers: u16) -> String { let mut template = format!( diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 86d2c2a7ca..d885553cc7 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -364,6 +364,19 @@ pub struct TenantLocationConfigRequest { pub config: LocationConfig, // as we have a flattened field, we should reject all unknown fields in it } +#[derive(Serialize, Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct TenantShardLocation { + pub shard_id: TenantShardId, + pub node_id: NodeId, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct TenantLocationConfigResponse { + pub shards: Vec, +} + #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantConfigRequest { diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 077c3909e1..91b9afa026 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -69,6 +69,25 @@ impl Client { resp.json().await.map_err(Error::ReceiveBody) } + /// Get an arbitrary path and returning a streaming Response. 
This function is suitable
+    /// for pass-through/proxy use cases where we don't care what the response content looks
+    /// like.
+    ///
+    /// Use/add one of the properly typed methods below if you know you aren't proxying, and
+    /// know what kind of response you expect.
+    pub async fn get_raw(&self, path: String) -> Result<reqwest::Response> {
+        debug_assert!(path.starts_with('/'));
+        let uri = format!("{}{}", self.mgmt_api_endpoint, path);
+
+        let req = self.client.request(Method::GET, uri);
+        let req = if let Some(value) = &self.authorization_header {
+            req.header(reqwest::header::AUTHORIZATION, value)
+        } else {
+            req
+        };
+        req.send().await.map_err(Error::ReceiveBody)
+    }
+
     pub async fn tenant_details(
         &self,
         tenant_shard_id: TenantShardId,
@@ -171,6 +190,25 @@ impl Client {
             .map_err(Error::ReceiveBody)
     }
 
+    /// The tenant deletion API can return 202 if deletion is incomplete, or
+    /// 404 if it is complete. Callers are responsible for checking the status
+    /// code and retrying. Error codes other than 404 will return Err().
+    pub async fn tenant_delete(&self, tenant_shard_id: TenantShardId) -> Result<StatusCode> {
+        let uri = format!("{}/v1/tenant/{tenant_shard_id}", self.mgmt_api_endpoint);
+
+        match self.request(Method::DELETE, &uri, ()).await {
+            Err(Error::ApiError(status_code, msg)) => {
+                if status_code == StatusCode::NOT_FOUND {
+                    Ok(StatusCode::NOT_FOUND)
+                } else {
+                    Err(Error::ApiError(status_code, msg))
+                }
+            }
+            Err(e) => Err(e),
+            Ok(response) => Ok(response.status()),
+        }
+    }
+
     pub async fn tenant_config(&self, req: &TenantConfigRequest) -> Result<()> {
         let uri = format!("{}/v1/tenant/config", self.mgmt_api_endpoint);
         self.request(Method::PUT, &uri, req).await?;
@@ -234,6 +272,32 @@ impl Client {
             .map_err(Error::ReceiveBody)
     }
 
+    /// The timeline deletion API can return 202 if deletion is incomplete, or
+    /// 404 if it is complete. Callers are responsible for checking the status
+    /// code and retrying. Error codes other than 404 will return Err().
+    pub async fn timeline_delete(
+        &self,
+        tenant_shard_id: TenantShardId,
+        timeline_id: TimelineId,
+    ) -> Result<StatusCode> {
+        let uri = format!(
+            "{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
+            self.mgmt_api_endpoint
+        );
+
+        match self.request(Method::DELETE, &uri, ()).await {
+            Err(Error::ApiError(status_code, msg)) => {
+                if status_code == StatusCode::NOT_FOUND {
+                    Ok(StatusCode::NOT_FOUND)
+                } else {
+                    Err(Error::ApiError(status_code, msg))
+                }
+            }
+            Err(e) => Err(e),
+            Ok(response) => Ok(response.status()),
+        }
+    }
+
     pub async fn tenant_reset(&self, tenant_shard_id: TenantShardId) -> Result<()> {
         let uri = format!(
             "{}/v1/tenant/{}/reset",
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index a49eef8bb9..676a63937d 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -674,6 +674,10 @@ paths:
       responses:
         "200":
           description: Tenant is now in requested state
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/TenantLocationConfigResponse"
         "503":
           description: Tenant's state cannot be changed right now. Wait a few seconds and retry.
           content:
@@ -1426,6 +1430,27 @@ components:
          $ref: '#/components/schemas/SecondaryConfig'
        tenant_conf:
          $ref: '#/components/schemas/TenantConfig'
+    TenantLocationConfigResponse:
+      type: object
+      required:
+        - shards
+      properties:
+        shards:
+          description: Pageservers where this tenant's shards are attached. Not populated for secondary locations.
+          type: array
+          items:
+            $ref: "#/components/schemas/TenantShardLocation"
+    TenantShardLocation:
+      type: object
+      required:
+        - node_id
+        - shard_id
+      properties:
+        node_id:
+          description: Pageserver node ID where this shard is attached
+          type: integer
+        shard_id:
+          description: Tenant shard ID of the shard
+          type: string
     SecondaryConfig:
       type: object
       properties:
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index aa56806246..c025a25ef1 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -17,6 +17,8 @@ use metrics::launch_timestamp::LaunchTimestamp;
 use pageserver_api::models::LocationConfigListResponse;
 use pageserver_api::models::ShardParameters;
 use pageserver_api::models::TenantDetails;
+use pageserver_api::models::TenantLocationConfigResponse;
+use pageserver_api::models::TenantShardLocation;
 use pageserver_api::models::TenantState;
 use pageserver_api::models::{
     DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
@@ -1356,7 +1358,7 @@ async fn put_tenant_location_config_handler(
     let location_conf =
         LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
 
-    state
+    let attached = state
         .tenant_manager
         .upsert_location(
             tenant_shard_id,
@@ -1365,7 +1367,8 @@ async fn put_tenant_location_config_handler(
             tenant::SpawnMode::Normal,
             &ctx,
         )
-        .await?;
+        .await?
+        .is_some();
 
     if let Some(_flush_ms) = flush {
         match state
@@ -1384,7 +1387,18 @@ async fn put_tenant_location_config_handler(
         tracing::info!("No flush requested when configuring");
     }
 
-    json_response(StatusCode::OK, ())
+    // This API returns a vector of pageservers where the tenant is attached: this is
+    // primarily for use in the sharding service. For compatibility, we also return this
+    // when called directly on a pageserver, but the payload is always zero or one shards.
+    let mut response = TenantLocationConfigResponse { shards: Vec::new() };
+    if attached {
+        response.shards.push(TenantShardLocation {
+            shard_id: tenant_shard_id,
+            node_id: state.conf.id,
+        })
+    }
+
+    json_response(StatusCode::OK, response)
 }
 
 async fn list_location_config_handler(
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 0f79df74ba..5be7551a1e 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -993,13 +993,20 @@ class NeonEnv:
         self.initial_tenant = config.initial_tenant
         self.initial_timeline = config.initial_timeline
 
-        attachment_service_port = self.port_distributor.get_port()
-        # Reserve the next port after attachment service for use by its postgres: this
-        # will assert out if the next port wasn't free.
-        attachment_service_pg_port = self.port_distributor.get_port()
-        assert attachment_service_pg_port == attachment_service_port + 1
+        # Find two adjacent ports for attachment service and its postgres DB. This
+        # loop would eventually throw from get_port() if we run out of ports (extremely
+        # unlikely): usually we find two adjacent free ports on the first iteration.
+ while True: + self.attachment_service_port = self.port_distributor.get_port() + attachment_service_pg_port = self.port_distributor.get_port() + if attachment_service_pg_port == self.attachment_service_port + 1: + break + + # The URL for the pageserver to use as its control_plane_api config + self.control_plane_api: str = f"http://127.0.0.1:{self.attachment_service_port}/upcall/v1" + # The base URL of the attachment service + self.attachment_service_api: str = f"http://127.0.0.1:{self.attachment_service_port}" - self.control_plane_api: str = f"http://127.0.0.1:{attachment_service_port}" self.attachment_service: NeonAttachmentService = NeonAttachmentService( self, config.auth_enabled ) @@ -1914,6 +1921,14 @@ class NeonAttachmentService: self.running = False return self + def pageserver_api(self) -> PageserverHttpClient: + """ + The attachment service implements a subset of the pageserver REST API, for mapping + per-tenant actions into per-shard actions (e.g. timeline creation). Tests should invoke those + functions via the HttpClient, as an implicit check that these APIs remain compatible. + """ + return PageserverHttpClient(self.env.attachment_service_port, lambda: True) + def request(self, method, *args, **kwargs) -> requests.Response: kwargs["headers"] = self.headers() return requests.request(method, *args, **kwargs) @@ -1931,7 +1946,7 @@ class NeonAttachmentService: ) -> int: response = self.request( "POST", - f"{self.env.control_plane_api}/attach-hook", + f"{self.env.attachment_service_api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": pageserver_id}, headers=self.headers(), ) @@ -1943,7 +1958,7 @@ class NeonAttachmentService: def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]): response = self.request( "POST", - f"{self.env.control_plane_api}/attach-hook", + f"{self.env.attachment_service_api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": None}, headers=self.headers(), ) @@ -1955,7 +1970,7 @@ class NeonAttachmentService: """ response = self.request( "POST", - f"{self.env.control_plane_api}/inspect", + f"{self.env.attachment_service_api}/debug/v1/inspect", json={"tenant_shard_id": str(tenant_shard_id)}, headers=self.headers(), ) @@ -1976,7 +1991,27 @@ class NeonAttachmentService: } log.info(f"node_register({body})") self.request( - "POST", f"{self.env.control_plane_api}/node", json=body, headers=self.headers() + "POST", + f"{self.env.attachment_service_api}/control/v1/node", + json=body, + headers=self.headers(), + ).raise_for_status() + + def node_list(self): + response = self.request( + "GET", f"{self.env.attachment_service_api}/control/v1/node", headers=self.headers() + ) + response.raise_for_status() + return response.json() + + def node_configure(self, node_id, body: dict[str, Any]): + log.info(f"node_configure({node_id}, {body})") + body["node_id"] = node_id + self.request( + "PUT", + f"{self.env.attachment_service_api}/control/v1/node/{node_id}/config", + json=body, + headers=self.headers(), ).raise_for_status() def tenant_create( @@ -1986,6 +2021,9 @@ class NeonAttachmentService: shard_stripe_size: Optional[int] = None, tenant_config: Optional[Dict[Any, Any]] = None, ): + """ + Use this rather than pageserver_api() when you need to include shard parameters + """ body: Dict[str, Any] = {"new_tenant_id": str(tenant_id)} if shard_count is not None: @@ -1999,21 +2037,17 @@ class NeonAttachmentService: for k, v in tenant_config.items(): body[k] = v - response = self.request("POST", 
f"{self.env.control_plane_api}/tenant", json=body) + response = self.request("POST", f"{self.env.attachment_service_api}/v1/tenant", json=body) response.raise_for_status() log.info(f"tenant_create success: {response.json()}") - def tenant_timeline_create(self, tenant_id: TenantId, timeline_id: TimelineId): - body: Dict[str, Any] = {"new_timeline_id": str(timeline_id)} - - response = self.request( - "POST", f"{self.env.control_plane_api}/tenant/{tenant_id}/timeline", json=body - ) - response.raise_for_status() - log.info(f"tenant_timeline_create success: {response.json()}") - def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]: - response = self.request("GET", f"{self.env.control_plane_api}/tenant/{tenant_id}/locate") + """ + :return: list of {"shard_id": "", "node_id": int, "listen_pg_addr": str, "listen_pg_port": int, "listen_http_addr: str, "listen_http_port: int} + """ + response = self.request( + "GET", f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/locate" + ) response.raise_for_status() body = response.json() shards: list[dict[str, Any]] = body["shards"] @@ -2022,7 +2056,7 @@ class NeonAttachmentService: def tenant_shard_split(self, tenant_id: TenantId, shard_count: int) -> list[TenantShardId]: response = self.request( "PUT", - f"{self.env.control_plane_api}/tenant/{tenant_id}/shard_split", + f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_id}/shard_split", json={"new_shard_count": shard_count}, ) response.raise_for_status() @@ -2034,7 +2068,7 @@ class NeonAttachmentService: def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int): response = self.request( "PUT", - f"{self.env.control_plane_api}/tenant/{tenant_shard_id}/migrate", + f"{self.env.attachment_service_api}/control/v1/tenant/{tenant_shard_id}/migrate", json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}, ) response.raise_for_status() diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py new file mode 100644 index 0000000000..3b2c9334db --- /dev/null +++ b/test_runner/regress/test_sharding_service.py @@ -0,0 +1,272 @@ +import time +from collections import defaultdict + +from fixtures.neon_fixtures import ( + NeonEnvBuilder, +) +from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed +from fixtures.pg_version import PgVersion +from fixtures.types import TenantId, TimelineId +from fixtures.utils import wait_until + + +def test_sharding_service_smoke( + neon_env_builder: NeonEnvBuilder, +): + """ + Test the basic lifecycle of a sharding service: + - Restarting + - Restarting a pageserver + - Creating and deleting tenants and timelines + - Marking a pageserver offline + """ + + neon_env_builder.num_pageservers = 3 + env = neon_env_builder.init_configs() + + # Start services by hand so that we can skip a pageserver (this will start + register later) + env.broker.try_start() + env.attachment_service.start() + env.pageservers[0].start() + env.pageservers[1].start() + for sk in env.safekeepers: + sk.start() + + # The pageservers we started should have registered with the sharding service on startup + nodes = env.attachment_service.node_list() + assert len(nodes) == 2 + assert set(n["node_id"] for n in nodes) == {env.pageservers[0].id, env.pageservers[1].id} + + # Starting an additional pageserver should register successfully + env.pageservers[2].start() + nodes = env.attachment_service.node_list() + 
assert len(nodes) == 3
+    assert set(n["node_id"] for n in nodes) == {ps.id for ps in env.pageservers}
+
+    # Use a multiple of pageservers to get a nice even number of shards on each one
+    tenant_shard_count = len(env.pageservers) * 4
+    tenant_count = len(env.pageservers) * 2
+    shards_per_tenant = tenant_shard_count // tenant_count
+    tenant_ids = set(TenantId.generate() for i in range(0, tenant_count))
+
+    # Creating several tenants should spread out across the pageservers
+    for tid in tenant_ids:
+        env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
+
+    def get_node_shard_counts():
+        counts: defaultdict[int, int] = defaultdict(int)
+        for tid in tenant_ids:
+            for shard in env.attachment_service.locate(tid):
+                counts[shard["node_id"]] += 1
+        return counts
+
+    for node_id, count in get_node_shard_counts().items():
+        # we used a multiple of pageservers for the total shard count,
+        # so expect equal number on all pageservers
+        assert count == tenant_shard_count / len(
+            env.pageservers
+        ), f"Node {node_id} has bad count {count}"
+
+    # Creating and deleting timelines should work, using identical API to pageserver
+    timeline_crud_tenant = next(iter(tenant_ids))
+    timeline_id = TimelineId.generate()
+    env.attachment_service.pageserver_api().timeline_create(
+        pg_version=PgVersion.NOT_SET, tenant_id=timeline_crud_tenant, new_timeline_id=timeline_id
+    )
+    timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
+    assert len(timelines) == 2
+    assert timeline_id in set(TimelineId(t["timeline_id"]) for t in timelines)
+    # virtual_ps_http.timeline_delete(tenant_id=timeline_crud_tenant, timeline_id=timeline_id)
+    timeline_delete_wait_completed(
+        env.attachment_service.pageserver_api(), timeline_crud_tenant, timeline_id
+    )
+    timelines = env.attachment_service.pageserver_api().timeline_list(timeline_crud_tenant)
+    assert len(timelines) == 1
+    assert timeline_id not in set(TimelineId(t["timeline_id"]) for t in timelines)
+
+    # Marking a pageserver offline should migrate tenants away from it.
+    env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Offline"})
+
+    def node_evacuated(node_id: int):
+        counts = get_node_shard_counts()
+        assert counts[node_id] == 0
+
+    wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
+
+    # Marking the pageserver active should not migrate anything to it
+    # immediately
+    env.attachment_service.node_configure(env.pageservers[0].id, {"availability": "Active"})
+    time.sleep(1)
+    assert get_node_shard_counts()[env.pageservers[0].id] == 0
+
+    # Delete all the tenants
+    for tid in tenant_ids:
+        tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
+
+    # Set a scheduling policy on one node, create all the tenants, observe
+    # that the scheduling policy is respected.
+ env.attachment_service.node_configure(env.pageservers[1].id, {"scheduling": "Draining"}) + + # Create some fresh tenants + tenant_ids = set(TenantId.generate() for i in range(0, tenant_count)) + for tid in tenant_ids: + env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant) + + counts = get_node_shard_counts() + # Nothing should have been scheduled on the node in Draining + assert counts[env.pageservers[1].id] == 0 + assert counts[env.pageservers[0].id] == tenant_shard_count // 2 + assert counts[env.pageservers[2].id] == tenant_shard_count // 2 + + +def test_sharding_service_passthrough( + neon_env_builder: NeonEnvBuilder, +): + """ + For simple timeline/tenant GET APIs that don't require coordination across + shards, the sharding service implements a proxy to shard zero. This test + calls those APIs. + """ + neon_env_builder.num_pageservers = 2 + env = neon_env_builder.init_start() + + # We will talk to attachment service as if it was a pageserver, using the pageserver + # HTTP client + client = PageserverHttpClient(env.attachment_service_port, lambda: True) + timelines = client.timeline_list(tenant_id=env.initial_tenant) + assert len(timelines) == 1 + + +def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start() + tenant_a = env.initial_tenant + tenant_b = TenantId.generate() + env.attachment_service.tenant_create(tenant_b) + env.pageserver.tenant_detach(tenant_a) + + # TODO: extend this test to use multiple pageservers, and check that locations don't move around + # on restart. + + # Attachment service restart + env.attachment_service.stop() + env.attachment_service.start() + + observed = set(TenantId(tenant["id"]) for tenant in env.pageserver.http_client().tenant_list()) + + # Tenant A should still be attached + assert tenant_a not in observed + + # Tenant B should remain detached + assert tenant_b in observed + + # Pageserver restart + env.pageserver.stop() + env.pageserver.start() + + # Same assertions as above: restarting either service should not perturb things + observed = set(TenantId(tenant["id"]) for tenant in env.pageserver.http_client().tenant_list()) + assert tenant_a not in observed + assert tenant_b in observed + + +def test_sharding_service_onboarding( + neon_env_builder: NeonEnvBuilder, +): + """ + We onboard tenants to the sharding service by treating it as a 'virtual pageserver' + which provides the /location_config API. This is similar to creating a tenant, + but imports the generation number. 
+    """
+
+    neon_env_builder.num_pageservers = 2
+
+    # Start services by hand so that we can skip registration on one of the pageservers
+    env = neon_env_builder.init_configs()
+    env.broker.try_start()
+    env.attachment_service.start()
+
+    # This is the pageserver where we'll initially create the tenant
+    env.pageservers[0].start(register=False)
+    origin_ps = env.pageservers[0]
+
+    # This is the pageserver managed by the sharding service, where the tenant
+    # will be attached after onboarding
+    env.pageservers[1].start(register=True)
+    dest_ps = env.pageservers[1]
+    virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
+
+    for sk in env.safekeepers:
+        sk.start()
+
+    # Create a tenant directly via pageserver HTTP API, skipping the attachment service
+    tenant_id = TenantId.generate()
+    generation = 123
+    origin_ps.http_client().tenant_create(tenant_id, generation=generation)
+
+    # As if doing a live migration, first configure the origin into stale mode
+    origin_ps.http_client().tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "AttachedStale",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": generation,
+        },
+    )
+
+    # Call into the attachment service to onboard the tenant
+    generation += 1
+    virtual_ps_http.tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "AttachedMulti",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": generation,
+        },
+    )
+
+    # As if doing a live migration, detach the original pageserver
+    origin_ps.http_client().tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "Detached",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": None,
+        },
+    )
+
+    # As if doing a live migration, call into the attachment service to
+    # set it to AttachedSingle: this is a no-op, but we test it because the
+    # cloud control plane may call this for symmetry with live migration to
+    # an individual pageserver
+    virtual_ps_http.tenant_location_conf(
+        tenant_id,
+        {
+            "mode": "AttachedSingle",
+            "secondary_conf": None,
+            "tenant_conf": {},
+            "generation": generation,
+        },
+    )
+
+    # We should see the tenant is now attached to the pageserver managed
+    # by the sharding service
+    origin_tenants = origin_ps.http_client().tenant_list()
+    assert len(origin_tenants) == 0
+    dest_tenants = dest_ps.http_client().tenant_list()
+    assert len(dest_tenants) == 1
+    assert TenantId(dest_tenants[0]["id"]) == tenant_id
+
+    # The sharding service advances the generation by 1 when it first attaches
+    assert dest_tenants[0]["generation"] == generation + 1
+
+    # The onboarded tenant should survive a restart of the sharding service
+    env.attachment_service.stop()
+    env.attachment_service.start()
+
+    # The onboarded tenant should survive a restart of the pageserver
+    dest_ps.stop()
+    dest_ps.start()
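
Illustrative usage of the new URL scheme (a sketch, not part of the patch's test suite): the listen address matches the `neon_local` default above, while the tenant ID, the use of raw `requests`, and the one-second retry interval are placeholders — real tests go through the `NeonAttachmentService` / `PageserverHttpClient` fixtures added in this patch.

```python
# Sketch only: drive the attachment service's "virtual pageserver" API directly.
# Assumes the service listens on the neon_local default address; pageservers point
# their `control_plane_api` at {api}/upcall/v1. The tenant ID is a made-up example.
import time
import uuid

import requests

api = "http://127.0.0.1:1234"

# Sharding-service-specific APIs live under /control/v1
nodes = requests.get(f"{api}/control/v1/node").json()
print(f"{len(nodes)} pageservers registered")

# Pageserver-compatible APIs live under /v1, so a shard-naive client can treat
# the service like a single pageserver.
tenant_id = uuid.uuid4().hex
requests.post(f"{api}/v1/tenant", json={"new_tenant_id": tenant_id}).raise_for_status()

# Timeline GETs under /v1 are proxied to whichever pageserver holds shard zero.
timelines = requests.get(f"{api}/v1/tenant/{tenant_id}/timeline").json()
print(f"tenant {tenant_id} has {len(timelines)} timelines")

# Deletion is wrapped server-side: 200 once the tenant is gone, 409 if the
# internal 202-then-404 polling loop timed out and the caller should retry.
while True:
    r = requests.delete(f"{api}/v1/tenant/{tenant_id}")
    if r.status_code != 409:
        r.raise_for_status()
        break
    time.sleep(1)
```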