diff --git a/Cargo.lock b/Cargo.lock index 81540a1915..bf12576a28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -256,6 +256,29 @@ dependencies = [ "critical-section", ] +[[package]] +name = "attachment_service" +version = "0.1.0" +dependencies = [ + "anyhow", + "camino", + "clap", + "control_plane", + "futures", + "hyper", + "pageserver_api", + "pageserver_client", + "postgres_connection", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "autocfg" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 5de636778a..3992ce8d09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "compute_tools", "control_plane", + "control_plane/attachment_service", "pageserver", "pageserver/ctl", "pageserver/client", diff --git a/control_plane/attachment_service/Cargo.toml b/control_plane/attachment_service/Cargo.toml new file mode 100644 index 0000000000..641b760e82 --- /dev/null +++ b/control_plane/attachment_service/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "attachment_service" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +anyhow.workspace = true +camino.workspace = true +clap.workspace = true +futures.workspace = true +hyper.workspace = true +pageserver_api.workspace = true +pageserver_client.workspace = true +postgres_connection.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true +tokio-util.workspace = true +tracing.workspace = true + +utils = { path = "../../libs/utils/" } +control_plane = { path = ".." } +workspace_hack = { version = "0.1", path = "../../workspace_hack" } + diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs new file mode 100644 index 0000000000..02617cd065 --- /dev/null +++ b/control_plane/attachment_service/src/compute_hook.rs @@ -0,0 +1,116 @@ +use std::collections::HashMap; + +use control_plane::endpoint::ComputeControlPlane; +use control_plane::local_env::LocalEnv; +use pageserver_api::shard::{ShardCount, ShardIndex, TenantShardId}; +use postgres_connection::parse_host_port; +use utils::id::{NodeId, TenantId}; + +pub(super) struct ComputeHookTenant { + shards: Vec<(ShardIndex, NodeId)>, +} + +impl ComputeHookTenant { + pub(super) async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> anyhow::Result<()> { + // Find the highest shard count and drop any shards that aren't + // for that shard count. + let shard_count = self.shards.iter().map(|(k, _v)| k.shard_count).max(); + let Some(shard_count) = shard_count else { + // No shards, nothing to do. 
+ tracing::info!("ComputeHookTenant::maybe_reconfigure: no shards"); + return Ok(()); + }; + + self.shards.retain(|(k, _v)| k.shard_count == shard_count); + self.shards + .sort_by_key(|(shard, _node_id)| shard.shard_number); + + if self.shards.len() == shard_count.0 as usize || shard_count == ShardCount(0) { + // We have pageservers for all the shards: proceed to reconfigure compute + let env = match LocalEnv::load_config() { + Ok(e) => e, + Err(e) => { + tracing::warn!( + "Couldn't load neon_local config, skipping compute update ({e})" + ); + return Ok(()); + } + }; + let cplane = ComputeControlPlane::load(env.clone()) + .expect("Error loading compute control plane"); + + let compute_pageservers = self + .shards + .iter() + .map(|(_shard, node_id)| { + let ps_conf = env + .get_pageserver_conf(*node_id) + .expect("Unknown pageserver"); + let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) + .expect("Unable to parse listen_pg_addr"); + (pg_host, pg_port.unwrap_or(5432)) + }) + .collect::>(); + + for (endpoint_name, endpoint) in &cplane.endpoints { + if endpoint.tenant_id == tenant_id && endpoint.status() == "running" { + tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,); + endpoint.reconfigure(compute_pageservers.clone()).await?; + } + } + } else { + tracing::info!( + "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})", + self.shards.len(), + shard_count.0 + ); + } + + Ok(()) + } +} + +/// The compute hook is a destination for notifications about changes to tenant:pageserver +/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures +/// the compute connection string. +pub(super) struct ComputeHook { + state: tokio::sync::Mutex>, +} + +impl ComputeHook { + pub(super) fn new() -> Self { + Self { + state: Default::default(), + } + } + + pub(super) async fn notify( + &self, + tenant_shard_id: TenantShardId, + node_id: NodeId, + ) -> anyhow::Result<()> { + tracing::info!("ComputeHook::notify: {}->{}", tenant_shard_id, node_id); + let mut locked = self.state.lock().await; + let entry = locked + .entry(tenant_shard_id.tenant_id) + .or_insert_with(|| ComputeHookTenant { shards: Vec::new() }); + + let shard_index = ShardIndex { + shard_count: tenant_shard_id.shard_count, + shard_number: tenant_shard_id.shard_number, + }; + + let mut set = false; + for (existing_shard, existing_node) in &mut entry.shards { + if *existing_shard == shard_index { + *existing_node = node_id; + set = true; + } + } + if !set { + entry.shards.push((shard_index, node_id)); + } + + entry.maybe_reconfigure(tenant_shard_id.tenant_id).await + } +} diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs new file mode 100644 index 0000000000..bfaaf4f092 --- /dev/null +++ b/control_plane/attachment_service/src/http.rs @@ -0,0 +1,204 @@ +use crate::reconciler::ReconcileError; +use crate::service::Service; +use hyper::{Body, Request, Response}; +use hyper::{StatusCode, Uri}; +use pageserver_api::models::{TenantCreateRequest, TimelineCreateRequest}; +use pageserver_api::shard::TenantShardId; +use std::sync::Arc; +use utils::auth::SwappableJwtAuth; +use utils::http::endpoint::{auth_middleware, request_span}; +use utils::http::request::parse_request_param; +use utils::id::TenantId; + +use utils::{ + http::{ + endpoint::{self}, + error::ApiError, + json::{json_request, json_response}, + RequestExt, RouterBuilder, + }, + id::NodeId, +}; + +use pageserver_api::control_api::{ReAttachRequest, ValidateRequest}; + 
+use control_plane::attachment_service::{ + AttachHookRequest, InspectRequest, NodeConfigureRequest, NodeRegisterRequest, + TenantShardMigrateRequest, +}; + +/// State available to HTTP request handlers +#[derive(Clone)] +pub struct HttpState { + service: Arc, + auth: Option>, + allowlist_routes: Vec, +} + +impl HttpState { + pub fn new(service: Arc, auth: Option>) -> Self { + let allowlist_routes = ["/status"] + .iter() + .map(|v| v.parse().unwrap()) + .collect::>(); + Self { + service, + auth, + allowlist_routes, + } + } +} + +#[inline(always)] +fn get_state(request: &Request) -> &HttpState { + request + .data::>() + .expect("unknown state type") + .as_ref() +} + +/// Pageserver calls into this on startup, to learn which tenants it should attach +async fn handle_re_attach(mut req: Request) -> Result, ApiError> { + let reattach_req = json_request::(&mut req).await?; + let state = get_state(&req); + json_response(StatusCode::OK, state.service.re_attach(reattach_req)) +} + +/// Pageserver calls into this before doing deletions, to confirm that it still +/// holds the latest generation for the tenants with deletions enqueued +async fn handle_validate(mut req: Request) -> Result, ApiError> { + let validate_req = json_request::(&mut req).await?; + let state = get_state(&req); + json_response(StatusCode::OK, state.service.validate(validate_req)) +} + +/// Call into this before attaching a tenant to a pageserver, to acquire a generation number +/// (in the real control plane this is unnecessary, because the same program is managing +/// generation numbers and doing attachments). +async fn handle_attach_hook(mut req: Request) -> Result, ApiError> { + let attach_req = json_request::(&mut req).await?; + let state = get_state(&req); + + json_response(StatusCode::OK, state.service.attach_hook(attach_req)) +} + +async fn handle_inspect(mut req: Request) -> Result, ApiError> { + let inspect_req = json_request::(&mut req).await?; + + let state = get_state(&req); + + json_response(StatusCode::OK, state.service.inspect(inspect_req)) +} + +async fn handle_tenant_create(mut req: Request) -> Result, ApiError> { + let create_req = json_request::(&mut req).await?; + let state = get_state(&req); + json_response( + StatusCode::OK, + state.service.tenant_create(create_req).await?, + ) +} + +async fn handle_tenant_timeline_create(mut req: Request) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let create_req = json_request::(&mut req).await?; + + let state = get_state(&req); + json_response( + StatusCode::OK, + state + .service + .tenant_timeline_create(tenant_id, create_req) + .await?, + ) +} + +async fn handle_tenant_locate(req: Request) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + let state = get_state(&req); + + json_response(StatusCode::OK, state.service.tenant_locate(tenant_id)?) 
+} + +async fn handle_node_register(mut req: Request) -> Result, ApiError> { + let register_req = json_request::(&mut req).await?; + let state = get_state(&req); + state.service.node_register(register_req); + json_response(StatusCode::OK, ()) +} + +async fn handle_node_configure(mut req: Request) -> Result, ApiError> { + let node_id: NodeId = parse_request_param(&req, "node_id")?; + let config_req = json_request::(&mut req).await?; + if node_id != config_req.node_id { + return Err(ApiError::BadRequest(anyhow::anyhow!( + "Path and body node_id differ" + ))); + } + let state = get_state(&req); + + json_response(StatusCode::OK, state.service.node_configure(config_req)?) +} + +async fn handle_tenant_shard_migrate(mut req: Request) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?; + let migrate_req = json_request::(&mut req).await?; + let state = get_state(&req); + json_response( + StatusCode::OK, + state + .service + .tenant_shard_migrate(tenant_shard_id, migrate_req) + .await?, + ) +} + +/// Status endpoint is just used for checking that our HTTP listener is up +async fn handle_status(_req: Request) -> Result, ApiError> { + json_response(StatusCode::OK, ()) +} + +impl From for ApiError { + fn from(value: ReconcileError) -> Self { + ApiError::Conflict(format!("Reconciliation error: {}", value)) + } +} + +pub fn make_router( + service: Arc, + auth: Option>, +) -> RouterBuilder { + let mut router = endpoint::make_router(); + if auth.is_some() { + router = router.middleware(auth_middleware(|request| { + let state = get_state(request); + if state.allowlist_routes.contains(request.uri()) { + None + } else { + state.auth.as_deref() + } + })) + } + + router + .data(Arc::new(HttpState::new(service, auth))) + .get("/status", |r| request_span(r, handle_status)) + .post("/re-attach", |r| request_span(r, handle_re_attach)) + .post("/validate", |r| request_span(r, handle_validate)) + .post("/attach-hook", |r| request_span(r, handle_attach_hook)) + .post("/inspect", |r| request_span(r, handle_inspect)) + .post("/node", |r| request_span(r, handle_node_register)) + .put("/node/:node_id/config", |r| { + request_span(r, handle_node_configure) + }) + .post("/tenant", |r| request_span(r, handle_tenant_create)) + .post("/tenant/:tenant_id/timeline", |r| { + request_span(r, handle_tenant_timeline_create) + }) + .get("/tenant/:tenant_id/locate", |r| { + request_span(r, handle_tenant_locate) + }) + .put("/tenant/:tenant_shard_id/migrate", |r| { + request_span(r, handle_tenant_shard_migrate) + }) +} diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs new file mode 100644 index 0000000000..a6bc71ecc0 --- /dev/null +++ b/control_plane/attachment_service/src/lib.rs @@ -0,0 +1,49 @@ +use utils::seqwait::MonotonicCounter; + +mod compute_hook; +pub mod http; +mod node; +mod reconciler; +mod scheduler; +pub mod service; +mod tenant_state; + +#[derive(Clone)] +enum PlacementPolicy { + /// Cheapest way to attach a tenant: just one pageserver, no secondary + Single, + /// Production-ready way to attach a tenant: one attached pageserver and + /// some number of secondaries. 
+    Double(usize),
+}
+
+#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
+struct Sequence(u64);
+
+impl std::fmt::Display for Sequence {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl MonotonicCounter for Sequence {
+    fn cnt_advance(&mut self, v: Sequence) {
+        assert!(*self <= v);
+        *self = v;
+    }
+    fn cnt_value(&self) -> Sequence {
+        *self
+    }
+}
+
+impl Sequence {
+    fn next(&self) -> Sequence {
+        Sequence(self.0 + 1)
+    }
+}
+
+impl Default for PlacementPolicy {
+    fn default() -> Self {
+        PlacementPolicy::Double(1)
+    }
+}
diff --git a/control_plane/attachment_service/src/main.rs b/control_plane/attachment_service/src/main.rs
new file mode 100644
index 0000000000..de39e0b59e
--- /dev/null
+++ b/control_plane/attachment_service/src/main.rs
@@ -0,0 +1,87 @@
+/// The attachment service mimics the aspects of the control plane API
+/// that are required for a pageserver to operate.
+///
+/// This enables running & testing pageservers without a full-blown
+/// deployment of the Neon cloud platform.
+///
+use anyhow::anyhow;
+use attachment_service::http::make_router;
+use attachment_service::service::{Config, Service};
+use clap::Parser;
+use std::path::PathBuf;
+use std::sync::Arc;
+use utils::auth::{JwtAuth, SwappableJwtAuth};
+use utils::logging::{self, LogFormat};
+use utils::signals::{ShutdownSignals, Signal};
+
+use utils::tcp_listener;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+#[command(arg_required_else_help(true))]
+struct Cli {
+    /// Host and port to listen on, like `127.0.0.1:1234`
+    #[arg(short, long)]
+    listen: std::net::SocketAddr,
+
+    /// Path to public key for JWT authentication of clients
+    #[arg(long)]
+    public_key: Option,
+
+    /// Token for authenticating this service with the pageservers it controls
+    #[arg(short, long)]
+    jwt_token: Option,
+
+    /// Path to the .json file to store state (will be created if it doesn't exist)
+    #[arg(short, long)]
+    path: PathBuf,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    logging::init(
+        LogFormat::Plain,
+        logging::TracingErrorLayerEnablement::Disabled,
+        logging::Output::Stdout,
+    )?;
+
+    let args = Cli::parse();
+    tracing::info!(
+        "Starting, state at {}, listening on {}",
+        args.path.to_string_lossy(),
+        args.listen
+    );
+
+    let config = Config {
+        jwt_token: args.jwt_token,
+    };
+    let service = Service::spawn(config);
+
+    let http_listener = tcp_listener::bind(args.listen)?;
+
+    let auth = if let Some(public_key_path) = &args.public_key {
+        let jwt_auth = JwtAuth::from_key_path(public_key_path)?;
+        Some(Arc::new(SwappableJwtAuth::new(jwt_auth)))
+    } else {
+        None
+    };
+    let router = make_router(service, auth)
+        .build()
+        .map_err(|err| anyhow!(err))?;
+    let service = utils::http::RouterService::new(router).unwrap();
+    let server = hyper::Server::from_tcp(http_listener)?.serve(service);
+
+    tracing::info!("Serving on {0}", args.listen);
+
+    tokio::task::spawn(server);
+
+    ShutdownSignals::handle(|signal| match signal {
+        Signal::Interrupt | Signal::Terminate | Signal::Quit => {
+            tracing::info!("Got {}. Terminating", signal.name());
+            // We're just a test helper: no graceful shutdown.
+            std::process::exit(0);
+        }
+    })?;
+
+    Ok(())
+}
diff --git a/control_plane/attachment_service/src/node.rs b/control_plane/attachment_service/src/node.rs
new file mode 100644
index 0000000000..efd3f8f49b
--- /dev/null
+++ b/control_plane/attachment_service/src/node.rs
@@ -0,0 +1,37 @@
+use control_plane::attachment_service::{NodeAvailability, NodeSchedulingPolicy};
+use utils::id::NodeId;
+
+#[derive(Clone)]
+pub(crate) struct Node {
+    pub(crate) id: NodeId,
+
+    pub(crate) availability: NodeAvailability,
+    pub(crate) scheduling: NodeSchedulingPolicy,
+
+    pub(crate) listen_http_addr: String,
+    pub(crate) listen_http_port: u16,
+
+    pub(crate) listen_pg_addr: String,
+    pub(crate) listen_pg_port: u16,
+}
+
+impl Node {
+    pub(crate) fn base_url(&self) -> String {
+        format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
+    }
+
+    /// Is this node eligible to have work scheduled onto it?
+    pub(crate) fn may_schedule(&self) -> bool {
+        match self.availability {
+            NodeAvailability::Active => {}
+            NodeAvailability::Offline => return false,
+        }
+
+        match self.scheduling {
+            NodeSchedulingPolicy::Active => true,
+            NodeSchedulingPolicy::Draining => false,
+            NodeSchedulingPolicy::Filling => true,
+            NodeSchedulingPolicy::Pause => false,
+        }
+    }
+}
diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs
new file mode 100644
index 0000000000..628b7b16a4
--- /dev/null
+++ b/control_plane/attachment_service/src/reconciler.rs
@@ -0,0 +1,473 @@
+use crate::service;
+use control_plane::attachment_service::NodeAvailability;
+use pageserver_api::models::{
+    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
+};
+use pageserver_api::shard::{ShardIdentity, TenantShardId};
+use pageserver_client::mgmt_api;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio_util::sync::CancellationToken;
+use utils::generation::Generation;
+use utils::id::{NodeId, TimelineId};
+use utils::lsn::Lsn;
+
+use crate::compute_hook::ComputeHook;
+use crate::node::Node;
+use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
+
+/// Object with the lifetime of the background reconcile task that is created
+/// for tenants which have a difference between their intent and observed states.
+pub(super) struct Reconciler {
+    /// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot
+    /// of a tenant's state from when we spawned a reconcile task.
+    pub(super) tenant_shard_id: TenantShardId,
+    pub(crate) shard: ShardIdentity,
+    pub(crate) generation: Generation,
+    pub(crate) intent: IntentState,
+    pub(crate) config: TenantConfig,
+    pub(crate) observed: ObservedState,
+
+    pub(crate) service_config: service::Config,
+
+    /// A snapshot of the pageservers as they were when we were asked
+    /// to reconcile.
+    pub(crate) pageservers: Arc>,
+
+    /// A hook to notify the running postgres instances when we change the location
+    /// of a tenant
+    pub(crate) compute_hook: Arc,
+
+    /// A means to abort background reconciliation: it is essential to
+    /// call this when something changes in the original TenantState that
+    /// will make this reconciliation impossible or unnecessary, for
+    /// example when a pageserver node goes offline, or the PlacementPolicy for
+    /// the tenant is changed.
+ pub(crate) cancel: CancellationToken, +} + +#[derive(thiserror::Error, Debug)] +pub enum ReconcileError { + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl Reconciler { + async fn location_config( + &mut self, + node_id: NodeId, + config: LocationConfig, + flush_ms: Option, + ) -> anyhow::Result<()> { + let node = self + .pageservers + .get(&node_id) + .expect("Pageserver may not be removed while referenced"); + + self.observed + .locations + .insert(node.id, ObservedStateLocation { conf: None }); + + let client = + mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref()); + client + .location_config(self.tenant_shard_id, config.clone(), flush_ms) + .await?; + + self.observed + .locations + .insert(node.id, ObservedStateLocation { conf: Some(config) }); + + Ok(()) + } + + async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> { + let destination = if let Some(node_id) = self.intent.attached { + match self.observed.locations.get(&node_id) { + Some(conf) => { + // We will do a live migration only if the intended destination is not + // currently in an attached state. + match &conf.conf { + Some(conf) if conf.mode == LocationConfigMode::Secondary => { + // Fall through to do a live migration + node_id + } + None | Some(_) => { + // Attached or uncertain: don't do a live migration, proceed + // with a general-case reconciliation + tracing::info!("maybe_live_migrate: destination is None or attached"); + return Ok(()); + } + } + } + None => { + // Our destination is not attached: maybe live migrate if some other + // node is currently attached. Fall through. + node_id + } + } + } else { + // No intent to be attached + tracing::info!("maybe_live_migrate: no attached intent"); + return Ok(()); + }; + + let mut origin = None; + for (node_id, state) in &self.observed.locations { + if let Some(observed_conf) = &state.conf { + if observed_conf.mode == LocationConfigMode::AttachedSingle { + let node = self + .pageservers + .get(node_id) + .expect("Nodes may not be removed while referenced"); + // We will only attempt live migration if the origin is not offline: this + // avoids trying to do it while reconciling after responding to an HA failover. 
+                    if !matches!(node.availability, NodeAvailability::Offline) {
+                        origin = Some(*node_id);
+                        break;
+                    }
+                }
+            }
+        }
+
+        let Some(origin) = origin else {
+            tracing::info!("maybe_live_migrate: no origin found");
+            return Ok(());
+        };
+
+        // We have an origin and a destination: proceed to do the live migration
+        tracing::info!("Live migrating {}->{}", origin, destination);
+        self.live_migrate(origin, destination).await?;
+
+        Ok(())
+    }
+
+    async fn get_lsns(
+        &self,
+        tenant_shard_id: TenantShardId,
+        node_id: &NodeId,
+    ) -> anyhow::Result> {
+        let node = self
+            .pageservers
+            .get(node_id)
+            .expect("Pageserver may not be removed while referenced");
+
+        let client =
+            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
+
+        let timelines = client.timeline_list(&tenant_shard_id).await?;
+        Ok(timelines
+            .into_iter()
+            .map(|t| (t.timeline_id, t.last_record_lsn))
+            .collect())
+    }
+
+    async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) {
+        let node = self
+            .pageservers
+            .get(node_id)
+            .expect("Pageserver may not be removed while referenced");
+
+        let client =
+            mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
+
+        match client.tenant_secondary_download(tenant_shard_id).await {
+            Ok(()) => {}
+            Err(_) => {
+                tracing::info!(" (skipping, destination wasn't in secondary mode)")
+            }
+        }
+    }
+
+    async fn await_lsn(
+        &self,
+        tenant_shard_id: TenantShardId,
+        pageserver_id: &NodeId,
+        baseline: HashMap,
+    ) -> anyhow::Result<()> {
+        loop {
+            let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await {
+                Ok(l) => l,
+                Err(e) => {
+                    println!(
+                        "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
+                        pageserver_id
+                    );
+                    std::thread::sleep(Duration::from_millis(500));
+                    continue;
+                }
+            };
+
+            let mut any_behind: bool = false;
+            for (timeline_id, baseline_lsn) in &baseline {
+                match latest.get(timeline_id) {
+                    Some(latest_lsn) => {
+                        println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
+                        if latest_lsn < baseline_lsn {
+                            any_behind = true;
+                        }
+                    }
+                    None => {
+                        // Expected timeline isn't yet visible on migration destination.
+                        // (IRL we would have to account for timeline deletion, but this
+                        // is just a test helper)
+                        any_behind = true;
+                    }
+                }
+            }
+
+            if !any_behind {
+                println!("✅ LSN caught up. Proceeding...");
+                break;
+            } else {
+                std::thread::sleep(Duration::from_millis(500));
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn live_migrate(
+        &mut self,
+        origin_ps_id: NodeId,
+        dest_ps_id: NodeId,
+    ) -> anyhow::Result<()> {
+        // `maybe_live_migrate` is responsible for sanity of inputs
+        assert!(origin_ps_id != dest_ps_id);
+
+        fn build_location_config(
+            shard: &ShardIdentity,
+            config: &TenantConfig,
+            mode: LocationConfigMode,
+            generation: Option,
+            secondary_conf: Option,
+        ) -> LocationConfig {
+            LocationConfig {
+                mode,
+                generation: generation.map(|g| g.into().unwrap()),
+                secondary_conf,
+                tenant_conf: config.clone(),
+                shard_number: shard.number.0,
+                shard_count: shard.count.0,
+                shard_stripe_size: shard.stripe_size.0,
+            }
+        }
+
+        tracing::info!(
+            "🔁 Switching origin pageserver {} to stale mode",
+            origin_ps_id
+        );
+
+        // FIXME: it is incorrect to use self.generation here, we should use the generation
+        // from the ObservedState of the origin pageserver (it might be older than self.generation)
+        let stale_conf = build_location_config(
+            &self.shard,
+            &self.config,
+            LocationConfigMode::AttachedStale,
+            Some(self.generation),
+            None,
+        );
+        self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
+            .await?;
+
+        let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?);
+
+        tracing::info!(
+            "🔁 Downloading latest layers to destination pageserver {}",
+            dest_ps_id,
+        );
+        self.secondary_download(self.tenant_shard_id, &dest_ps_id)
+            .await;
+
+        // Increment generation before attaching to new pageserver
+        self.generation = self.generation.next();
+
+        let dest_conf = build_location_config(
+            &self.shard,
+            &self.config,
+            LocationConfigMode::AttachedMulti,
+            Some(self.generation),
+            None,
+        );
+
+        tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id);
+        self.location_config(dest_ps_id, dest_conf, None).await?;
+
+        if let Some(baseline) = baseline_lsns {
+            tracing::info!("🕑 Waiting for LSN to catch up...");
+            self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline)
+                .await?;
+        }
+
+        tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);
+        self.compute_hook
+            .notify(self.tenant_shard_id, dest_ps_id)
+            .await?;
+
+        // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Single, then
+        // this location will be deleted in the general case reconciliation that runs after this.
+        let origin_secondary_conf = build_location_config(
+            &self.shard,
+            &self.config,
+            LocationConfigMode::Secondary,
+            None,
+            Some(LocationConfigSecondary { warm: true }),
+        );
+        self.location_config(origin_ps_id, origin_secondary_conf.clone(), None)
+            .await?;
+        // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
+        // partway through. In fact, all location conf API calls should be in a wrapper that sets
+        // the observed state to None, then runs, then sets it to what we wrote.
+ self.observed.locations.insert( + origin_ps_id, + ObservedStateLocation { + conf: Some(origin_secondary_conf), + }, + ); + + println!( + "🔁 Switching to AttachedSingle mode on pageserver {}", + dest_ps_id + ); + let dest_final_conf = build_location_config( + &self.shard, + &self.config, + LocationConfigMode::AttachedSingle, + Some(self.generation), + None, + ); + self.location_config(dest_ps_id, dest_final_conf.clone(), None) + .await?; + self.observed.locations.insert( + dest_ps_id, + ObservedStateLocation { + conf: Some(dest_final_conf), + }, + ); + + println!("✅ Migration complete"); + + Ok(()) + } + + /// Reconciling a tenant makes API calls to pageservers until the observed state + /// matches the intended state. + /// + /// First we apply special case handling (e.g. for live migrations), and then a + /// general case reconciliation where we walk through the intent by pageserver + /// and call out to the pageserver to apply the desired state. + pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> { + // TODO: if any of self.observed is None, call to remote pageservers + // to learn correct state. + + // Special case: live migration + self.maybe_live_migrate().await?; + + // If the attached pageserver is not attached, do so now. + if let Some(node_id) = self.intent.attached { + let mut wanted_conf = + attached_location_conf(self.generation, &self.shard, &self.config); + match self.observed.locations.get(&node_id) { + Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { + // Nothing to do + tracing::info!("Observed configuration already correct.") + } + Some(_) | None => { + // If there is no observed configuration, or if its value does not equal our intent, then we must call out to the pageserver. + self.generation = self.generation.next(); + wanted_conf.generation = self.generation.into(); + tracing::info!("Observed configuration requires update."); + self.location_config(node_id, wanted_conf, None).await?; + if let Err(e) = self + .compute_hook + .notify(self.tenant_shard_id, node_id) + .await + { + tracing::warn!( + "Failed to notify compute of newly attached pageserver {node_id}: {e}" + ); + } + } + } + } + + // Configure secondary locations: if these were previously attached this + // implicitly downgrades them from attached to secondary. + let mut changes = Vec::new(); + for node_id in &self.intent.secondary { + let wanted_conf = secondary_location_conf(&self.shard, &self.config); + match self.observed.locations.get(node_id) { + Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { + // Nothing to do + tracing::info!(%node_id, "Observed configuration already correct.") + } + Some(_) | None => { + // If there is no observed configuration, or if its value does not equal our intent, then we must call out to the pageserver. + tracing::info!(%node_id, "Observed configuration requires update."); + changes.push((*node_id, wanted_conf)) + } + } + } + + // Detach any extraneous pageservers that are no longer referenced + // by our intent. + let all_pageservers = self.intent.all_pageservers(); + for node_id in self.observed.locations.keys() { + if all_pageservers.contains(node_id) { + // We are only detaching pageservers that aren't used at all. 
+ continue; + } + + changes.push(( + *node_id, + LocationConfig { + mode: LocationConfigMode::Detached, + generation: None, + secondary_conf: None, + shard_number: self.shard.number.0, + shard_count: self.shard.count.0, + shard_stripe_size: self.shard.stripe_size.0, + tenant_conf: self.config.clone(), + }, + )); + } + + for (node_id, conf) in changes { + self.location_config(node_id, conf, None).await?; + } + + Ok(()) + } +} + +pub(crate) fn attached_location_conf( + generation: Generation, + shard: &ShardIdentity, + config: &TenantConfig, +) -> LocationConfig { + LocationConfig { + mode: LocationConfigMode::AttachedSingle, + generation: generation.into(), + secondary_conf: None, + shard_number: shard.number.0, + shard_count: shard.count.0, + shard_stripe_size: shard.stripe_size.0, + tenant_conf: config.clone(), + } +} + +pub(crate) fn secondary_location_conf( + shard: &ShardIdentity, + config: &TenantConfig, +) -> LocationConfig { + LocationConfig { + mode: LocationConfigMode::Secondary, + generation: None, + secondary_conf: Some(LocationConfigSecondary { warm: true }), + shard_number: shard.number.0, + shard_count: shard.count.0, + shard_stripe_size: shard.stripe_size.0, + tenant_conf: config.clone(), + } +} diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs new file mode 100644 index 0000000000..1966a7ea2a --- /dev/null +++ b/control_plane/attachment_service/src/scheduler.rs @@ -0,0 +1,89 @@ +use pageserver_api::shard::TenantShardId; +use std::collections::{BTreeMap, HashMap}; +use utils::{http::error::ApiError, id::NodeId}; + +use crate::{node::Node, tenant_state::TenantState}; + +/// Scenarios in which we cannot find a suitable location for a tenant shard +#[derive(thiserror::Error, Debug)] +pub enum ScheduleError { + #[error("No pageservers found")] + NoPageservers, + #[error("No pageserver found matching constraint")] + ImpossibleConstraint, +} + +impl From for ApiError { + fn from(value: ScheduleError) -> Self { + ApiError::Conflict(format!("Scheduling error: {}", value)) + } +} + +pub(crate) struct Scheduler { + tenant_counts: HashMap, +} + +impl Scheduler { + pub(crate) fn new( + tenants: &BTreeMap, + nodes: &HashMap, + ) -> Self { + let mut tenant_counts = HashMap::new(); + for node_id in nodes.keys() { + tenant_counts.insert(*node_id, 0); + } + + for tenant in tenants.values() { + if let Some(ps) = tenant.intent.attached { + let entry = tenant_counts.entry(ps).or_insert(0); + *entry += 1; + } + } + + for (node_id, node) in nodes { + if !node.may_schedule() { + tenant_counts.remove(node_id); + } + } + + Self { tenant_counts } + } + + pub(crate) fn schedule_shard( + &mut self, + hard_exclude: &[NodeId], + ) -> Result { + if self.tenant_counts.is_empty() { + return Err(ScheduleError::NoPageservers); + } + + let mut tenant_counts: Vec<(NodeId, usize)> = self + .tenant_counts + .iter() + .filter_map(|(k, v)| { + if hard_exclude.contains(k) { + None + } else { + Some((*k, *v)) + } + }) + .collect(); + + // Sort by tenant count. Nodes with the same tenant count are sorted by ID. 
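// [Editorial note, not part of the original change] A minimal illustration of the
// selection logic below, assuming three candidate nodes where node 1 already holds
// three attached shards and nodes 2 and 3 hold one each:
//
//     let mut counts = vec![(NodeId(1), 3usize), (NodeId(2), 1), (NodeId(3), 1)];
//     counts.sort_by_key(|i| (i.1, i.0));
//     assert_eq!(counts.first().unwrap().0, NodeId(2)); // least loaded, ties broken by lower ID
//
// schedule_shard then increments the chosen node's count, so repeated calls spread
// new shards across the least-loaded pageservers.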
+ tenant_counts.sort_by_key(|i| (i.1, i.0)); + + if tenant_counts.is_empty() { + // After applying constraints, no pageservers were left + return Err(ScheduleError::ImpossibleConstraint); + } + + for (node_id, count) in &tenant_counts { + tracing::info!("tenant_counts[{node_id}]={count}"); + } + + let node_id = tenant_counts.first().unwrap().0; + tracing::info!("scheduler selected node {node_id}"); + *self.tenant_counts.get_mut(&node_id).unwrap() += 1; + Ok(node_id) + } +} diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs new file mode 100644 index 0000000000..5cf9aeb6fe --- /dev/null +++ b/control_plane/attachment_service/src/service.rs @@ -0,0 +1,760 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + time::{Duration, Instant}, +}; + +use control_plane::attachment_service::{ + AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability, + NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse, + TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard, + TenantShardMigrateRequest, TenantShardMigrateResponse, +}; +use pageserver_api::{ + control_api::{ + ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, + ValidateResponse, ValidateResponseTenant, + }, + models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo}, + shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId}, +}; +use pageserver_client::mgmt_api; +use utils::{ + generation::Generation, + http::error::ApiError, + id::{NodeId, TenantId}, +}; + +use crate::{ + compute_hook::ComputeHook, + node::Node, + scheduler::Scheduler, + tenant_state::{ReconcileResult, ReconcilerWaiter, TenantState}, + PlacementPolicy, +}; + +const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); + +// Top level state available to all HTTP handlers +struct ServiceState { + tenants: BTreeMap, + + nodes: Arc>, + + compute_hook: Arc, + + result_tx: tokio::sync::mpsc::UnboundedSender, +} + +impl ServiceState { + fn new(result_tx: tokio::sync::mpsc::UnboundedSender) -> Self { + Self { + tenants: BTreeMap::new(), + nodes: Arc::new(HashMap::new()), + compute_hook: Arc::new(ComputeHook::new()), + result_tx, + } + } +} + +#[derive(Clone)] +pub struct Config { + // All pageservers managed by one instance of this service must have + // the same public key. 
+    pub jwt_token: Option,
+}
+
+pub struct Service {
+    inner: Arc>,
+    config: Config,
+}
+
+impl Service {
+    pub fn spawn(config: Config) -> Arc {
+        let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
+
+        let this = Arc::new(Self {
+            inner: Arc::new(std::sync::RwLock::new(ServiceState::new(result_tx))),
+            config,
+        });
+
+        let result_task_this = this.clone();
+        tokio::task::spawn(async move {
+            while let Some(result) = result_rx.recv().await {
+                tracing::info!(
+                    "Reconcile result for sequence {}, ok={}",
+                    result.sequence,
+                    result.result.is_ok()
+                );
+                let mut locked = result_task_this.inner.write().unwrap();
+                if let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) {
+                    tenant.generation = result.generation;
+                    match result.result {
+                        Ok(()) => {
+                            for (node_id, loc) in &result.observed.locations {
+                                if let Some(conf) = &loc.conf {
+                                    tracing::info!(
+                                        "Updating observed location {}: {:?}",
+                                        node_id,
+                                        conf
+                                    );
+                                } else {
+                                    tracing::info!("Setting observed location {} to None", node_id,)
+                                }
+                            }
+                            tenant.observed = result.observed;
+                            tenant.waiter.advance(result.sequence);
+                        }
+                        Err(e) => {
+                            // TODO: some observability, record on the tenant its last reconcile error
+                            tracing::warn!(
+                                "Reconcile error on tenant {}: {}",
+                                tenant.tenant_shard_id,
+                                e
+                            );
+
+                            for (node_id, o) in result.observed.locations {
+                                tenant.observed.locations.insert(node_id, o);
+                            }
+                        }
+                    }
+                }
+            }
+        });
+
+        this
+    }
+
+    pub(crate) fn attach_hook(&self, attach_req: AttachHookRequest) -> AttachHookResponse {
+        let mut locked = self.inner.write().unwrap();
+
+        let tenant_state = locked
+            .tenants
+            .entry(attach_req.tenant_shard_id)
+            .or_insert_with(|| {
+                TenantState::new(
+                    attach_req.tenant_shard_id,
+                    ShardIdentity::unsharded(),
+                    PlacementPolicy::Single,
+                )
+            });
+
+        if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
+            tenant_state.generation = tenant_state.generation.next();
+            tracing::info!(
+                tenant_id = %attach_req.tenant_shard_id,
+                ps_id = %attaching_pageserver,
+                generation = ?tenant_state.generation,
+                "issuing",
+            );
+        } else if let Some(ps_id) = tenant_state.intent.attached {
+            tracing::info!(
+                tenant_id = %attach_req.tenant_shard_id,
+                %ps_id,
+                generation = ?tenant_state.generation,
+                "dropping",
+            );
+        } else {
+            tracing::info!(
+                tenant_id = %attach_req.tenant_shard_id,
+                "no-op: tenant already has no pageserver");
+        }
+        tenant_state.intent.attached = attach_req.node_id;
+        let generation = tenant_state.generation;
+
+        tracing::info!(
+            "attach_hook: tenant {} set generation {:?}, pageserver {}",
+            attach_req.tenant_shard_id,
+            tenant_state.generation,
+            attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
+        );
+
+        AttachHookResponse {
+            gen: attach_req.node_id.map(|_| generation.into().unwrap()),
+        }
+    }
+
+    pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
+        let locked = self.inner.read().unwrap();
+
+        let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
+
+        InspectResponse {
+            attachment: tenant_state.and_then(|s| {
+                s.intent
+                    .attached
+                    .map(|ps| (s.generation.into().unwrap(), ps))
+            }),
+        }
+    }
+
+    pub(crate) fn re_attach(&self, reattach_req: ReAttachRequest) -> ReAttachResponse {
+        let mut locked = self.inner.write().unwrap();
+
+        let mut response = ReAttachResponse {
+            tenants: Vec::new(),
+        };
+        for (t, state) in &mut locked.tenants {
+            if state.intent.attached == Some(reattach_req.node_id) {
+                state.generation = state.generation.next();
+                response.tenants.push(ReAttachResponseTenant {
+                    id: *t,
+                    gen: state.generation.into().unwrap(),
+                });
+            }
+        }
+        response
+    }
+
+    pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
+        let locked = self.inner.read().unwrap();
+
+        let mut response = ValidateResponse {
+            tenants: Vec::new(),
+        };
+
+        for req_tenant in validate_req.tenants {
+            if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
+                let valid = tenant_state.generation == Generation::new(req_tenant.gen);
+                tracing::info!(
+                    "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
+                    req_tenant.id,
+                    req_tenant.gen,
+                    tenant_state.generation
+                );
+                response.tenants.push(ValidateResponseTenant {
+                    id: req_tenant.id,
+                    valid,
+                });
+            }
+        }
+        response
+    }
+
+    pub(crate) async fn tenant_create(
+        &self,
+        create_req: TenantCreateRequest,
+    ) -> Result {
+        let (waiters, response_shards) = {
+            let mut locked = self.inner.write().unwrap();
+
+            tracing::info!(
+                "Creating tenant {}, shard_count={:?}, have {} pageservers",
+                create_req.new_tenant_id,
+                create_req.shard_parameters.count,
+                locked.nodes.len()
+            );
+
+            // This service expects to handle sharding itself: it is an error to try and directly create
+            // a particular shard here.
+            let tenant_id = if create_req.new_tenant_id.shard_count > ShardCount(1) {
+                return Err(ApiError::BadRequest(anyhow::anyhow!(
+                    "Attempted to create a specific shard, this API is for creating the whole tenant"
+                )));
+            } else {
+                create_req.new_tenant_id.tenant_id
+            };
+
+            // Shard count 0 is valid: it means create a single shard (ShardCount(0) means "unsharded")
+            let literal_shard_count = if create_req.shard_parameters.is_unsharded() {
+                1
+            } else {
+                create_req.shard_parameters.count.0
+            };
+
+            let mut response_shards = Vec::new();
+
+            let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
+
+            for i in 0..literal_shard_count {
+                let shard_number = ShardNumber(i);
+
+                let tenant_shard_id = TenantShardId {
+                    tenant_id,
+                    shard_number,
+                    shard_count: create_req.shard_parameters.count,
+                };
+                tracing::info!("Creating shard {tenant_shard_id}...");
+
+                use std::collections::btree_map::Entry;
+                match locked.tenants.entry(tenant_shard_id) {
+                    Entry::Occupied(mut entry) => {
+                        tracing::info!(
+                            "Tenant shard {tenant_shard_id} already exists while creating"
+                        );
+
+                        // TODO: schedule() should take an anti-affinity expression that pushes
+                        // attached and secondary locations (independently) away from those
+                        // pageservers also holding a shard for this tenant.
+ + entry.get_mut().schedule(&mut scheduler).map_err(|e| { + ApiError::Conflict(format!( + "Failed to schedule shard {tenant_shard_id}: {e}" + )) + })?; + + response_shards.push(TenantCreateResponseShard { + node_id: entry + .get() + .intent + .attached + .expect("We just set pageserver if it was None"), + generation: entry.get().generation.into().unwrap(), + }); + + continue; + } + Entry::Vacant(entry) => { + let mut state = TenantState::new( + tenant_shard_id, + ShardIdentity::from_params(shard_number, &create_req.shard_parameters), + PlacementPolicy::Single, + ); + + if let Some(create_gen) = create_req.generation { + state.generation = Generation::new(create_gen); + } + state.config = create_req.config.clone(); + + state.schedule(&mut scheduler).map_err(|e| { + ApiError::Conflict(format!( + "Failed to schedule shard {tenant_shard_id}: {e}" + )) + })?; + + response_shards.push(TenantCreateResponseShard { + node_id: state + .intent + .attached + .expect("We just set pageserver if it was None"), + generation: state.generation.into().unwrap(), + }); + entry.insert(state) + } + }; + } + + // Take a snapshot of pageservers + let pageservers = locked.nodes.clone(); + + let mut waiters = Vec::new(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + + for (_tenant_shard_id, shard) in locked + .tenants + .range_mut(TenantShardId::tenant_range(tenant_id)) + { + if let Some(waiter) = shard.maybe_reconcile( + result_tx.clone(), + &pageservers, + &compute_hook, + &self.config, + ) { + waiters.push(waiter); + } + } + (waiters, response_shards) + }; + + let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap(); + for waiter in waiters { + let timeout = deadline.duration_since(Instant::now()); + waiter.wait_timeout(timeout).await.map_err(|_| { + ApiError::Timeout( + format!( + "Timeout waiting for reconciliation of tenant shard {}", + waiter.tenant_shard_id + ) + .into(), + ) + })?; + } + Ok(TenantCreateResponse { + shards: response_shards, + }) + } + + pub(crate) async fn tenant_timeline_create( + &self, + tenant_id: TenantId, + mut create_req: TimelineCreateRequest, + ) -> Result { + let mut timeline_info = None; + let ensure_waiters = { + let locked = self.inner.write().unwrap(); + + tracing::info!( + "Creating timeline {}/{}, have {} pageservers", + tenant_id, + create_req.new_timeline_id, + locked.nodes.len() + ); + + self.ensure_attached(locked, tenant_id) + .map_err(ApiError::InternalServerError)? 
+ }; + + let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap(); + for waiter in ensure_waiters { + let timeout = deadline.duration_since(Instant::now()); + waiter.wait_timeout(timeout).await.map_err(|_| { + ApiError::Timeout( + format!( + "Timeout waiting for reconciliation of tenant shard {}", + waiter.tenant_shard_id + ) + .into(), + ) + })?; + } + + let targets = { + let locked = self.inner.read().unwrap(); + let mut targets = Vec::new(); + + for (tenant_shard_id, shard) in + locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + let node_id = shard.intent.attached.ok_or_else(|| { + ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled")) + })?; + let node = locked + .nodes + .get(&node_id) + .expect("Pageservers may not be deleted while referenced"); + + targets.push((*tenant_shard_id, node.clone())); + } + targets + }; + + if targets.is_empty() { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant not found").into(), + )); + } + + for (tenant_shard_id, node) in targets { + // TODO: issue shard timeline creates in parallel, once the 0th is done. + + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); + + let shard_timeline_info = client + .timeline_create(tenant_shard_id, &create_req) + .await + .map_err(|e| ApiError::Conflict(format!("Failed to create timeline: {e}")))?; + + if timeline_info.is_none() { + // If the caller specified an ancestor but no ancestor LSN, we are responsible for + // propagating the LSN chosen by the first shard to the other shards: it is important + // that all shards end up with the same ancestor_start_lsn. + if create_req.ancestor_timeline_id.is_some() + && create_req.ancestor_start_lsn.is_none() + { + create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn; + } + + // We will return the TimelineInfo from the first shard + timeline_info = Some(shard_timeline_info); + } + } + Ok(timeline_info.expect("targets cannot be empty")) + } + + pub(crate) fn tenant_locate( + &self, + tenant_id: TenantId, + ) -> Result { + let locked = self.inner.read().unwrap(); + tracing::info!("Locating shards for tenant {tenant_id}"); + + // Take a snapshot of pageservers + let pageservers = locked.nodes.clone(); + + let mut result = Vec::new(); + let mut shard_params: Option = None; + + for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + let node_id = shard + .intent + .attached + .ok_or(ApiError::BadRequest(anyhow::anyhow!( + "Cannot locate a tenant that is not attached" + )))?; + + let node = pageservers + .get(&node_id) + .expect("Pageservers may not be deleted while referenced"); + + result.push(TenantLocateResponseShard { + shard_id: *tenant_shard_id, + node_id, + listen_http_addr: node.listen_http_addr.clone(), + listen_http_port: node.listen_http_port, + listen_pg_addr: node.listen_pg_addr.clone(), + listen_pg_port: node.listen_pg_port, + }); + + match &shard_params { + None => { + shard_params = Some(ShardParameters { + stripe_size: Some(shard.shard.stripe_size), + count: shard.shard.count, + }); + } + Some(params) => { + if params.stripe_size != Some(shard.shard.stripe_size) { + // This should never happen. We enforce at runtime because it's simpler than + // adding an extra per-tenant data structure to store the things that should be the same + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Inconsistent shard stripe size parameters!" 
+ ))); + } + } + } + } + + if result.is_empty() { + return Err(ApiError::NotFound( + anyhow::anyhow!("No shards for this tenant ID found").into(), + )); + } + let shard_params = shard_params.expect("result is non-empty, therefore this is set"); + tracing::info!( + "Located tenant {} with params {:?} on shards {}", + tenant_id, + shard_params, + result + .iter() + .map(|s| format!("{:?}", s)) + .collect::>() + .join(",") + ); + + Ok(TenantLocateResponse { + shards: result, + shard_params, + }) + } + + pub(crate) async fn tenant_shard_migrate( + &self, + tenant_shard_id: TenantShardId, + migrate_req: TenantShardMigrateRequest, + ) -> Result { + let waiter = { + let mut locked = self.inner.write().unwrap(); + + let result_tx = locked.result_tx.clone(); + let pageservers = locked.nodes.clone(); + let compute_hook = locked.compute_hook.clone(); + + let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant shard not found").into(), + )); + }; + + if shard.intent.attached == Some(migrate_req.node_id) { + // No-op case: we will still proceed to wait for reconciliation in case it is + // incomplete from an earlier update to the intent. + tracing::info!("Migrating: intent is unchanged {:?}", shard.intent); + } else { + let old_attached = shard.intent.attached; + + shard.intent.attached = Some(migrate_req.node_id); + match shard.policy { + PlacementPolicy::Single => { + shard.intent.secondary.clear(); + } + PlacementPolicy::Double(_n) => { + // If our new attached node was a secondary, it no longer should be. + shard.intent.secondary.retain(|s| s != &migrate_req.node_id); + + // If we were already attached to something, demote that to a secondary + if let Some(old_attached) = old_attached { + shard.intent.secondary.push(old_attached); + } + } + } + + tracing::info!("Migrating: new intent {:?}", shard.intent); + shard.sequence = shard.sequence.next(); + } + + shard.maybe_reconcile(result_tx, &pageservers, &compute_hook, &self.config) + }; + + if let Some(waiter) = waiter { + waiter + .wait_timeout(RECONCILE_TIMEOUT) + .await + .map_err(|e| ApiError::Timeout(format!("{}", e).into()))?; + } else { + tracing::warn!("Migration is a no-op"); + } + + Ok(TenantShardMigrateResponse {}) + } + + pub(crate) fn node_register(&self, register_req: NodeRegisterRequest) { + let mut locked = self.inner.write().unwrap(); + + let mut new_nodes = (*locked.nodes).clone(); + + new_nodes.insert( + register_req.node_id, + Node { + id: register_req.node_id, + listen_http_addr: register_req.listen_http_addr, + listen_http_port: register_req.listen_http_port, + listen_pg_addr: register_req.listen_pg_addr, + listen_pg_port: register_req.listen_pg_port, + scheduling: NodeSchedulingPolicy::Filling, + // TODO: we shouldn't really call this Active until we've heartbeated it. 
+ availability: NodeAvailability::Active, + }, + ); + + locked.nodes = Arc::new(new_nodes); + + tracing::info!( + "Registered pageserver {}, now have {} pageservers", + register_req.node_id, + locked.nodes.len() + ); + } + + pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> { + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + + let mut new_nodes = (*locked.nodes).clone(); + + let Some(node) = new_nodes.get_mut(&config_req.node_id) else { + return Err(ApiError::NotFound( + anyhow::anyhow!("Node not registered").into(), + )); + }; + + let mut offline_transition = false; + let mut active_transition = false; + + if let Some(availability) = &config_req.availability { + match (availability, &node.availability) { + (NodeAvailability::Offline, NodeAvailability::Active) => { + tracing::info!("Node {} transition to offline", config_req.node_id); + offline_transition = true; + } + (NodeAvailability::Active, NodeAvailability::Offline) => { + tracing::info!("Node {} transition to active", config_req.node_id); + active_transition = true; + } + _ => { + tracing::info!("Node {} no change during config", config_req.node_id); + // No change + } + }; + node.availability = *availability; + } + + if let Some(scheduling) = config_req.scheduling { + node.scheduling = scheduling; + + // TODO: once we have a background scheduling ticker for fill/drain, kick it + // to wake up and start working. + } + + let new_nodes = Arc::new(new_nodes); + + let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes); + if offline_transition { + for (tenant_shard_id, tenant_state) in &mut locked.tenants { + if let Some(observed_loc) = + tenant_state.observed.locations.get_mut(&config_req.node_id) + { + // When a node goes offline, we set its observed configuration to None, indicating unknown: we will + // not assume our knowledge of the node's configuration is accurate until it comes back online + observed_loc.conf = None; + } + + if tenant_state.intent.notify_offline(config_req.node_id) { + tenant_state.sequence = tenant_state.sequence.next(); + match tenant_state.schedule(&mut scheduler) { + Err(e) => { + // It is possible that some tenants will become unschedulable when too many pageservers + // go offline: in this case there isn't much we can do other than make the issue observable. + // TODO: give TenantState a scheduling error attribute to be queried later. + tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id); + } + Ok(()) => { + tenant_state.maybe_reconcile( + result_tx.clone(), + &new_nodes, + &compute_hook, + &self.config, + ); + } + } + } + } + } + + if active_transition { + // When a node comes back online, we must reconcile any tenant that has a None observed + // location on the node. + for tenant_state in locked.tenants.values_mut() { + if let Some(observed_loc) = + tenant_state.observed.locations.get_mut(&config_req.node_id) + { + if observed_loc.conf.is_none() { + tenant_state.maybe_reconcile( + result_tx.clone(), + &new_nodes, + &compute_hook, + &self.config, + ); + } + } + } + + // TODO: in the background, we should balance work back onto this pageserver + } + + locked.nodes = new_nodes; + + Ok(()) + } + + /// Helper for methods that will try and call pageserver APIs for + /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant + /// is attached somewhere. 
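/// [Editorial note, not part of the original change] For example, `tenant_timeline_create`
/// calls this before issuing per-shard timeline creation requests, so that every shard of
/// the tenant is scheduled and reconciled onto an attached location before any pageserver
/// API call is made.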
+ fn ensure_attached( + &self, + mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>, + tenant_id: TenantId, + ) -> Result, anyhow::Error> { + let mut waiters = Vec::new(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes); + let pageservers = locked.nodes.clone(); + + for (_tenant_shard_id, shard) in locked + .tenants + .range_mut(TenantShardId::tenant_range(tenant_id)) + { + shard.schedule(&mut scheduler)?; + + if let Some(waiter) = + shard.maybe_reconcile(result_tx.clone(), &pageservers, &compute_hook, &self.config) + { + waiters.push(waiter); + } + } + Ok(waiters) + } +} diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs new file mode 100644 index 0000000000..72ec664197 --- /dev/null +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -0,0 +1,346 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use control_plane::attachment_service::NodeAvailability; +use pageserver_api::{ + models::{LocationConfig, TenantConfig}, + shard::{ShardIdentity, TenantShardId}, +}; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use utils::{ + generation::Generation, + id::NodeId, + seqwait::{SeqWait, SeqWaitError}, +}; + +use crate::{ + compute_hook::ComputeHook, + node::Node, + reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler}, + scheduler::{ScheduleError, Scheduler}, + service, PlacementPolicy, Sequence, +}; + +pub(crate) struct TenantState { + pub(crate) tenant_shard_id: TenantShardId, + + pub(crate) shard: ShardIdentity, + + pub(crate) sequence: Sequence, + + // Latest generation number: next time we attach, increment this + // and use the incremented number when attaching + pub(crate) generation: Generation, + + // High level description of how the tenant should be set up. Provided + // externally. + pub(crate) policy: PlacementPolicy, + + // Low level description of exactly which pageservers should fulfil + // which role. Generated by `Self::schedule`. + pub(crate) intent: IntentState, + + // Low level description of how the tenant is configured on pageservers: + // if this does not match `Self::intent` then the tenant needs reconciliation + // with `Self::reconcile`. + pub(crate) observed: ObservedState, + + pub(crate) config: TenantConfig, + + /// If a reconcile task is currently in flight, it may be joined here (it is + /// only safe to join if either the result has been received or the reconciler's + /// cancellation token has been fired) + pub(crate) reconciler: Option, + + /// Optionally wait for reconciliation to complete up to a particular + /// sequence number. + pub(crate) waiter: std::sync::Arc>, +} + +#[derive(Default, Clone, Debug)] +pub(crate) struct IntentState { + pub(crate) attached: Option, + pub(crate) secondary: Vec, +} + +#[derive(Default, Clone)] +pub(crate) struct ObservedState { + pub(crate) locations: HashMap, +} + +/// Our latest knowledge of how this tenant is configured in the outside world. +/// +/// Meaning: +/// * No instance of this type exists for a node: we are certain that we have nothing configured on that +/// node for this shard. +/// * Instance exists with conf==None: we *might* have some state on that node, but we don't know +/// what it is (e.g. 
we failed partway through configuring it) +/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node, +/// and that configuration will still be present unless something external interfered. +#[derive(Clone)] +pub(crate) struct ObservedStateLocation { + /// If None, it means we do not know the status of this shard's location on this node, but + /// we know that we might have some state on this node. + pub(crate) conf: Option, +} +pub(crate) struct ReconcilerWaiter { + // For observability purposes, remember the ID of the shard we're + // waiting for. + pub(crate) tenant_shard_id: TenantShardId, + + seq_wait: std::sync::Arc>, + seq: Sequence, +} + +impl ReconcilerWaiter { + pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), SeqWaitError> { + self.seq_wait.wait_for_timeout(self.seq, timeout).await + } +} + +/// Having spawned a reconciler task, the tenant shard's state will carry enough +/// information to optionally cancel & await it later. +pub(crate) struct ReconcilerHandle { + sequence: Sequence, + handle: JoinHandle<()>, + cancel: CancellationToken, +} + +/// When a reconcile task completes, it sends this result object +/// to be applied to the primary TenantState. +pub(crate) struct ReconcileResult { + pub(crate) sequence: Sequence, + /// On errors, `observed` should be treated as an incompleted description + /// of state (i.e. any nodes present in the result should override nodes + /// present in the parent tenant state, but any unmentioned nodes should + /// not be removed from parent tenant state) + pub(crate) result: Result<(), ReconcileError>, + + pub(crate) tenant_shard_id: TenantShardId, + pub(crate) generation: Generation, + pub(crate) observed: ObservedState, +} + +impl IntentState { + pub(crate) fn all_pageservers(&self) -> Vec { + let mut result = Vec::new(); + if let Some(p) = self.attached { + result.push(p) + } + + result.extend(self.secondary.iter().copied()); + + result + } + + /// When a node goes offline, we update intents to avoid using it + /// as their attached pageserver. + /// + /// Returns true if a change was made + pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { + if self.attached == Some(node_id) { + self.attached = None; + self.secondary.push(node_id); + true + } else { + false + } + } +} + +impl TenantState { + pub(crate) fn new( + tenant_shard_id: TenantShardId, + shard: ShardIdentity, + policy: PlacementPolicy, + ) -> Self { + Self { + tenant_shard_id, + policy, + intent: IntentState::default(), + generation: Generation::new(0), + shard, + observed: ObservedState::default(), + config: TenantConfig::default(), + reconciler: None, + sequence: Sequence(1), + waiter: Arc::new(SeqWait::new(Sequence(0))), + } + } + + pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> { + // TODO: before scheduling new nodes, check if any existing content in + // self.intent refers to pageservers that are offline, and pick other + // pageservers if so. + + // Build the set of pageservers already in use by this tenant, to avoid scheduling + // more work on the same pageservers we're already using. 
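// Illustrative sketch, not part of this diff: the contract that
// `scheduler.schedule_shard(&used_pageservers)` is relied on to provide below is roughly
// "pick the least-loaded node that this tenant is not already using". `FakeNodeId` and
// `pick_least_loaded` are hypothetical stand-ins, not the real Scheduler API.
use std::collections::HashMap;

type FakeNodeId = u64;

fn pick_least_loaded(
    shard_counts: &HashMap<FakeNodeId, usize>, // node -> shards already scheduled on it
    exclude: &[FakeNodeId],                    // pageservers this tenant already uses
) -> Option<FakeNodeId> {
    let mut best: Option<(FakeNodeId, usize)> = None;
    for (node, count) in shard_counts {
        if exclude.contains(node) {
            continue;
        }
        if best.map_or(true, |(_, best_count)| *count < best_count) {
            best = Some((*node, *count));
        }
    }
    best.map(|(node, _)| node)
}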
+ let mut used_pageservers = self.intent.all_pageservers(); + let mut modified = false; + + use PlacementPolicy::*; + match self.policy { + Single => { + // Should have exactly one attached, and zero secondaries + if self.intent.attached.is_none() { + let node_id = scheduler.schedule_shard(&used_pageservers)?; + self.intent.attached = Some(node_id); + used_pageservers.push(node_id); + modified = true; + } + if !self.intent.secondary.is_empty() { + self.intent.secondary.clear(); + modified = true; + } + } + Double(secondary_count) => { + // Should have exactly one attached, and N secondaries + if self.intent.attached.is_none() { + let node_id = scheduler.schedule_shard(&used_pageservers)?; + self.intent.attached = Some(node_id); + used_pageservers.push(node_id); + modified = true; + } + + while self.intent.secondary.len() < secondary_count { + let node_id = scheduler.schedule_shard(&used_pageservers)?; + self.intent.secondary.push(node_id); + used_pageservers.push(node_id); + modified = true; + } + } + } + + if modified { + self.sequence.0 += 1; + } + + Ok(()) + } + + fn dirty(&self) -> bool { + if let Some(node_id) = self.intent.attached { + let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config); + match self.observed.locations.get(&node_id) { + Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} + Some(_) | None => { + return true; + } + } + } + + for node_id in &self.intent.secondary { + let wanted_conf = secondary_location_conf(&self.shard, &self.config); + match self.observed.locations.get(node_id) { + Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} + Some(_) | None => { + return true; + } + } + } + + false + } + + pub(crate) fn maybe_reconcile( + &mut self, + result_tx: tokio::sync::mpsc::UnboundedSender, + pageservers: &Arc>, + compute_hook: &Arc, + service_config: &service::Config, + ) -> Option { + // If there are any ambiguous observed states, and the nodes they refer to are available, + // we should reconcile to clean them up. + let mut dirty_observed = false; + for (node_id, observed_loc) in &self.observed.locations { + let node = pageservers + .get(node_id) + .expect("Nodes may not be removed while referenced"); + if observed_loc.conf.is_none() + && !matches!(node.availability, NodeAvailability::Offline) + { + dirty_observed = true; + break; + } + } + + if !self.dirty() && !dirty_observed { + tracing::info!("Not dirty, no reconciliation needed."); + return None; + } + + // Reconcile already in flight for the current sequence? + if let Some(handle) = &self.reconciler { + if handle.sequence == self.sequence { + return Some(ReconcilerWaiter { + tenant_shard_id: self.tenant_shard_id, + seq_wait: self.waiter.clone(), + seq: self.sequence, + }); + } + } + + // Reconcile in flight for a stale sequence? Our sequence's task will wait for it before + // doing our sequence's work. 
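// Illustrative sketch, not part of this diff: the hand-off performed below, shown in
// isolation. `OldTask` is a hypothetical stand-in for ReconcilerHandle; tokio and
// tokio-util are assumed, as in this crate's Cargo.toml.
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

struct OldTask {
    handle: JoinHandle<()>,
    cancel: CancellationToken,
}

async fn supersede(old: Option<OldTask>) {
    if let Some(old) = old {
        // Ask the stale task to stop, then wait for it to exit, so that two reconcilers
        // never act on the same shard concurrently.
        old.cancel.cancel();
        let _ = old.handle.await;
    }
    // ...only now does the new sequence's reconcile work begin.
}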
+ let old_handle = self.reconciler.take(); + + let cancel = CancellationToken::new(); + let mut reconciler = Reconciler { + tenant_shard_id: self.tenant_shard_id, + shard: self.shard, + generation: self.generation, + intent: self.intent.clone(), + config: self.config.clone(), + observed: self.observed.clone(), + pageservers: pageservers.clone(), + compute_hook: compute_hook.clone(), + service_config: service_config.clone(), + cancel: cancel.clone(), + }; + + let reconcile_seq = self.sequence; + + tracing::info!("Spawning Reconciler for sequence {}", self.sequence); + let join_handle = tokio::task::spawn(async move { + // Wait for any previous reconcile task to complete before we start + if let Some(old_handle) = old_handle { + old_handle.cancel.cancel(); + if let Err(e) = old_handle.handle.await { + // We can't do much with this other than log it: the task is done, so + // we may proceed with our work. + tracing::error!("Unexpected join error waiting for reconcile task: {e}"); + } + } + + // Early check for cancellation before doing any work + // TODO: wrap all remote API operations in cancellation check + // as well. + if reconciler.cancel.is_cancelled() { + return; + } + + let result = reconciler.reconcile().await; + result_tx + .send(ReconcileResult { + sequence: reconcile_seq, + result, + tenant_shard_id: reconciler.tenant_shard_id, + generation: reconciler.generation, + observed: reconciler.observed, + }) + .ok(); + }); + + self.reconciler = Some(ReconcilerHandle { + sequence: self.sequence, + handle: join_handle, + cancel, + }); + + Some(ReconcilerWaiter { + tenant_shard_id: self.tenant_shard_id, + seq_wait: self.waiter.clone(), + seq: self.sequence, + }) + } +} diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs index 731c05809e..08b4961358 100644 --- a/control_plane/src/attachment_service.rs +++ b/control_plane/src/attachment_service.rs @@ -1,14 +1,27 @@ use crate::{background_process, local_env::LocalEnv}; use anyhow::anyhow; use camino::Utf8PathBuf; -use serde::{Deserialize, Serialize}; -use std::{path::PathBuf, process::Child}; -use utils::id::{NodeId, TenantId}; +use hyper::{Method, StatusCode}; +use pageserver_api::{ + models::{ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo}, + shard::TenantShardId, +}; +use postgres_backend::AuthType; +use postgres_connection::parse_host_port; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::{path::PathBuf, process::Child, str::FromStr}; +use tracing::instrument; +use utils::{ + auth::{Claims, Scope}, + id::{NodeId, TenantId}, +}; pub struct AttachmentService { env: LocalEnv, listen: String, path: PathBuf, + jwt_token: Option, + public_key_path: Option, client: reqwest::Client, } @@ -16,7 +29,7 @@ const COMMAND: &str = "attachment_service"; #[derive(Serialize, Deserialize)] pub struct AttachHookRequest { - pub tenant_id: TenantId, + pub tenant_shard_id: TenantShardId, pub node_id: Option, } @@ -27,7 +40,7 @@ pub struct AttachHookResponse { #[derive(Serialize, Deserialize)] pub struct InspectRequest { - pub tenant_id: TenantId, + pub tenant_shard_id: TenantShardId, } #[derive(Serialize, Deserialize)] @@ -35,6 +48,117 @@ pub struct InspectResponse { pub attachment: Option<(u32, NodeId)>, } +#[derive(Serialize, Deserialize)] +pub struct TenantCreateResponseShard { + pub node_id: NodeId, + pub generation: u32, +} + +#[derive(Serialize, Deserialize)] +pub struct TenantCreateResponse { + pub shards: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct 
NodeRegisterRequest { + pub node_id: NodeId, + + pub listen_pg_addr: String, + pub listen_pg_port: u16, + + pub listen_http_addr: String, + pub listen_http_port: u16, +} + +#[derive(Serialize, Deserialize)] +pub struct NodeConfigureRequest { + pub node_id: NodeId, + + pub availability: Option, + pub scheduling: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantLocateResponseShard { + pub shard_id: TenantShardId, + pub node_id: NodeId, + + pub listen_pg_addr: String, + pub listen_pg_port: u16, + + pub listen_http_addr: String, + pub listen_http_port: u16, +} + +#[derive(Serialize, Deserialize)] +pub struct TenantLocateResponse { + pub shards: Vec, + pub shard_params: ShardParameters, +} + +/// Explicitly migrating a particular shard is a low level operation +/// TODO: higher level "Reschedule tenant" operation where the request +/// specifies some constraints, e.g. asking it to get off particular node(s) +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantShardMigrateRequest { + pub tenant_shard_id: TenantShardId, + pub node_id: NodeId, +} + +#[derive(Serialize, Deserialize, Clone, Copy)] +pub enum NodeAvailability { + // Normal, happy state + Active, + // Offline: Tenants shouldn't try to attach here, but they may assume that their + // secondary locations on this node still exist. Newly added nodes are in this + // state until we successfully contact them. + Offline, +} + +impl FromStr for NodeAvailability { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "active" => Ok(Self::Active), + "offline" => Ok(Self::Offline), + _ => Err(anyhow::anyhow!("Unknown availability state '{s}'")), + } + } +} + +#[derive(Serialize, Deserialize, Clone, Copy)] +pub enum NodeSchedulingPolicy { + // Normal, happy state + Active, + + // A newly added node: gradually move some work here. + Filling, + + // Do not schedule new work here, but leave configured locations in place. + Pause, + + // Do not schedule work here. Gracefully move work away, as resources allow. + Draining, +} + +impl FromStr for NodeSchedulingPolicy { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "active" => Ok(Self::Active), + "filling" => Ok(Self::Filling), + "pause" => Ok(Self::Pause), + "draining" => Ok(Self::Draining), + _ => Err(anyhow::anyhow!("Unknown scheduling state '{s}'")), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct TenantShardMigrateResponse {} + impl AttachmentService { pub fn from_env(env: &LocalEnv) -> Self { let path = env.base_data_dir.join("attachments.json"); @@ -49,10 +173,34 @@ impl AttachmentService { listen_url.port().unwrap() ); + // Assume all pageservers have symmetric auth configuration: this service + // expects to use one JWT token to talk to all of them. + let ps_conf = env + .pageservers + .first() + .expect("Config is validated to contain at least one pageserver"); + let (jwt_token, public_key_path) = match ps_conf.http_auth_type { + AuthType::Trust => (None, None), + AuthType::NeonJWT => { + let jwt_token = env + .generate_auth_token(&Claims::new(None, Scope::PageServerApi)) + .unwrap(); + + // If pageserver auth is enabled, this implicitly enables auth for this service, + // using the same credentials. 
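// Concretely: with NeonJWT enabled the service is launched with
// `--jwt-token=<token signed for Scope::PageServerApi>` and
// `--public-key=<base_data_dir>/auth_public_key.pem`, and each request issued by
// `dispatch()` below carries `Authorization: Bearer <token>`, so one key pair covers
// the pageservers and this service alike.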
+ let public_key_path = + camino::Utf8PathBuf::try_from(env.base_data_dir.join("auth_public_key.pem")) + .unwrap(); + (Some(jwt_token), Some(public_key_path)) + } + }; + Self { env: env.clone(), path, listen, + jwt_token, + public_key_path, client: reqwest::ClientBuilder::new() .build() .expect("Failed to construct http client"), @@ -67,72 +215,202 @@ impl AttachmentService { pub async fn start(&self) -> anyhow::Result { let path_str = self.path.to_string_lossy(); - background_process::start_process( + let mut args = vec!["-l", &self.listen, "-p", &path_str] + .into_iter() + .map(|s| s.to_string()) + .collect::>(); + if let Some(jwt_token) = &self.jwt_token { + args.push(format!("--jwt-token={jwt_token}")); + } + + if let Some(public_key_path) = &self.public_key_path { + args.push(format!("--public-key={public_key_path}")); + } + + let result = background_process::start_process( COMMAND, &self.env.base_data_dir, &self.env.attachment_service_bin(), - ["-l", &self.listen, "-p", &path_str], + args, [], background_process::InitialPidFile::Create(self.pid_file()), - // TODO: a real status check - || async move { anyhow::Ok(true) }, + || async { + match self.status().await { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } + }, ) - .await + .await; + + for ps_conf in &self.env.pageservers { + let (pg_host, pg_port) = + parse_host_port(&ps_conf.listen_pg_addr).expect("Unable to parse listen_pg_addr"); + let (http_host, http_port) = parse_host_port(&ps_conf.listen_http_addr) + .expect("Unable to parse listen_http_addr"); + self.node_register(NodeRegisterRequest { + node_id: ps_conf.id, + listen_pg_addr: pg_host.to_string(), + listen_pg_port: pg_port.unwrap_or(5432), + listen_http_addr: http_host.to_string(), + listen_http_port: http_port.unwrap_or(80), + }) + .await?; + } + + result } pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { background_process::stop_process(immediate, COMMAND, &self.pid_file()) } - - /// Call into the attach_hook API, for use before handing out attachments to pageservers - pub async fn attach_hook( + /// Simple HTTP request wrapper for calling into attachment service + async fn dispatch( &self, - tenant_id: TenantId, - pageserver_id: NodeId, - ) -> anyhow::Result> { - use hyper::StatusCode; - + method: hyper::Method, + path: String, + body: Option, + ) -> anyhow::Result + where + RQ: Serialize + Sized, + RS: DeserializeOwned + Sized, + { let url = self .env .control_plane_api .clone() .unwrap() - .join("attach-hook") + .join(&path) .unwrap(); + let mut builder = self.client.request(method, url); + if let Some(body) = body { + builder = builder.json(&body) + } + if let Some(jwt_token) = &self.jwt_token { + eprintln!("dispatch: using token {jwt_token}"); + builder = builder.header( + reqwest::header::AUTHORIZATION, + format!("Bearer {jwt_token}"), + ); + } else { + eprintln!("dispatch: no JWT token"); + } + + let response = builder.send().await?; + if response.status() != StatusCode::OK { + return Err(anyhow!( + "Unexpected status {} on {}", + response.status(), + path + )); + } + + Ok(response.json().await?) 
+ } + + /// Call into the attach_hook API, for use before handing out attachments to pageservers + #[instrument(skip(self))] + pub async fn attach_hook( + &self, + tenant_shard_id: TenantShardId, + pageserver_id: NodeId, + ) -> anyhow::Result> { let request = AttachHookRequest { - tenant_id, + tenant_shard_id, node_id: Some(pageserver_id), }; - let response = self.client.post(url).json(&request).send().await?; - if response.status() != StatusCode::OK { - return Err(anyhow!("Unexpected status {}", response.status())); - } + let response = self + .dispatch::<_, AttachHookResponse>( + Method::POST, + "attach-hook".to_string(), + Some(request), + ) + .await?; - let response = response.json::().await?; Ok(response.gen) } - pub async fn inspect(&self, tenant_id: TenantId) -> anyhow::Result> { - use hyper::StatusCode; + #[instrument(skip(self))] + pub async fn inspect( + &self, + tenant_shard_id: TenantShardId, + ) -> anyhow::Result> { + let request = InspectRequest { tenant_shard_id }; - let url = self - .env - .control_plane_api - .clone() - .unwrap() - .join("inspect") - .unwrap(); + let response = self + .dispatch::<_, InspectResponse>(Method::POST, "inspect".to_string(), Some(request)) + .await?; - let request = InspectRequest { tenant_id }; - - let response = self.client.post(url).json(&request).send().await?; - if response.status() != StatusCode::OK { - return Err(anyhow!("Unexpected status {}", response.status())); - } - - let response = response.json::().await?; Ok(response.attachment) } + + #[instrument(skip(self))] + pub async fn tenant_create( + &self, + req: TenantCreateRequest, + ) -> anyhow::Result { + self.dispatch(Method::POST, "tenant".to_string(), Some(req)) + .await + } + + #[instrument(skip(self))] + pub async fn tenant_locate(&self, tenant_id: TenantId) -> anyhow::Result { + self.dispatch::<(), _>(Method::GET, format!("tenant/{tenant_id}/locate"), None) + .await + } + + #[instrument(skip(self))] + pub async fn tenant_migrate( + &self, + tenant_shard_id: TenantShardId, + node_id: NodeId, + ) -> anyhow::Result { + self.dispatch( + Method::PUT, + format!("tenant/{tenant_shard_id}/migrate"), + Some(TenantShardMigrateRequest { + tenant_shard_id, + node_id, + }), + ) + .await + } + + #[instrument(skip_all, fields(node_id=%req.node_id))] + pub async fn node_register(&self, req: NodeRegisterRequest) -> anyhow::Result<()> { + self.dispatch::<_, ()>(Method::POST, "node".to_string(), Some(req)) + .await + } + + #[instrument(skip_all, fields(node_id=%req.node_id))] + pub async fn node_configure(&self, req: NodeConfigureRequest) -> anyhow::Result<()> { + self.dispatch::<_, ()>( + Method::PUT, + format!("node/{}/config", req.node_id), + Some(req), + ) + .await + } + + #[instrument(skip(self))] + pub async fn status(&self) -> anyhow::Result<()> { + self.dispatch::<(), ()>(Method::GET, "status".to_string(), None) + .await + } + + #[instrument(skip_all, fields(%tenant_id, timeline_id=%req.new_timeline_id))] + pub async fn tenant_timeline_create( + &self, + tenant_id: TenantId, + req: TimelineCreateRequest, + ) -> anyhow::Result { + self.dispatch( + Method::POST, + format!("tenant/{tenant_id}/timeline"), + Some(req), + ) + .await + } } diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs deleted file mode 100644 index 9db36ebd55..0000000000 --- a/control_plane/src/bin/attachment_service.rs +++ /dev/null @@ -1,355 +0,0 @@ -/// The attachment service mimics the aspects of the control plane API -/// that are required for a pageserver to operate. 
-/// -/// This enables running & testing pageservers without a full-blown -/// deployment of the Neon cloud platform. -/// -use anyhow::anyhow; -use clap::Parser; -use hex::FromHex; -use hyper::StatusCode; -use hyper::{Body, Request, Response}; -use pageserver_api::shard::TenantShardId; -use serde::{Deserialize, Serialize}; -use std::path::{Path, PathBuf}; -use std::{collections::HashMap, sync::Arc}; -use utils::http::endpoint::request_span; -use utils::logging::{self, LogFormat}; -use utils::signals::{ShutdownSignals, Signal}; - -use utils::{ - http::{ - endpoint::{self}, - error::ApiError, - json::{json_request, json_response}, - RequestExt, RouterBuilder, - }, - id::{NodeId, TenantId}, - tcp_listener, -}; - -use pageserver_api::control_api::{ - ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse, - ValidateResponseTenant, -}; - -use control_plane::attachment_service::{ - AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, -}; - -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -#[command(arg_required_else_help(true))] -struct Cli { - /// Host and port to listen on, like `127.0.0.1:1234` - #[arg(short, long)] - listen: std::net::SocketAddr, - - /// Path to the .json file to store state (will be created if it doesn't exist) - #[arg(short, long)] - path: PathBuf, -} - -// The persistent state of each Tenant -#[derive(Serialize, Deserialize, Clone)] -struct TenantState { - // Currently attached pageserver - pageserver: Option, - - // Latest generation number: next time we attach, increment this - // and use the incremented number when attaching - generation: u32, -} - -fn to_hex_map(input: &HashMap, serializer: S) -> Result -where - S: serde::Serializer, - V: Clone + Serialize, -{ - let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone())); - - transformed - .collect::>() - .serialize(serializer) -} - -fn from_hex_map<'de, D, V>(deserializer: D) -> Result, D::Error> -where - D: serde::de::Deserializer<'de>, - V: Deserialize<'de>, -{ - let hex_map = HashMap::::deserialize(deserializer)?; - hex_map - .into_iter() - .map(|(k, v)| { - TenantId::from_hex(k) - .map(|k| (k, v)) - .map_err(serde::de::Error::custom) - }) - .collect() -} - -// Top level state available to all HTTP handlers -#[derive(Serialize, Deserialize)] -struct PersistentState { - #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")] - tenants: HashMap, - - #[serde(skip)] - path: PathBuf, -} - -impl PersistentState { - async fn save(&self) -> anyhow::Result<()> { - let bytes = serde_json::to_vec(self)?; - tokio::fs::write(&self.path, &bytes).await?; - - Ok(()) - } - - async fn load(path: &Path) -> anyhow::Result { - let bytes = tokio::fs::read(path).await?; - let mut decoded = serde_json::from_slice::(&bytes)?; - decoded.path = path.to_owned(); - Ok(decoded) - } - - async fn load_or_new(path: &Path) -> Self { - match Self::load(path).await { - Ok(s) => { - tracing::info!("Loaded state file at {}", path.display()); - s - } - Err(e) - if e.downcast_ref::() - .map(|e| e.kind() == std::io::ErrorKind::NotFound) - .unwrap_or(false) => - { - tracing::info!("Will create state file at {}", path.display()); - Self { - tenants: HashMap::new(), - path: path.to_owned(), - } - } - Err(e) => { - panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path.display()) - } - } - } -} - -/// State available to HTTP request handlers -#[derive(Clone)] -struct State { - inner: Arc>, 
-} - -impl State { - fn new(persistent_state: PersistentState) -> State { - Self { - inner: Arc::new(tokio::sync::RwLock::new(persistent_state)), - } - } -} - -#[inline(always)] -fn get_state(request: &Request) -> &State { - request - .data::>() - .expect("unknown state type") - .as_ref() -} - -/// Pageserver calls into this on startup, to learn which tenants it should attach -async fn handle_re_attach(mut req: Request) -> Result, ApiError> { - let reattach_req = json_request::(&mut req).await?; - - let state = get_state(&req).inner.clone(); - let mut locked = state.write().await; - - let mut response = ReAttachResponse { - tenants: Vec::new(), - }; - for (t, state) in &mut locked.tenants { - if state.pageserver == Some(reattach_req.node_id) { - state.generation += 1; - response.tenants.push(ReAttachResponseTenant { - // TODO(sharding): make this shard-aware - id: TenantShardId::unsharded(*t), - gen: state.generation, - }); - } - } - - locked.save().await.map_err(ApiError::InternalServerError)?; - - json_response(StatusCode::OK, response) -} - -/// Pageserver calls into this before doing deletions, to confirm that it still -/// holds the latest generation for the tenants with deletions enqueued -async fn handle_validate(mut req: Request) -> Result, ApiError> { - let validate_req = json_request::(&mut req).await?; - - let locked = get_state(&req).inner.read().await; - - let mut response = ValidateResponse { - tenants: Vec::new(), - }; - - for req_tenant in validate_req.tenants { - // TODO(sharding): make this shard-aware - if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) { - let valid = tenant_state.generation == req_tenant.gen; - tracing::info!( - "handle_validate: {}(gen {}): valid={valid} (latest {})", - req_tenant.id, - req_tenant.gen, - tenant_state.generation - ); - response.tenants.push(ValidateResponseTenant { - id: req_tenant.id, - valid, - }); - } - } - - json_response(StatusCode::OK, response) -} -/// Call into this before attaching a tenant to a pageserver, to acquire a generation number -/// (in the real control plane this is unnecessary, because the same program is managing -/// generation numbers and doing attachments). 
-async fn handle_attach_hook(mut req: Request) -> Result, ApiError> { - let attach_req = json_request::(&mut req).await?; - - let state = get_state(&req).inner.clone(); - let mut locked = state.write().await; - - let tenant_state = locked - .tenants - .entry(attach_req.tenant_id) - .or_insert_with(|| TenantState { - pageserver: attach_req.node_id, - generation: 0, - }); - - if let Some(attaching_pageserver) = attach_req.node_id.as_ref() { - tenant_state.generation += 1; - tracing::info!( - tenant_id = %attach_req.tenant_id, - ps_id = %attaching_pageserver, - generation = %tenant_state.generation, - "issuing", - ); - } else if let Some(ps_id) = tenant_state.pageserver { - tracing::info!( - tenant_id = %attach_req.tenant_id, - %ps_id, - generation = %tenant_state.generation, - "dropping", - ); - } else { - tracing::info!( - tenant_id = %attach_req.tenant_id, - "no-op: tenant already has no pageserver"); - } - tenant_state.pageserver = attach_req.node_id; - let generation = tenant_state.generation; - - tracing::info!( - "handle_attach_hook: tenant {} set generation {}, pageserver {}", - attach_req.tenant_id, - tenant_state.generation, - attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff)) - ); - - locked.save().await.map_err(ApiError::InternalServerError)?; - - json_response( - StatusCode::OK, - AttachHookResponse { - gen: attach_req.node_id.map(|_| generation), - }, - ) -} - -async fn handle_inspect(mut req: Request) -> Result, ApiError> { - let inspect_req = json_request::(&mut req).await?; - - let state = get_state(&req).inner.clone(); - let locked = state.write().await; - let tenant_state = locked.tenants.get(&inspect_req.tenant_id); - - json_response( - StatusCode::OK, - InspectResponse { - attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))), - }, - ) -} - -async fn handle_tenant_create(mut req: Request) -> Result, ApiError> { - let inspect_req = json_request::(&mut req).await?; - - let state = get_state(&req).inner.clone(); - let locked = state.write().await; - let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id); - - json_response( - StatusCode::OK, - InspectResponse { - attachment: tenant_state.and_then(|s| s.pageserver.map(|ps| (s.generation, ps))), - }, - ) -} - -fn make_router(persistent_state: PersistentState) -> RouterBuilder { - endpoint::make_router() - .data(Arc::new(State::new(persistent_state))) - .post("/re-attach", |r| request_span(r, handle_re_attach)) - .post("/validate", |r| request_span(r, handle_validate)) - .post("/attach-hook", |r| request_span(r, handle_attach_hook)) - .post("/inspect", |r| request_span(r, handle_inspect)) - .post("/tenant/:tenant_id", |r| { - request_span(r, handle_tenant_create) - }) -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - logging::init( - LogFormat::Plain, - logging::TracingErrorLayerEnablement::Disabled, - logging::Output::Stdout, - )?; - - let args = Cli::parse(); - tracing::info!( - "Starting, state at {}, listening on {}", - args.path.to_string_lossy(), - args.listen - ); - - let persistent_state = PersistentState::load_or_new(&args.path).await; - - let http_listener = tcp_listener::bind(args.listen)?; - let router = make_router(persistent_state) - .build() - .map_err(|err| anyhow!(err))?; - let service = utils::http::RouterService::new(router).unwrap(); - let server = hyper::Server::from_tcp(http_listener)?.serve(service); - - tracing::info!("Serving on {0}", args.listen); - - tokio::task::spawn(server); - - ShutdownSignals::handle(|signal| match signal { - 
Signal::Interrupt | Signal::Terminate | Signal::Quit => { - tracing::info!("Got {}. Terminating", signal.name()); - // We're just a test helper: no graceful shutdown. - std::process::exit(0); - } - })?; - - Ok(()) -} diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 03e69010f7..329a06780b 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -8,14 +8,18 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use compute_api::spec::ComputeMode; -use control_plane::attachment_service::AttachmentService; +use control_plane::attachment_service::{ + AttachmentService, NodeAvailability, NodeConfigureRequest, NodeSchedulingPolicy, +}; use control_plane::endpoint::ComputeControlPlane; use control_plane::local_env::LocalEnv; use control_plane::pageserver::{PageServerNode, PAGESERVER_REMOTE_STORAGE_DIR}; use control_plane::safekeeper::SafekeeperNode; -use control_plane::tenant_migration::migrate_tenant; use control_plane::{broker, local_env}; -use pageserver_api::models::TimelineInfo; +use pageserver_api::models::{ + ShardParameters, TenantCreateRequest, TimelineCreateRequest, TimelineInfo, +}; +use pageserver_api::shard::{ShardCount, ShardStripeSize, TenantShardId}; use pageserver_api::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT, @@ -30,6 +34,7 @@ use std::path::PathBuf; use std::process::exit; use std::str::FromStr; use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR; +use url::Host; use utils::{ auth::{Claims, Scope}, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, @@ -276,10 +281,10 @@ fn print_timeline( /// Connects to the pageserver to query this information. async fn get_timeline_infos( env: &local_env::LocalEnv, - tenant_id: &TenantId, + tenant_shard_id: &TenantShardId, ) -> Result> { Ok(get_default_pageserver(env) - .timeline_list(tenant_id) + .timeline_list(tenant_shard_id) .await? .into_iter() .map(|timeline_info| (timeline_info.timeline_id, timeline_info)) @@ -297,6 +302,20 @@ fn get_tenant_id(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::R } } +// Helper function to parse --tenant_id option, for commands that accept a shard suffix +fn get_tenant_shard_id( + sub_match: &ArgMatches, + env: &local_env::LocalEnv, +) -> anyhow::Result { + if let Some(tenant_id_from_arguments) = parse_tenant_shard_id(sub_match).transpose() { + tenant_id_from_arguments + } else if let Some(default_id) = env.default_tenant_id { + Ok(TenantShardId::unsharded(default_id)) + } else { + anyhow::bail!("No tenant shard id. 
Use --tenant-id, or set a default tenant"); + } +} + fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result> { sub_match .get_one::("tenant-id") @@ -305,6 +324,14 @@ fn parse_tenant_id(sub_match: &ArgMatches) -> anyhow::Result> { .context("Failed to parse tenant id from the argument string") } +fn parse_tenant_shard_id(sub_match: &ArgMatches) -> anyhow::Result> { + sub_match + .get_one::("tenant-id") + .map(|id_str| TenantShardId::from_str(id_str)) + .transpose() + .context("Failed to parse tenant shard id from the argument string") +} + fn parse_timeline_id(sub_match: &ArgMatches) -> anyhow::Result> { sub_match .get_one::("timeline-id") @@ -393,47 +420,66 @@ async fn handle_tenant( Some(("create", create_match)) => { let tenant_conf: HashMap<_, _> = create_match .get_many::("config") - .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) + .map(|vals: clap::parser::ValuesRef<'_, String>| { + vals.flat_map(|c| c.split_once(':')).collect() + }) .unwrap_or_default(); + let shard_count: u8 = create_match + .get_one::("shard-count") + .cloned() + .unwrap_or(0); + + let shard_stripe_size: Option = + create_match.get_one::("shard-stripe-size").cloned(); + + let tenant_conf = PageServerNode::parse_config(tenant_conf)?; + // If tenant ID was not specified, generate one let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate); - let generation = if env.control_plane_api.is_some() { - // We must register the tenant with the attachment service, so - // that when the pageserver restarts, it will be re-attached. - let attachment_service = AttachmentService::from_env(env); - attachment_service - .attach_hook(tenant_id, pageserver.conf.id) - .await? - } else { - None - }; - - pageserver - .tenant_create(tenant_id, generation, tenant_conf) + // We must register the tenant with the attachment service, so + // that when the pageserver restarts, it will be re-attached. + let attachment_service = AttachmentService::from_env(env); + attachment_service + .tenant_create(TenantCreateRequest { + // Note that ::unsharded here isn't actually because the tenant is unsharded, its because the + // attachment service expecfs a shard-naive tenant_id in this attribute, and the TenantCreateRequest + // type is used both in attachment service (for creating tenants) and in pageserver (for creating shards) + new_tenant_id: TenantShardId::unsharded(tenant_id), + generation: None, + shard_parameters: ShardParameters { + count: ShardCount(shard_count), + stripe_size: shard_stripe_size.map(ShardStripeSize), + }, + config: tenant_conf, + }) .await?; println!("tenant {tenant_id} successfully created on the pageserver"); // Create an initial timeline for the new tenant - let new_timeline_id = parse_timeline_id(create_match)?; + let new_timeline_id = + parse_timeline_id(create_match)?.unwrap_or(TimelineId::generate()); let pg_version = create_match .get_one::("pg-version") .copied() .context("Failed to parse postgres version from the argument string")?; - let timeline_info = pageserver - .timeline_create( + // FIXME: passing None for ancestor_start_lsn is not kosher in a sharded world: we can't have + // different shards picking different start lsns. Maybe we have to teach attachment service + // to let shard 0 branch first and then propagate the chosen LSN to other shards. 
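// A rough sketch of what that FIXME hints at (illustrative only, not implemented by
// this change): create the timeline on shard 0 first, read back the LSN it chose, then
// pin that LSN when issuing the same creation to the remaining shards, e.g.:
//
//     let shard_zero_info = /* create the timeline on shard 0 only */;
//     let pinned = TimelineCreateRequest {
//         ancestor_start_lsn: Some(shard_zero_info.last_record_lsn),
//         ..request
//     };
//     // ...send `pinned` to the other shards so they all agree on the start LSN.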
+ attachment_service + .tenant_timeline_create( tenant_id, - new_timeline_id, - None, - None, - Some(pg_version), - None, + TimelineCreateRequest { + new_timeline_id, + ancestor_timeline_id: None, + ancestor_start_lsn: None, + existing_initdb_timeline_id: None, + pg_version: Some(pg_version), + }, ) .await?; - let new_timeline_id = timeline_info.timeline_id; - let last_record_lsn = timeline_info.last_record_lsn; env.register_branch_mapping( DEFAULT_BRANCH_NAME.to_string(), @@ -441,9 +487,7 @@ async fn handle_tenant( new_timeline_id, )?; - println!( - "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}", - ); + println!("Created an initial timeline '{new_timeline_id}' for tenant: {tenant_id}",); if create_match.get_flag("set-default") { println!("Setting tenant {tenant_id} as a default one"); @@ -470,14 +514,64 @@ async fn handle_tenant( println!("tenant {tenant_id} successfully configured on the pageserver"); } Some(("migrate", matches)) => { - let tenant_id = get_tenant_id(matches, env)?; + let tenant_shard_id = get_tenant_shard_id(matches, env)?; let new_pageserver = get_pageserver(env, matches)?; let new_pageserver_id = new_pageserver.conf.id; - migrate_tenant(env, tenant_id, new_pageserver).await?; - println!("tenant {tenant_id} migrated to {}", new_pageserver_id); - } + let attachment_service = AttachmentService::from_env(env); + attachment_service + .tenant_migrate(tenant_shard_id, new_pageserver_id) + .await?; + println!("tenant {tenant_shard_id} migrated to {}", new_pageserver_id); + } + Some(("status", matches)) => { + let tenant_id = get_tenant_id(matches, env)?; + + let mut shard_table = comfy_table::Table::new(); + shard_table.set_header(["Shard", "Pageserver", "Physical Size"]); + + let mut tenant_synthetic_size = None; + + let attachment_service = AttachmentService::from_env(env); + for shard in attachment_service.tenant_locate(tenant_id).await?.shards { + let pageserver = + PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?); + + let size = pageserver + .http_client + .tenant_details(shard.shard_id) + .await? + .tenant_info + .current_physical_size + .unwrap(); + + shard_table.add_row([ + format!("{}", shard.shard_id.shard_slug()), + format!("{}", shard.node_id.0), + format!("{} MiB", size / (1024 * 1024)), + ]); + + if shard.shard_id.is_zero() { + tenant_synthetic_size = + Some(pageserver.tenant_synthetic_size(shard.shard_id).await?); + } + } + + let Some(synthetic_size) = tenant_synthetic_size else { + bail!("Shard 0 not found") + }; + + let mut tenant_table = comfy_table::Table::new(); + tenant_table.add_row(["Tenant ID".to_string(), tenant_id.to_string()]); + tenant_table.add_row([ + "Synthetic size".to_string(), + format!("{} MiB", synthetic_size.size.unwrap_or(0) / (1024 * 1024)), + ]); + + println!("{tenant_table}"); + println!("{shard_table}"); + } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name), None => bail!("no tenant subcommand provided"), } @@ -489,8 +583,10 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local match timeline_match.subcommand() { Some(("list", list_match)) => { - let tenant_id = get_tenant_id(list_match, env)?; - let timelines = pageserver.timeline_list(&tenant_id).await?; + // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service + // where shard 0 is attached, and query there. 
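// What the TODO suggests, roughly (illustrative, not wired up in this change): resolve
// shard 0 through the attachment service and query the pageserver it reports, instead
// of requiring a shard suffix on --tenant-id:
//
//     let shard_zero = attachment_service
//         .tenant_locate(tenant_id)
//         .await?
//         .shards
//         .into_iter()
//         .find(|s| s.shard_id.is_zero())
//         .context("tenant has no shard 0")?;
//     let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(shard_zero.node_id)?);
//     let timelines = pageserver.timeline_list(&shard_zero.shard_id).await?;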
+ let tenant_shard_id = get_tenant_shard_id(list_match, env)?; + let timelines = pageserver.timeline_list(&tenant_shard_id).await?; print_timelines_tree(timelines, env.timeline_name_mappings())?; } Some(("create", create_match)) => { @@ -505,18 +601,19 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local .context("Failed to parse postgres version from the argument string")?; let new_timeline_id_opt = parse_timeline_id(create_match)?; + let new_timeline_id = new_timeline_id_opt.unwrap_or(TimelineId::generate()); - let timeline_info = pageserver - .timeline_create( - tenant_id, - new_timeline_id_opt, - None, - None, - Some(pg_version), - None, - ) + let attachment_service = AttachmentService::from_env(env); + let create_req = TimelineCreateRequest { + new_timeline_id, + ancestor_timeline_id: None, + existing_initdb_timeline_id: None, + ancestor_start_lsn: None, + pg_version: Some(pg_version), + }; + let timeline_info = attachment_service + .tenant_timeline_create(tenant_id, create_req) .await?; - let new_timeline_id = timeline_info.timeline_id; let last_record_lsn = timeline_info.last_record_lsn; env.register_branch_mapping(new_branch_name.to_string(), tenant_id, new_timeline_id)?; @@ -574,7 +671,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local None, pg_version, ComputeMode::Primary, - DEFAULT_PAGESERVER_ID, )?; println!("Done"); } @@ -598,17 +694,18 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local .map(|lsn_str| Lsn::from_str(lsn_str)) .transpose() .context("Failed to parse ancestor start Lsn from the request")?; - let timeline_info = pageserver - .timeline_create( - tenant_id, - None, - start_lsn, - Some(ancestor_timeline_id), - None, - None, - ) + let new_timeline_id = TimelineId::generate(); + let attachment_service = AttachmentService::from_env(env); + let create_req = TimelineCreateRequest { + new_timeline_id, + ancestor_timeline_id: Some(ancestor_timeline_id), + existing_initdb_timeline_id: None, + ancestor_start_lsn: start_lsn, + pg_version: None, + }; + let timeline_info = attachment_service + .tenant_timeline_create(tenant_id, create_req) .await?; - let new_timeline_id = timeline_info.timeline_id; let last_record_lsn = timeline_info.last_record_lsn; @@ -635,8 +732,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re match sub_name { "list" => { - let tenant_id = get_tenant_id(sub_args, env)?; - let timeline_infos = get_timeline_infos(env, &tenant_id) + // TODO(sharding): this command shouldn't have to specify a shard ID: we should ask the attachment service + // where shard 0 is attached, and query there. 
+ let tenant_shard_id = get_tenant_shard_id(sub_args, env)?; + let timeline_infos = get_timeline_infos(env, &tenant_shard_id) .await .unwrap_or_else(|e| { eprintln!("Failed to load timeline info: {}", e); @@ -661,7 +760,7 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re for (endpoint_id, endpoint) in cplane .endpoints .iter() - .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id) + .filter(|(_, endpoint)| endpoint.tenant_id == tenant_shard_id.tenant_id) { let lsn_str = match endpoint.mode { ComputeMode::Static(lsn) => { @@ -680,7 +779,10 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re }; let branch_name = timeline_name_mappings - .get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id)) + .get(&TenantTimelineId::new( + tenant_shard_id.tenant_id, + endpoint.timeline_id, + )) .map(|name| name.as_str()) .unwrap_or("?"); @@ -728,13 +830,6 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re .copied() .unwrap_or(false); - let pageserver_id = - if let Some(id_str) = sub_args.get_one::("endpoint-pageserver-id") { - NodeId(id_str.parse().context("while parsing pageserver id")?) - } else { - DEFAULT_PAGESERVER_ID - }; - let mode = match (lsn, hot_standby) { (Some(lsn), false) => ComputeMode::Static(lsn), (None, true) => ComputeMode::Replica, @@ -762,7 +857,6 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re http_port, pg_version, mode, - pageserver_id, )?; } "start" => { @@ -805,6 +899,22 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re endpoint.timeline_id, )?; + let attachment_service = AttachmentService::from_env(env); + let locate_result = attachment_service.tenant_locate(endpoint.tenant_id).await?; + let pageservers = locate_result + .shards + .into_iter() + .map(|shard| { + ( + Host::parse(&shard.listen_pg_addr) + .expect("Attachment service reported bad hostname"), + shard.listen_pg_port, + ) + }) + .collect::>(); + assert!(!pageservers.is_empty()); + let stripe_size = locate_result.shard_params.stripe_size.map(|s| s.0 as usize); + let ps_conf = env.get_pageserver_conf(pageserver_id)?; let auth_token = if matches!(ps_conf.pg_auth_type, AuthType::NeonJWT) { let claims = Claims::new(Some(endpoint.tenant_id), Scope::Tenant); @@ -816,7 +926,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re println!("Starting existing endpoint {endpoint_id}..."); endpoint - .start(&auth_token, safekeepers, remote_ext_config) + .start( + &auth_token, + safekeepers, + pageservers, + remote_ext_config, + stripe_size, + ) .await?; } "reconfigure" => { @@ -827,15 +943,31 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re .endpoints .get(endpoint_id.as_str()) .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; - let pageserver_id = + let pageservers = if let Some(id_str) = sub_args.get_one::("endpoint-pageserver-id") { - Some(NodeId( - id_str.parse().context("while parsing pageserver id")?, - )) + let ps_id = NodeId(id_str.parse().context("while parsing pageserver id")?); + let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?); + vec![( + pageserver.pg_connection_config.host().clone(), + pageserver.pg_connection_config.port(), + )] } else { - None + let attachment_service = AttachmentService::from_env(env); + attachment_service + .tenant_locate(endpoint.tenant_id) + .await? 
+ .shards + .into_iter() + .map(|shard| { + ( + Host::parse(&shard.listen_pg_addr) + .expect("Attachment service reported malformed host"), + shard.listen_pg_port, + ) + }) + .collect::>() }; - endpoint.reconfigure(pageserver_id).await?; + endpoint.reconfigure(pageservers).await?; } "stop" => { let endpoint_id = sub_args @@ -959,6 +1091,21 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> } } + Some(("set-state", subcommand_args)) => { + let pageserver = get_pageserver(env, subcommand_args)?; + let scheduling = subcommand_args.get_one("scheduling"); + let availability = subcommand_args.get_one("availability"); + + let attachment_service = AttachmentService::from_env(env); + attachment_service + .node_configure(NodeConfigureRequest { + node_id: pageserver.conf.id, + scheduling: scheduling.cloned(), + availability: availability.cloned(), + }) + .await?; + } + Some(("status", subcommand_args)) => { match get_pageserver(env, subcommand_args)?.check_status().await { Ok(_) => println!("Page server is up and running"), @@ -1352,6 +1499,8 @@ fn cli() -> Command { .arg(pg_version_arg.clone()) .arg(Arg::new("set-default").long("set-default").action(ArgAction::SetTrue).required(false) .help("Use this tenant in future CLI commands where tenant_id is needed, but not specified")) + .arg(Arg::new("shard-count").value_parser(value_parser!(u8)).long("shard-count").action(ArgAction::Set).help("Number of shards in the new tenant (default 1)")) + .arg(Arg::new("shard-stripe-size").value_parser(value_parser!(u32)).long("shard-stripe-size").action(ArgAction::Set).help("Sharding stripe size in pages")) ) .subcommand(Command::new("set-default").arg(tenant_id_arg.clone().required(true)) .about("Set a particular tenant as default in future CLI commands where tenant_id is needed, but not specified")) @@ -1362,6 +1511,9 @@ fn cli() -> Command { .about("Migrate a tenant from one pageserver to another") .arg(tenant_id_arg.clone()) .arg(pageserver_id_arg.clone())) + .subcommand(Command::new("status") + .about("Human readable summary of the tenant's shards and attachment locations") + .arg(tenant_id_arg.clone())) ) .subcommand( Command::new("pageserver") @@ -1381,6 +1533,12 @@ fn cli() -> Command { .about("Restart local pageserver") .arg(pageserver_config_args.clone()) ) + .subcommand(Command::new("set-state") + .arg(Arg::new("availability").value_parser(value_parser!(NodeAvailability)).long("availability").action(ArgAction::Set).help("Availability state: offline,active")) + .arg(Arg::new("scheduling").value_parser(value_parser!(NodeSchedulingPolicy)).long("scheduling").action(ArgAction::Set).help("Scheduling state: draining,pause,filling,active")) + .about("Set scheduling or availability state of pageserver node") + .arg(pageserver_config_args.clone()) + ) ) .subcommand( Command::new("attachment_service") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 3d5dfd6311..3f391b8eb3 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -49,10 +49,11 @@ use compute_api::spec::RemoteExtSpec; use nix::sys::signal::kill; use nix::sys::signal::Signal; use serde::{Deserialize, Serialize}; +use url::Host; use utils::id::{NodeId, TenantId, TimelineId}; +use crate::attachment_service::AttachmentService; use crate::local_env::LocalEnv; -use crate::pageserver::PageServerNode; use crate::postgresql_conf::PostgresConf; use compute_api::responses::{ComputeState, ComputeStatus}; @@ -69,7 +70,6 @@ pub struct EndpointConf { http_port: u16, pg_version: 
u32, skip_pg_catalog_updates: bool, - pageserver_id: NodeId, } // @@ -121,19 +121,14 @@ impl ComputeControlPlane { http_port: Option, pg_version: u32, mode: ComputeMode, - pageserver_id: NodeId, ) -> Result> { let pg_port = pg_port.unwrap_or_else(|| self.get_port()); let http_port = http_port.unwrap_or_else(|| self.get_port() + 1); - let pageserver = - PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?); - let ep = Arc::new(Endpoint { endpoint_id: endpoint_id.to_owned(), pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), pg_port), http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), http_port), env: self.env.clone(), - pageserver, timeline_id, mode, tenant_id, @@ -159,7 +154,6 @@ impl ComputeControlPlane { pg_port, pg_version, skip_pg_catalog_updates: true, - pageserver_id, })?, )?; std::fs::write( @@ -218,7 +212,6 @@ pub struct Endpoint { // These are not part of the endpoint as such, but the environment // the endpoint runs in. pub env: LocalEnv, - pageserver: PageServerNode, // Optimizations skip_pg_catalog_updates: bool, @@ -241,15 +234,11 @@ impl Endpoint { let conf: EndpointConf = serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; - let pageserver = - PageServerNode::from_env(env, env.get_pageserver_conf(conf.pageserver_id)?); - Ok(Endpoint { pg_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.pg_port), http_address: SocketAddr::new("127.0.0.1".parse().unwrap(), conf.http_port), endpoint_id, env: env.clone(), - pageserver, timeline_id: conf.timeline_id, mode: conf.mode, tenant_id: conf.tenant_id, @@ -469,11 +458,21 @@ impl Endpoint { } } + fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String { + pageservers + .iter() + .map(|(host, port)| format!("postgresql://no_user@{host}:{port}")) + .collect::>() + .join(",") + } + pub async fn start( &self, auth_token: &Option, safekeepers: Vec, + pageservers: Vec<(Host, u16)>, remote_ext_config: Option<&String>, + shard_stripe_size: Option, ) -> Result<()> { if self.status() == "running" { anyhow::bail!("The endpoint is already running"); @@ -487,13 +486,9 @@ impl Endpoint { std::fs::remove_dir_all(self.pgdata())?; } - let pageserver_connstring = { - let config = &self.pageserver.pg_connection_config; - let (host, port) = (config.host(), config.port()); + let pageserver_connstring = Self::build_pageserver_connstr(&pageservers); + assert!(!pageserver_connstring.is_empty()); - // NOTE: avoid spaces in connection string, because it is less error prone if we forward it somewhere. 
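// (The comma-separated form produced by `build_pageserver_connstr` above keeps that
//  property: no spaces. For a two-shard tenant it yields something like
//  `postgresql://no_user@127.0.0.1:64000,postgresql://no_user@127.0.0.1:64001`,
//  where the hosts and ports are made up for illustration.)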
- format!("postgresql://no_user@{host}:{port}") - }; let mut safekeeper_connstrings = Vec::new(); if self.mode == ComputeMode::Primary { for sk_id in safekeepers { @@ -543,6 +538,7 @@ impl Endpoint { storage_auth_token: auth_token.clone(), remote_extensions, pgbouncer_settings: None, + shard_stripe_size, }; let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; @@ -665,7 +661,7 @@ impl Endpoint { } } - pub async fn reconfigure(&self, pageserver_id: Option) -> Result<()> { + pub async fn reconfigure(&self, mut pageservers: Vec<(Host, u16)>) -> Result<()> { let mut spec: ComputeSpec = { let spec_path = self.endpoint_path().join("spec.json"); let file = std::fs::File::open(spec_path)?; @@ -675,25 +671,27 @@ impl Endpoint { let postgresql_conf = self.read_postgresql_conf()?; spec.cluster.postgresql_conf = Some(postgresql_conf); - if let Some(pageserver_id) = pageserver_id { - let endpoint_config_path = self.endpoint_path().join("endpoint.json"); - let mut endpoint_conf: EndpointConf = { - let file = std::fs::File::open(&endpoint_config_path)?; - serde_json::from_reader(file)? - }; - endpoint_conf.pageserver_id = pageserver_id; - std::fs::write( - endpoint_config_path, - serde_json::to_string_pretty(&endpoint_conf)?, - )?; - - let pageserver = - PageServerNode::from_env(&self.env, self.env.get_pageserver_conf(pageserver_id)?); - let ps_http_conf = &pageserver.pg_connection_config; - let (host, port) = (ps_http_conf.host(), ps_http_conf.port()); - spec.pageserver_connstring = Some(format!("postgresql://no_user@{host}:{port}")); + // If we weren't given explicit pageservers, query the attachment service + if pageservers.is_empty() { + let attachment_service = AttachmentService::from_env(&self.env); + let locate_result = attachment_service.tenant_locate(self.tenant_id).await?; + pageservers = locate_result + .shards + .into_iter() + .map(|shard| { + ( + Host::parse(&shard.listen_pg_addr) + .expect("Attachment service reported bad hostname"), + shard.listen_pg_port, + ) + }) + .collect::>(); } + let pageserver_connstr = Self::build_pageserver_connstr(&pageservers); + assert!(!pageserver_connstr.is_empty()); + spec.pageserver_connstring = Some(pageserver_connstr); + let client = reqwest::Client::new(); let response = client .post(format!( diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 52a0e20429..bb79d36bfc 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -14,4 +14,3 @@ pub mod local_env; pub mod pageserver; pub mod postgresql_conf; pub mod safekeeper; -pub mod tenant_migration; diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 3445bb7816..18ccf6bd98 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -17,7 +17,9 @@ use std::time::Duration; use anyhow::{bail, Context}; use camino::Utf8PathBuf; use futures::SinkExt; -use pageserver_api::models::{self, LocationConfig, ShardParameters, TenantInfo, TimelineInfo}; +use pageserver_api::models::{ + self, LocationConfig, ShardParameters, TenantHistorySize, TenantInfo, TimelineInfo, +}; use pageserver_api::shard::TenantShardId; use pageserver_client::mgmt_api; use postgres_backend::AuthType; @@ -106,6 +108,16 @@ impl PageServerNode { "control_plane_api='{}'", control_plane_api.as_str() )); + + // Attachment service uses the same auth as pageserver: if JWT is enabled + // for us, we will also need it to talk to them. 
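// (The override added just below shows up as `control_plane_api_token='<jwt>'` in the
//  generated pageserver config, which the pageserver then uses to authenticate its
//  /re-attach and /validate calls back to the attachment service.)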
+ if matches!(self.conf.http_auth_type, AuthType::NeonJWT) { + let jwt_token = self + .env + .generate_auth_token(&Claims::new(None, Scope::PageServerApi)) + .unwrap(); + overrides.push(format!("control_plane_api_token='{}'", jwt_token)); + } } if !cli_overrides @@ -301,16 +313,8 @@ impl PageServerNode { pub async fn tenant_list(&self) -> mgmt_api::Result> { self.http_client.list_tenants().await } - - pub async fn tenant_create( - &self, - new_tenant_id: TenantId, - generation: Option, - settings: HashMap<&str, &str>, - ) -> anyhow::Result { - let mut settings = settings.clone(); - - let config = models::TenantConfig { + pub fn parse_config(mut settings: HashMap<&str, &str>) -> anyhow::Result { + let result = models::TenantConfig { checkpoint_distance: settings .remove("checkpoint_distance") .map(|x| x.parse::()) @@ -371,6 +375,20 @@ impl PageServerNode { .context("Failed to parse 'gc_feedback' as bool")?, heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), }; + if !settings.is_empty() { + bail!("Unrecognized tenant settings: {settings:?}") + } else { + Ok(result) + } + } + + pub async fn tenant_create( + &self, + new_tenant_id: TenantId, + generation: Option, + settings: HashMap<&str, &str>, + ) -> anyhow::Result { + let config = Self::parse_config(settings.clone())?; let request = models::TenantCreateRequest { new_tenant_id: TenantShardId::unsharded(new_tenant_id), @@ -472,18 +490,21 @@ impl PageServerNode { pub async fn location_config( &self, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, config: LocationConfig, flush_ms: Option, ) -> anyhow::Result<()> { Ok(self .http_client - .location_config(tenant_id, config, flush_ms) + .location_config(tenant_shard_id, config, flush_ms) .await?) } - pub async fn timeline_list(&self, tenant_id: &TenantId) -> anyhow::Result> { - Ok(self.http_client.list_timelines(*tenant_id).await?) + pub async fn timeline_list( + &self, + tenant_shard_id: &TenantShardId, + ) -> anyhow::Result> { + Ok(self.http_client.list_timelines(*tenant_shard_id).await?) } pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> { @@ -495,15 +516,13 @@ impl PageServerNode { pub async fn timeline_create( &self, - tenant_id: TenantId, - new_timeline_id: Option, + tenant_shard_id: TenantShardId, + new_timeline_id: TimelineId, ancestor_start_lsn: Option, ancestor_timeline_id: Option, pg_version: Option, existing_initdb_timeline_id: Option, ) -> anyhow::Result { - // If timeline ID was not specified, generate one - let new_timeline_id = new_timeline_id.unwrap_or(TimelineId::generate()); let req = models::TimelineCreateRequest { new_timeline_id, ancestor_start_lsn, @@ -511,7 +530,10 @@ impl PageServerNode { pg_version, existing_initdb_timeline_id, }; - Ok(self.http_client.timeline_create(tenant_id, &req).await?) + Ok(self + .http_client + .timeline_create(tenant_shard_id, &req) + .await?) } /// Import a basebackup prepared using either: @@ -589,4 +611,14 @@ impl PageServerNode { Ok(()) } + + pub async fn tenant_synthetic_size( + &self, + tenant_shard_id: TenantShardId, + ) -> anyhow::Result { + Ok(self + .http_client + .tenant_synthetic_size(tenant_shard_id) + .await?) + } } diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs deleted file mode 100644 index 23ea8f4060..0000000000 --- a/control_plane/src/tenant_migration.rs +++ /dev/null @@ -1,220 +0,0 @@ -//! -//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code -//! 
diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs
deleted file mode 100644
index 23ea8f4060..0000000000
--- a/control_plane/src/tenant_migration.rs
+++ /dev/null
@@ -1,220 +0,0 @@
-//!
-//! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code
-//! isn't scoped to a particular physical service, as it needs to update compute endpoints to
-//! point to the new pageserver.
-//!
-use crate::local_env::LocalEnv;
-use crate::{
-    attachment_service::AttachmentService, endpoint::ComputeControlPlane,
-    pageserver::PageServerNode,
-};
-use pageserver_api::models::{
-    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
-};
-use pageserver_api::shard::TenantShardId;
-use std::collections::HashMap;
-use std::time::Duration;
-use utils::{
-    id::{TenantId, TimelineId},
-    lsn::Lsn,
-};
-
-/// Given an attached pageserver, retrieve the LSN for all timelines
-async fn get_lsns(
-    tenant_id: TenantId,
-    pageserver: &PageServerNode,
-) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
-    let timelines = pageserver.timeline_list(&tenant_id).await?;
-    Ok(timelines
-        .into_iter()
-        .map(|t| (t.timeline_id, t.last_record_lsn))
-        .collect())
-}
-
-/// Wait for the timeline LSNs on `pageserver` to catch up with or overtake
-/// `baseline`.
-async fn await_lsn(
-    tenant_id: TenantId,
-    pageserver: &PageServerNode,
-    baseline: HashMap<TimelineId, Lsn>,
-) -> anyhow::Result<()> {
-    loop {
-        let latest = match get_lsns(tenant_id, pageserver).await {
-            Ok(l) => l,
-            Err(_e) => {
-                println!(
-                    "🕑 Waiting for pageserver {} to activate...",
-                    pageserver.conf.id
-                );
-                std::thread::sleep(Duration::from_millis(500));
-                continue;
-            }
-        };
-
-        let mut any_behind: bool = false;
-        for (timeline_id, baseline_lsn) in &baseline {
-            match latest.get(timeline_id) {
-                Some(latest_lsn) => {
-                    println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
-                    if latest_lsn < baseline_lsn {
-                        any_behind = true;
-                    }
-                }
-                None => {
-                    // Expected timeline isn't yet visible on migration destination.
-                    // (IRL we would have to account for timeline deletion, but this
-                    // is just test helper)
-                    any_behind = true;
-                }
-            }
-        }
-
-        if !any_behind {
-            println!("✅ LSN caught up. Proceeding...");
-            break;
-        } else {
-            std::thread::sleep(Duration::from_millis(500));
-        }
-    }
-
-    Ok(())
-}
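The await_lsn helper being deleted above polls the destination pageserver until every timeline has reached at least the LSN captured on the origin. A hedged, standalone illustration of that catch-up check, with u128 standing in for TimelineId and u64 for Lsn so the sketch has no dependencies:

// Illustration only: not code from this repository.
use std::collections::HashMap;

fn caught_up(baseline: &HashMap<u128, u64>, latest: &HashMap<u128, u64>) -> bool {
    baseline.iter().all(|(timeline_id, baseline_lsn)| {
        latest
            .get(timeline_id)
            // A timeline missing on the destination counts as "behind".
            .map(|latest_lsn| latest_lsn >= baseline_lsn)
            .unwrap_or(false)
    })
}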
Proceeding..."); - break; - } else { - std::thread::sleep(Duration::from_millis(500)); - } - } - - Ok(()) -} - -/// This function spans multiple services, to demonstrate live migration of a tenant -/// between pageservers: -/// - Coordinate attach/secondary/detach on pageservers -/// - call into attachment_service for generations -/// - reconfigure compute endpoints to point to new attached pageserver -pub async fn migrate_tenant( - env: &LocalEnv, - tenant_id: TenantId, - dest_ps: PageServerNode, -) -> anyhow::Result<()> { - println!("🤔 Checking existing status..."); - let attachment_service = AttachmentService::from_env(env); - - fn build_location_config( - mode: LocationConfigMode, - generation: Option, - secondary_conf: Option, - ) -> LocationConfig { - LocationConfig { - mode, - generation, - secondary_conf, - tenant_conf: TenantConfig::default(), - shard_number: 0, - shard_count: 0, - shard_stripe_size: 0, - } - } - - let previous = attachment_service.inspect(tenant_id).await?; - let mut baseline_lsns = None; - if let Some((generation, origin_ps_id)) = &previous { - let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?); - - if origin_ps_id == &dest_ps.conf.id { - println!("🔁 Already attached to {origin_ps_id}, freshening..."); - let gen = attachment_service - .attach_hook(tenant_id, dest_ps.conf.id) - .await?; - let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None); - dest_ps.location_config(tenant_id, dest_conf, None).await?; - println!("✅ Migration complete"); - return Ok(()); - } - - println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode"); - - let stale_conf = - build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None); - origin_ps - .location_config(tenant_id, stale_conf, Some(Duration::from_secs(10))) - .await?; - - baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?); - } - - println!( - "🔁 Downloading latest layers to destination pageserver {}", - dest_ps.conf.id - ); - match dest_ps - .tenant_secondary_download(&TenantShardId::unsharded(tenant_id)) - .await - { - Ok(()) => {} - Err(_) => { - println!(" (skipping, destination wasn't in secondary mode)") - } - } - - let gen = attachment_service - .attach_hook(tenant_id, dest_ps.conf.id) - .await?; - let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None); - - println!("🔁 Attaching to pageserver {}", dest_ps.conf.id); - dest_ps.location_config(tenant_id, dest_conf, None).await?; - - if let Some(baseline) = baseline_lsns { - println!("🕑 Waiting for LSN to catch up..."); - await_lsn(tenant_id, &dest_ps, baseline).await?; - } - - let cplane = ComputeControlPlane::load(env.clone())?; - for (endpoint_name, endpoint) in &cplane.endpoints { - if endpoint.tenant_id == tenant_id { - println!( - "🔁 Reconfiguring endpoint {} to use pageserver {}", - endpoint_name, dest_ps.conf.id - ); - endpoint.reconfigure(Some(dest_ps.conf.id)).await?; - } - } - - for other_ps_conf in &env.pageservers { - if other_ps_conf.id == dest_ps.conf.id { - continue; - } - - let other_ps = PageServerNode::from_env(env, other_ps_conf); - let other_ps_tenants = other_ps.tenant_list().await?; - - // Check if this tenant is attached - let found = other_ps_tenants - .into_iter() - .map(|t| t.id) - .any(|i| i.tenant_id == tenant_id); - if !found { - continue; - } - - // Downgrade to a secondary location - let secondary_conf = build_location_config( - LocationConfigMode::Secondary, - None, - Some(LocationConfigSecondary { warm: true 
diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs
index 9ee18f1fc4..f5ef1d8848 100644
--- a/pageserver/client/src/mgmt_api.rs
+++ b/pageserver/client/src/mgmt_api.rs
@@ -201,12 +201,12 @@ impl Client {
 
     pub async fn timeline_create(
         &self,
-        tenant_id: TenantId,
+        tenant_shard_id: TenantShardId,
        req: &TimelineCreateRequest,
     ) -> Result<TimelineInfo> {
         let uri = format!(
             "{}/v1/tenant/{}/timeline",
-            self.mgmt_api_endpoint, tenant_id
+            self.mgmt_api_endpoint, tenant_shard_id
         );
         self.request(Method::POST, &uri, req)
             .await?
@@ -215,6 +215,21 @@ impl Client {
             .map_err(Error::ReceiveBody)
     }
 
+    pub async fn timeline_list(
+        &self,
+        tenant_shard_id: &TenantShardId,
+    ) -> Result<Vec<TimelineInfo>> {
+        let uri = format!(
+            "{}/v1/tenant/{}/timeline",
+            self.mgmt_api_endpoint, tenant_shard_id
+        );
+        self.get(&uri)
+            .await?
+            .json()
+            .await
+            .map_err(Error::ReceiveBody)
+    }
+
     pub async fn tenant_synthetic_size(
         &self,
         tenant_shard_id: TenantShardId,