diff --git a/Cargo.lock b/Cargo.lock
index 503c944aff..e811a06fac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1057,6 +1057,8 @@ dependencies = [
  "comfy-table",
  "compute_api",
  "git-version",
+ "hex",
+ "hyper",
  "nix 0.26.2",
  "once_cell",
  "pageserver_api",
@@ -1072,6 +1074,7 @@ dependencies = [
  "storage_broker",
  "tar",
  "thiserror",
+ "tokio",
  "toml",
  "tracing",
  "url",
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index d2c99c5f36..ec685915f9 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -12,6 +12,8 @@ git-version.workspace = true
 nix.workspace = true
 once_cell.workspace = true
 postgres.workspace = true
+hex.workspace = true
+hyper.workspace = true
 regex.workspace = true
 reqwest = { workspace = true, features = ["blocking", "json"] }
 serde.workspace = true
@@ -20,6 +22,7 @@ serde_with.workspace = true
 tar.workspace = true
 thiserror.workspace = true
 toml.workspace = true
+tokio.workspace = true
 url.workspace = true
 # Note: Do not directly depend on pageserver or safekeeper; use pageserver_api or safekeeper_api
 # instead, so that recompile times are better.
diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs
new file mode 100644
index 0000000000..2bd4260aa8
--- /dev/null
+++ b/control_plane/src/attachment_service.rs
@@ -0,0 +1,106 @@
+use crate::{background_process, local_env::LocalEnv};
+use anyhow::anyhow;
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DisplayFromStr};
+use std::{path::PathBuf, process::Child};
+use utils::id::{NodeId, TenantId};
+
+pub struct AttachmentService {
+    env: LocalEnv,
+    listen: String,
+    path: PathBuf,
+}
+
+const COMMAND: &str = "attachment_service";
+
+#[serde_as]
+#[derive(Serialize, Deserialize)]
+pub struct AttachHookRequest {
+    #[serde_as(as = "DisplayFromStr")]
+    pub tenant_id: TenantId,
+    pub pageserver_id: Option<NodeId>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct AttachHookResponse {
+    pub gen: Option<u32>,
+}
+
+impl AttachmentService {
+    pub fn from_env(env: &LocalEnv) -> Self {
+        let path = env.base_data_dir.join("attachments.json");
+
+        // Makes no sense to construct this if pageservers aren't going to use it: assume
+        // pageservers have control plane API set
+        let listen_url = env.pageserver.control_plane_api.clone().unwrap();
+
+        let listen = format!(
+            "{}:{}",
+            listen_url.host_str().unwrap(),
+            listen_url.port().unwrap()
+        );
+
+        Self {
+            env: env.clone(),
+            path,
+            listen,
+        }
+    }
+
+    fn pid_file(&self) -> PathBuf {
+        self.env.base_data_dir.join("attachment_service.pid")
+    }
+
+    pub fn start(&self) -> anyhow::Result<Child> {
+        let path_str = self.path.to_string_lossy();
+
+        background_process::start_process(
+            COMMAND,
+            &self.env.base_data_dir,
+            &self.env.attachment_service_bin(),
+            ["-l", &self.listen, "-p", &path_str],
+            [],
+            background_process::InitialPidFile::Create(&self.pid_file()),
+            // TODO: a real status check
+            || Ok(true),
+        )
+    }
+
+    pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
+        background_process::stop_process(immediate, COMMAND, &self.pid_file())
+    }
+
+    /// Call into the attach_hook API, for use before handing out attachments to pageservers
+    pub fn attach_hook(
+        &self,
+        tenant_id: TenantId,
+        pageserver_id: NodeId,
+    ) -> anyhow::Result<Option<u32>> {
+        use hyper::StatusCode;
+
+        let url = self
+            .env
+            .pageserver
+            .control_plane_api
+            .clone()
+            .unwrap()
+            .join("attach_hook")
+            .unwrap();
+        let client = reqwest::blocking::ClientBuilder::new()
+            .build()
+            .expect("Failed to construct http client");
+
+        let request = AttachHookRequest {
+            tenant_id,
+            pageserver_id: Some(pageserver_id),
+        };
+
+        let response = client.post(url).json(&request).send()?;
+        if response.status() != StatusCode::OK {
+            return Err(anyhow!("Unexpected status {}", response.status()));
+        }
+
+        let response = response.json::<AttachHookResponse>()?;
+        Ok(response.gen)
+    }
+}
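On the wire, attach_hook is a plain JSON POST. A minimal standalone sketch of the same exchange, assuming the service is listening on the default 127.0.0.1:1234 and substituting an ad-hoc serde_json body for the typed AttachHookRequest above; the tenant id shown is a made-up example, and the sketch needs reqwest built with its "blocking" and "json" features:

use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical tenant id; neon_local would use TenantId::generate().
    let body = json!({
        "tenant_id": "3aa8fcc61f6798022857dbbb1a433ea3",
        "pageserver_id": 1,
    });

    let response = reqwest::blocking::Client::new()
        .post("http://127.0.0.1:1234/attach_hook")
        .json(&body)
        .send()?;

    // "gen" is a number when a pageserver_id was supplied, null otherwise.
    let parsed: serde_json::Value = response.json()?;
    println!("acquired generation: {}", parsed["gen"]);
    Ok(())
}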
diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs
new file mode 100644
index 0000000000..e879646b63
--- /dev/null
+++ b/control_plane/src/bin/attachment_service.rs
@@ -0,0 +1,273 @@
+/// The attachment service mimics the aspects of the control plane API
+/// that are required for a pageserver to operate.
+///
+/// This enables running & testing pageservers without a full-blown
+/// deployment of the Neon cloud platform.
+///
+use anyhow::anyhow;
+use clap::Parser;
+use hex::FromHex;
+use hyper::StatusCode;
+use hyper::{Body, Request, Response};
+use serde::{Deserialize, Serialize};
+use std::path::{Path, PathBuf};
+use std::{collections::HashMap, sync::Arc};
+use utils::logging::{self, LogFormat};
+
+use utils::{
+    http::{
+        endpoint::{self},
+        error::ApiError,
+        json::{json_request, json_response},
+        RequestExt, RouterBuilder,
+    },
+    id::{NodeId, TenantId},
+    tcp_listener,
+};
+
+use pageserver_api::control_api::{
+    ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest, ValidateResponse,
+    ValidateResponseTenant,
+};
+
+use control_plane::attachment_service::{AttachHookRequest, AttachHookResponse};
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+#[command(arg_required_else_help(true))]
+struct Cli {
+    /// Host and port to listen on, like `127.0.0.1:1234`
+    #[arg(short, long)]
+    listen: std::net::SocketAddr,
+
+    /// Path to the .json file to store state (will be created if it doesn't exist)
+    #[arg(short, long)]
+    path: PathBuf,
+}
+
+// The persistent state of each Tenant
+#[derive(Serialize, Deserialize, Clone)]
+struct TenantState {
+    // Currently attached pageserver
+    pageserver: Option<NodeId>,
+
+    // Latest generation number: next time we attach, increment this
+    // and use the incremented number when attaching
+    generation: u32,
+}
+
+fn to_hex_map<S, V>(input: &HashMap<TenantId, V>, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: serde::Serializer,
+    V: Clone + Serialize,
+{
+    let transformed = input.iter().map(|(k, v)| (hex::encode(k), v.clone()));
+
+    transformed
+        .collect::<HashMap<String, V>>()
+        .serialize(serializer)
+}
+
+fn from_hex_map<'de, D, V>(deserializer: D) -> Result<HashMap<TenantId, V>, D::Error>
+where
+    D: serde::de::Deserializer<'de>,
+    V: Deserialize<'de>,
+{
+    let hex_map = HashMap::<String, V>::deserialize(deserializer)?;
+    hex_map
+        .into_iter()
+        .map(|(k, v)| {
+            TenantId::from_hex(k)
+                .map(|k| (k, v))
+                .map_err(serde::de::Error::custom)
+        })
+        .collect()
+}
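Storing TenantId keys as hex strings keeps the attachments.json state file human-readable. The round trip these two helpers perform can be sketched standalone with the hex and serde_json crates, with a raw 16-byte array standing in for TenantId:

use std::collections::HashMap;

fn main() {
    // Stand-in for a TenantId: 16 bytes rendered as 32 hex characters.
    let id = [0x3a_u8; 16];
    let key = hex::encode(id); // what to_hex_map writes

    let mut map = HashMap::new();
    map.insert(key.clone(), 7u32);
    let json = serde_json::to_string(&map).unwrap();

    // What from_hex_map undoes: parse the key back into raw bytes.
    let decoded: HashMap<String, u32> = serde_json::from_str(&json).unwrap();
    let raw = hex::decode(decoded.keys().next().unwrap()).unwrap();
    assert_eq!(raw, id);
    assert_eq!(decoded[&key], 7);
}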
+
+// Top level state available to all HTTP handlers
+#[derive(Serialize, Deserialize)]
+struct PersistentState {
+    #[serde(serialize_with = "to_hex_map", deserialize_with = "from_hex_map")]
+    tenants: HashMap<TenantId, TenantState>,
+
+    #[serde(skip)]
+    path: PathBuf,
+}
+
+impl PersistentState {
+    async fn save(&self) -> anyhow::Result<()> {
+        let bytes = serde_json::to_vec(self)?;
+        tokio::fs::write(&self.path, &bytes).await?;
+
+        Ok(())
+    }
+
+    async fn load(path: &Path) -> anyhow::Result<Self> {
+        let bytes = tokio::fs::read(path).await?;
+        let mut decoded = serde_json::from_slice::<Self>(&bytes)?;
+        decoded.path = path.to_owned();
+        Ok(decoded)
+    }
+
+    async fn load_or_new(path: &Path) -> Self {
+        match Self::load(path).await {
+            Ok(s) => {
+                tracing::info!("Loaded state file at {}", path.display());
+                s
+            }
+            Err(e)
+                if e.downcast_ref::<std::io::Error>()
+                    .map(|e| e.kind() == std::io::ErrorKind::NotFound)
+                    .unwrap_or(false) =>
+            {
+                tracing::info!("Will create state file at {}", path.display());
+                Self {
+                    tenants: HashMap::new(),
+                    path: path.to_owned(),
+                }
+            }
+            Err(e) => {
+                panic!("Failed to load state from '{}': {e:#} (maybe your .neon/ dir was written by an older version?)", path.display())
+            }
+        }
+    }
+}
+
+/// State available to HTTP request handlers
+#[derive(Clone)]
+struct State {
+    inner: Arc<tokio::sync::RwLock<PersistentState>>,
+}
+
+impl State {
+    fn new(persistent_state: PersistentState) -> State {
+        Self {
+            inner: Arc::new(tokio::sync::RwLock::new(persistent_state)),
+        }
+    }
+}
+
+#[inline(always)]
+fn get_state(request: &Request<Body>) -> &State {
+    request
+        .data::<Arc<State>>()
+        .expect("unknown state type")
+        .as_ref()
+}
+
+/// Pageserver calls into this on startup, to learn which tenants it should attach
+async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let reattach_req = json_request::<ReAttachRequest>(&mut req).await?;
+
+    let state = get_state(&req).inner.clone();
+    let mut locked = state.write().await;
+
+    let mut response = ReAttachResponse {
+        tenants: Vec::new(),
+    };
+    for (t, state) in &mut locked.tenants {
+        if state.pageserver == Some(reattach_req.node_id) {
+            state.generation += 1;
+            response.tenants.push(ReAttachResponseTenant {
+                id: *t,
+                generation: state.generation,
+            });
+        }
+    }
+
+    locked.save().await.map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, response)
+}
+
+/// Pageserver calls into this before doing deletions, to confirm that it still
+/// holds the latest generation for the tenants with deletions enqueued
+async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let validate_req = json_request::<ValidateRequest>(&mut req).await?;
+
+    let locked = get_state(&req).inner.read().await;
+
+    let mut response = ValidateResponse {
+        tenants: Vec::new(),
+    };
+
+    for req_tenant in validate_req.tenants {
+        if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
+            let valid = tenant_state.generation == req_tenant.gen;
+            response.tenants.push(ValidateResponseTenant {
+                id: req_tenant.id,
+                valid,
+            });
+        }
+    }
+
+    json_response(StatusCode::OK, response)
+}
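Both handlers reduce to integer comparisons on the stored generation. A standalone model (not the service's actual types) of one restart cycle — re-attach bumps every generation the pageserver holds, after which a deletion enqueued under the old generation fails validation:

use std::collections::HashMap;

fn main() {
    // tenant -> latest issued generation (stand-in for PersistentState.tenants)
    let mut generations: HashMap<&str, u32> = HashMap::from([("tenant-a", 5)]);

    // A deletion is enqueued under the current generation...
    let enqueued_under = generations["tenant-a"];

    // ...then the pageserver restarts and re-attaches, bumping the generation.
    for gen in generations.values_mut() {
        *gen += 1;
    }

    // The validate check: only the holder of the latest generation may delete.
    let valid = generations["tenant-a"] == enqueued_under;
    assert!(!valid, "deletions from a superseded generation must be rejected");
}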
+/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
+/// (in the real control plane this is unnecessary, because the same program is managing
+/// generation numbers and doing attachments).
+async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+    let attach_req = json_request::<AttachHookRequest>(&mut req).await?;
+
+    let state = get_state(&req).inner.clone();
+    let mut locked = state.write().await;
+
+    let tenant_state = locked
+        .tenants
+        .entry(attach_req.tenant_id)
+        .or_insert_with(|| TenantState {
+            pageserver: attach_req.pageserver_id,
+            generation: 0,
+        });
+
+    if attach_req.pageserver_id.is_some() {
+        tenant_state.generation += 1;
+    }
+    let generation = tenant_state.generation;
+
+    locked.save().await.map_err(ApiError::InternalServerError)?;
+
+    json_response(
+        StatusCode::OK,
+        AttachHookResponse {
+            gen: attach_req.pageserver_id.map(|_| generation),
+        },
+    )
+}
+
+fn make_router(persistent_state: PersistentState) -> RouterBuilder<Body, ApiError> {
+    endpoint::make_router()
+        .data(Arc::new(State::new(persistent_state)))
+        .post("/re-attach", handle_re_attach)
+        .post("/validate", handle_validate)
+        .post("/attach_hook", handle_attach_hook)
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    logging::init(
+        LogFormat::Plain,
+        logging::TracingErrorLayerEnablement::Disabled,
+    )?;
+
+    let args = Cli::parse();
+    tracing::info!(
+        "Starting, state at {}, listening on {}",
+        args.path.to_string_lossy(),
+        args.listen
+    );
+
+    let persistent_state = PersistentState::load_or_new(&args.path).await;
+
+    let http_listener = tcp_listener::bind(args.listen)?;
+    let router = make_router(persistent_state)
+        .build()
+        .map_err(|err| anyhow!(err))?;
+    let service = utils::http::RouterService::new(router).unwrap();
+    let server = hyper::Server::from_tcp(http_listener)?.serve(service);
+
+    tracing::info!("Serving on {0}", args.listen);
+    server.await?;
+
+    Ok(())
+}
diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index ef308cb2d2..6b49b92cfa 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Context, Result};
 use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
 use compute_api::spec::ComputeMode;
+use control_plane::attachment_service::AttachmentService;
 use control_plane::endpoint::ComputeControlPlane;
 use control_plane::local_env::LocalEnv;
 use control_plane::pageserver::PageServerNode;
@@ -43,6 +44,8 @@ project_git_version!(GIT_VERSION);
 
 const DEFAULT_PG_VERSION: &str = "15";
 
+const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/";
+
 fn default_conf() -> String {
     format!(
         r#"
@@ -56,11 +59,13 @@ listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}'
 listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}'
 pg_auth_type = '{trust_auth}'
 http_auth_type = '{trust_auth}'
+control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}'
 
 [[safekeepers]]
 id = {DEFAULT_SAFEKEEPER_ID}
 pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
 http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
+
 "#,
         trust_auth = AuthType::Trust,
     )
@@ -107,6 +112,7 @@ fn main() -> Result<()> {
         "start" => handle_start_all(sub_args, &env),
         "stop" => handle_stop_all(sub_args, &env),
         "pageserver" => handle_pageserver(sub_args, &env),
+        "attachment_service" => handle_attachment_service(sub_args, &env),
         "safekeeper" => handle_safekeeper(sub_args, &env),
         "endpoint" => handle_endpoint(sub_args, &env),
         "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
@@ -342,13 +348,25 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
             }
         }
         Some(("create", create_match)) => {
-            let initial_tenant_id = parse_tenant_id(create_match)?;
             let tenant_conf: HashMap<_, _> = create_match
.get_many::("config") .map(|vals| vals.flat_map(|c| c.split_once(':')).collect()) .unwrap_or_default(); - let new_tenant_id = pageserver.tenant_create(initial_tenant_id, tenant_conf)?; - println!("tenant {new_tenant_id} successfully created on the pageserver"); + + // If tenant ID was not specified, generate one + let tenant_id = parse_tenant_id(create_match)?.unwrap_or_else(TenantId::generate); + + let generation = if env.pageserver.control_plane_api.is_some() { + // We must register the tenant with the attachment service, so + // that when the pageserver restarts, it will be re-attached. + let attachment_service = AttachmentService::from_env(env); + attachment_service.attach_hook(tenant_id, env.pageserver.id)? + } else { + None + }; + + pageserver.tenant_create(tenant_id, generation, tenant_conf)?; + println!("tenant {tenant_id} successfully created on the pageserver"); // Create an initial timeline for the new tenant let new_timeline_id = parse_timeline_id(create_match)?; @@ -358,7 +376,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an .context("Failed to parse postgres version from the argument string")?; let timeline_info = pageserver.timeline_create( - new_tenant_id, + tenant_id, new_timeline_id, None, None, @@ -369,17 +387,17 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an env.register_branch_mapping( DEFAULT_BRANCH_NAME.to_string(), - new_tenant_id, + tenant_id, new_timeline_id, )?; println!( - "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}", + "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {tenant_id}", ); if create_match.get_flag("set-default") { - println!("Setting tenant {new_tenant_id} as a default one"); - env.default_tenant_id = Some(new_tenant_id); + println!("Setting tenant {tenant_id} as a default one"); + env.default_tenant_id = Some(tenant_id); } } Some(("set-default", set_default_match)) => { @@ -817,6 +835,33 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } +fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let svc = AttachmentService::from_env(env); + match sub_match.subcommand() { + Some(("start", _start_match)) => { + if let Err(e) = svc.start() { + eprintln!("start failed: {e}"); + exit(1); + } + } + + Some(("stop", stop_match)) => { + let immediate = stop_match + .get_one::("stop-mode") + .map(|s| s.as_str()) + == Some("immediate"); + + if let Err(e) = svc.stop(immediate) { + eprintln!("stop failed: {}", e); + exit(1); + } + } + Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name), + None => bail!("no attachment_service subcommand provided"), + } + Ok(()) +} + fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result { if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) { Ok(SafekeeperNode::from_env(env, node)) @@ -897,6 +942,16 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow broker::start_broker_process(env)?; + // Only start the attachment service if the pageserver is configured to need it + if env.pageserver.control_plane_api.is_some() { + let attachment_service = AttachmentService::from_env(env); + if let Err(e) = attachment_service.start() { + eprintln!("attachment_service start failed: {:#}", e); + try_stop_all(env, true); + exit(1); + } + } + let pageserver = PageServerNode::from_env(env); if let Err(e) = 
pageserver.start(&pageserver_config_overrides(sub_match)) {
         eprintln!("pageserver {} start failed: {:#}", env.pageserver.id, e);
@@ -955,6 +1010,13 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
     if let Err(e) = broker::stop_broker_process(env) {
         eprintln!("neon broker stop failed: {e:#}");
     }
+
+    if env.pageserver.control_plane_api.is_some() {
+        let attachment_service = AttachmentService::from_env(env);
+        if let Err(e) = attachment_service.stop(immediate) {
+            eprintln!("attachment service stop failed: {e:#}");
+        }
+    }
 }
 
 fn cli() -> Command {
@@ -1138,6 +1200,14 @@ fn cli() -> Command {
                 .arg(stop_mode_arg.clone()))
             .subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone()))
         )
+        .subcommand(
+            Command::new("attachment_service")
+                .arg_required_else_help(true)
+                .about("Manage attachment_service")
+                .subcommand(Command::new("start").about("Start local attachment service").arg(pageserver_config_args.clone()))
+                .subcommand(Command::new("stop").about("Stop local attachment service")
+                    .arg(stop_mode_arg.clone()))
+        )
         .subcommand(
             Command::new("safekeeper")
                 .arg_required_else_help(true)
diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs
index a773b8dcc3..7592880402 100644
--- a/control_plane/src/lib.rs
+++ b/control_plane/src/lib.rs
@@ -7,6 +7,7 @@
 // local installations.
 //
+pub mod attachment_service;
 mod background_process;
 pub mod broker;
 pub mod endpoint;
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 9e42c2e333..0215ab1bb5 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -118,6 +118,9 @@ pub struct PageServerConf {
     // auth type used for the PG and HTTP ports
     pub pg_auth_type: AuthType,
     pub http_auth_type: AuthType,
+
+    // Control plane location
+    pub control_plane_api: Option<Url>,
 }
 
 impl Default for PageServerConf {
@@ -128,6 +131,7 @@ impl Default for PageServerConf {
             listen_http_addr: String::new(),
             pg_auth_type: AuthType::Trust,
             http_auth_type: AuthType::Trust,
+            control_plane_api: None,
         }
     }
 }
@@ -202,6 +206,10 @@ impl LocalEnv {
         self.neon_distrib_dir.join("pageserver")
     }
 
+    pub fn attachment_service_bin(&self) -> PathBuf {
+        self.neon_distrib_dir.join("attachment_service")
+    }
+
     pub fn safekeeper_bin(&self) -> PathBuf {
         self.neon_distrib_dir.join("safekeeper")
     }
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 2ff09021e5..eecc2479ff 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -126,6 +126,13 @@ impl PageServerNode {
             broker_endpoint_param,
         ];
 
+        if let Some(control_plane_api) = &self.env.pageserver.control_plane_api {
+            overrides.push(format!(
+                "control_plane_api='{}'",
+                control_plane_api.as_str()
+            ));
+        }
+
         if self.env.pageserver.http_auth_type != AuthType::Trust
             || self.env.pageserver.pg_auth_type != AuthType::Trust
         {
@@ -316,7 +323,8 @@ impl PageServerNode {
 
     pub fn tenant_create(
         &self,
-        new_tenant_id: Option<TenantId>,
+        new_tenant_id: TenantId,
+        generation: Option<u32>,
         settings: HashMap<&str, &str>,
     ) -> anyhow::Result<TenantId> {
         let mut settings = settings.clone();
@@ -382,11 +390,9 @@ impl PageServerNode {
                 .context("Failed to parse 'gc_feedback' as bool")?,
         };
 
-        // If tenant ID was not specified, generate one
-        let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
-
         let request = models::TenantCreateRequest {
             new_tenant_id,
+            generation,
             config,
         };
         if !settings.is_empty() {
diff --git a/libs/pageserver_api/src/control_api.rs b/libs/pageserver_api/src/control_api.rs
new file mode 100644
index 0000000000..a54fee47a5
--- /dev/null
+++ b/libs/pageserver_api/src/control_api.rs
@@ -0,0 +1,52 @@
+//! Types in this file are for pageserver's upward-facing API calls to the control plane,
+//! required for acquiring and validating tenant generation numbers.
+//!
+//! See docs/rfcs/025-generation-numbers.md
+
+use serde::{Deserialize, Serialize};
+use serde_with::{serde_as, DisplayFromStr};
+use utils::id::{NodeId, TenantId};
+
+#[derive(Serialize, Deserialize)]
+pub struct ReAttachRequest {
+    pub node_id: NodeId,
+}
+
+#[serde_as]
+#[derive(Serialize, Deserialize)]
+pub struct ReAttachResponseTenant {
+    #[serde_as(as = "DisplayFromStr")]
+    pub id: TenantId,
+    pub generation: u32,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct ReAttachResponse {
+    pub tenants: Vec<ReAttachResponseTenant>,
+}
+
+#[serde_as]
+#[derive(Serialize, Deserialize)]
+pub struct ValidateRequestTenant {
+    #[serde_as(as = "DisplayFromStr")]
+    pub id: TenantId,
+    pub gen: u32,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct ValidateRequest {
+    pub tenants: Vec<ValidateRequestTenant>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct ValidateResponse {
+    pub tenants: Vec<ValidateResponseTenant>,
+}
+
+#[serde_as]
+#[derive(Serialize, Deserialize)]
+pub struct ValidateResponseTenant {
+    #[serde_as(as = "DisplayFromStr")]
+    pub id: TenantId,
+    pub valid: bool,
+}
diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs
index 4890d54f36..d844021785 100644
--- a/libs/pageserver_api/src/lib.rs
+++ b/libs/pageserver_api/src/lib.rs
@@ -1,6 +1,7 @@
 use const_format::formatcp;
 
 /// Public API types
+pub mod control_api;
 pub mod models;
 pub mod reltag;
 
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 2f4c21326e..340463afa7 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -194,10 +194,22 @@ pub struct TimelineCreateRequest {
 pub struct TenantCreateRequest {
     #[serde_as(as = "DisplayFromStr")]
     pub new_tenant_id: TenantId,
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub generation: Option<u32>,
     #[serde(flatten)]
     pub config: TenantConfig, // as we have a flattened field, we should reject all unknown fields in it
 }
 
+#[serde_as]
+#[derive(Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct TenantLoadRequest {
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub generation: Option<u32>,
+}
+
 impl std::ops::Deref for TenantCreateRequest {
     type Target = TenantConfig;
 
@@ -241,15 +253,6 @@ pub struct StatusResponse {
     pub id: NodeId,
 }
 
-impl TenantCreateRequest {
-    pub fn new(new_tenant_id: TenantId) -> TenantCreateRequest {
-        TenantCreateRequest {
-            new_tenant_id,
-            config: TenantConfig::default(),
-        }
-    }
-}
-
 #[serde_as]
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
@@ -293,9 +296,11 @@ impl TenantConfigRequest {
     }
 }
 
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Deserialize)]
 pub struct TenantAttachRequest {
     pub config: TenantAttachConfig,
+    #[serde(default)]
+    pub generation: Option<u32>,
 }
 
 /// Newtype to enforce deny_unknown_fields on TenantConfig for
diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs
index f1095ad8b8..5040183045 100644
--- a/libs/remote_storage/src/local_fs.rs
+++ b/libs/remote_storage/src/local_fs.rs
@@ -148,21 +148,55 @@ impl RemoteStorage for LocalFs {
             Some(folder) => folder.with_base(&self.storage_root),
             None => self.storage_root.clone(),
         };
-        let mut files = vec![];
-        let mut directory_queue = vec![full_path.clone()];
+        // If we were
given a directory, we may use it as our starting point. + // Otherwise, we must go up to the parent directory. This is because + // S3 object list prefixes can be arbitrary strings, but when reading + // the local filesystem we need a directory to start calling read_dir on. + let mut initial_dir = full_path.clone(); + match fs::metadata(full_path.clone()).await { + Ok(meta) => { + if !meta.is_dir() { + // It's not a directory: strip back to the parent + initial_dir.pop(); + } + } + Err(e) if e.kind() == ErrorKind::NotFound => { + // It's not a file that exists: strip the prefix back to the parent directory + initial_dir.pop(); + } + Err(e) => { + // Unexpected I/O error + anyhow::bail!(e) + } + } + + // Note that PathBuf starts_with only considers full path segments, but + // object prefixes are arbitrary strings, so we need the strings for doing + // starts_with later. + let prefix = full_path.to_string_lossy(); + + let mut files = vec![]; + let mut directory_queue = vec![initial_dir.clone()]; while let Some(cur_folder) = directory_queue.pop() { let mut entries = fs::read_dir(cur_folder.clone()).await?; while let Some(entry) = entries.next_entry().await? { let file_name: PathBuf = entry.file_name().into(); let full_file_name = cur_folder.clone().join(&file_name); - let file_remote_path = self.local_file_to_relative_path(full_file_name.clone()); - files.push(file_remote_path.clone()); - if full_file_name.is_dir() { - directory_queue.push(full_file_name); + if full_file_name + .to_str() + .map(|s| s.starts_with(prefix.as_ref())) + .unwrap_or(false) + { + let file_remote_path = self.local_file_to_relative_path(full_file_name.clone()); + files.push(file_remote_path.clone()); + if full_file_name.is_dir() { + directory_queue.push(full_file_name); + } } } } + Ok(files) } diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs index 87c6361255..163c8c0467 100644 --- a/libs/utils/src/generation.rs +++ b/libs/utils/src/generation.rs @@ -53,6 +53,7 @@ impl Generation { matches!(self, Self::None) } + #[track_caller] pub fn get_suffix(&self) -> String { match self { Self::Valid(v) => { @@ -64,6 +65,30 @@ impl Generation { } } } + + /// `suffix` is the part after "-" in a key + /// + /// Returns None if parsing was unsuccessful + pub fn parse_suffix(suffix: &str) -> Option { + u32::from_str_radix(suffix, 16).map(Generation::new).ok() + } + + #[track_caller] + pub fn previous(&self) -> Generation { + match self { + Self::Valid(n) => { + if *n == 0 { + // Since a tenant may be upgraded from a pre-generations state, interpret the "previous" generation + // to 0 as being "no generation". + Self::None + } else { + Self::Valid(n - 1) + } + } + Self::None => Self::None, + Self::Broken => panic!("Attempted to use a broken generation"), + } + } } impl Serialize for Generation { diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 71e3a0ff3f..bf0ad760cb 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -388,6 +388,7 @@ fn start_pageserver( remote_storage: remote_storage.clone(), }, order, + shutdown_pageserver.clone(), ))?; BACKGROUND_RUNTIME.spawn({ diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index ab485c969d..624503a3a5 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -205,6 +205,8 @@ pub struct PageServerConf { /// has it's initial logical size calculated. Not running background tasks for some seconds is /// not terrible. 
pub background_task_maximum_delay: Duration, + + pub control_plane_api: Option, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -279,6 +281,8 @@ struct PageServerConfigBuilder { ondemand_download_behavior_treat_error_as_warn: BuilderValue, background_task_maximum_delay: BuilderValue, + + control_plane_api: BuilderValue>, } impl Default for PageServerConfigBuilder { @@ -341,6 +345,8 @@ impl Default for PageServerConfigBuilder { DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY, ) .unwrap()), + + control_plane_api: Set(None), } } } @@ -469,6 +475,10 @@ impl PageServerConfigBuilder { self.background_task_maximum_delay = BuilderValue::Set(delay); } + pub fn control_plane_api(&mut self, api: Url) { + self.control_plane_api = BuilderValue::Set(Some(api)) + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries @@ -554,6 +564,9 @@ impl PageServerConfigBuilder { background_task_maximum_delay: self .background_task_maximum_delay .ok_or(anyhow!("missing background_task_maximum_delay"))?, + control_plane_api: self + .control_plane_api + .ok_or(anyhow!("missing control_plane_api"))?, }) } } @@ -742,6 +755,7 @@ impl PageServerConf { }, "ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?), "background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?), + "control_plane_api" => builder.control_plane_api(parse_toml_string(key, item)?.parse().context("failed to parse control plane URL")?), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -910,6 +924,7 @@ impl PageServerConf { test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, background_task_maximum_delay: Duration::ZERO, + control_plane_api: None, } } } @@ -1133,6 +1148,7 @@ background_task_maximum_delay = '334 s' background_task_maximum_delay: humantime::parse_duration( defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY )?, + control_plane_api: None }, "Correct defaults should be used when no config values are provided" ); @@ -1188,6 +1204,7 @@ background_task_maximum_delay = '334 s' test_remote_failures: 0, ondemand_download_behavior_treat_error_as_warn: false, background_task_maximum_delay: Duration::from_secs(334), + control_plane_api: None }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs new file mode 100644 index 0000000000..192eb16789 --- /dev/null +++ b/pageserver/src/control_plane_client.rs @@ -0,0 +1,119 @@ +use std::collections::HashMap; + +use hyper::StatusCode; +use pageserver_api::control_api::{ReAttachRequest, ReAttachResponse}; +use tokio_util::sync::CancellationToken; +use url::Url; +use utils::{ + backoff, + generation::Generation, + id::{NodeId, TenantId}, +}; + +use crate::config::PageServerConf; + +// Backoffs when control plane requests do not succeed: compromise between reducing load +// on control plane, and retrying frequently when we are blocked on a control plane +// response to make progress. 
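For a sense of the retry cadence the two constants just below produce — assuming utils::backoff follows the conventional doubling curve, which is an assumption, not something this file shows — a standalone sketch of the delay schedule:

fn backoff_seconds(attempt: u32, increment: f64, max: f64) -> f64 {
    // 0.1s, 0.2s, 0.4s, ... capped at 10s
    (increment * 2f64.powi(attempt as i32)).min(max)
}

fn main() {
    for attempt in 0..8 {
        println!("attempt {attempt}: wait {:.1}s", backoff_seconds(attempt, 0.1, 10.0));
    }
}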
+const BACKOFF_INCREMENT: f64 = 0.1;
+const BACKOFF_MAX: f64 = 10.0;
+
+/// The Pageserver's client for using the control plane API: this is a small subset
+/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
+pub(crate) struct ControlPlaneClient {
+    http_client: reqwest::Client,
+    base_url: Url,
+    node_id: NodeId,
+    cancel: CancellationToken,
+}
+
+impl ControlPlaneClient {
+    /// A None return value indicates that the input `conf` object does not have control
+    /// plane API enabled.
+    pub(crate) fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
+        let mut url = match conf.control_plane_api.as_ref() {
+            Some(u) => u.clone(),
+            None => return None,
+        };
+
+        if let Ok(mut segs) = url.path_segments_mut() {
+            // This ensures that `url` ends with a slash if it doesn't already.
+            // That way, we can subsequently use join() to safely attach extra path elements.
+            segs.pop_if_empty().push("");
+        }
+
+        let client = reqwest::ClientBuilder::new()
+            .build()
+            .expect("Failed to construct http client");
+
+        Some(Self {
+            http_client: client,
+            base_url: url,
+            node_id: conf.id,
+            cancel: cancel.clone(),
+        })
+    }
+
+    async fn try_re_attach(
+        &self,
+        url: Url,
+        request: &ReAttachRequest,
+    ) -> anyhow::Result<ReAttachResponse> {
+        match self.http_client.post(url).json(request).send().await {
+            Err(e) => Err(anyhow::Error::from(e)),
+            Ok(r) => {
+                if r.status() == StatusCode::OK {
+                    r.json::<ReAttachResponse>()
+                        .await
+                        .map_err(anyhow::Error::from)
+                } else {
+                    Err(anyhow::anyhow!("Unexpected status {}", r.status()))
+                }
+            }
+        }
+    }
+
+    /// Block until we get a successful response
+    pub(crate) async fn re_attach(&self) -> anyhow::Result<HashMap<TenantId, Generation>> {
+        let re_attach_path = self
+            .base_url
+            .join("re-attach")
+            .expect("Failed to build re-attach path");
+        let request = ReAttachRequest {
+            node_id: self.node_id,
+        };
+
+        let mut attempt = 0;
+        loop {
+            let result = self.try_re_attach(re_attach_path.clone(), &request).await;
+            match result {
+                Ok(res) => {
+                    tracing::info!(
+                        "Received re-attach response with {} tenants",
+                        res.tenants.len()
+                    );
+
+                    return Ok(res
+                        .tenants
+                        .into_iter()
+                        .map(|t| (t.id, Generation::new(t.generation)))
+                        .collect::<HashMap<_, _>>());
+                }
+                Err(e) => {
+                    tracing::error!("Error re-attaching tenants, retrying: {e:#}");
+                    backoff::exponential_backoff(
+                        attempt,
+                        BACKOFF_INCREMENT,
+                        BACKOFF_MAX,
+                        &self.cancel,
+                    )
+                    .await;
+                    if self.cancel.is_cancelled() {
+                        return Err(anyhow::anyhow!("Shutting down"));
+                    }
+                    attempt += 1;
+                }
+            }
+        }
+    }
+}
diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml
index 38e07f172d..4988641d6a 100644
--- a/pageserver/src/http/openapi_spec.yml
+++ b/pageserver/src/http/openapi_spec.yml
@@ -383,7 +383,6 @@ paths:
             schema:
               type: string
               format: hex
-
     post:
       description: |
         Schedules attach operation to happen in the background for the given tenant.
@@ -1020,6 +1019,9 @@ components:
       properties:
         config:
          $ref: '#/components/schemas/TenantConfig'
+        generation:
+          type: integer
+          description: Attachment generation number.
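Concretely, an attach request body that exercises the new field might look like the sketch below; the endpoint path in the comment and the config key shown are illustrative assumptions, not taken from this spec:

use serde_json::json;

fn main() {
    // Hypothetical body for the tenant attach POST described above.
    let body = json!({
        "config": {
            "checkpoint_distance": 268435456u64,
        },
        "generation": 7,
    });
    println!("{}", serde_json::to_string_pretty(&body).unwrap());
}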
TenantConfigRequest: allOf: - $ref: '#/components/schemas/TenantConfig' diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index f86657fa77..286aa3dfb0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -8,7 +8,9 @@ use anyhow::{anyhow, Context, Result}; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; -use pageserver_api::models::{DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest}; +use pageserver_api::models::{ + DownloadRemoteLayersTaskSpawnRequest, TenantAttachRequest, TenantLoadRequest, +}; use remote_storage::GenericRemoteStorage; use storage_broker::BrokerClientChannel; use tenant_size_model::{SizeResult, StorageModel}; @@ -32,11 +34,13 @@ use crate::tenant::mgr::{ }; use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; -use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, Timeline}; +use crate::tenant::timeline::Timeline; +use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError}; use crate::{config::PageServerConf, tenant::mgr}; use crate::{disk_usage_eviction_task, tenant}; use utils::{ auth::JwtAuth, + generation::Generation, http::{ endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with}, error::{ApiError, HttpErrorBody}, @@ -472,7 +476,7 @@ async fn tenant_attach_handler( check_permission(&request, Some(tenant_id))?; let maybe_body: Option = json_request_or_empty_body(&mut request).await?; - let tenant_conf = match maybe_body { + let tenant_conf = match &maybe_body { Some(request) => TenantConfOpt::try_from(&*request.config).map_err(ApiError::BadRequest)?, None => TenantConfOpt::default(), }; @@ -483,10 +487,13 @@ async fn tenant_attach_handler( let state = get_state(&request); + let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?; + if let Some(remote_storage) = &state.remote_storage { mgr::attach_tenant( state.conf, tenant_id, + generation, tenant_conf, state.broker_client.clone(), remote_storage.clone(), @@ -538,7 +545,7 @@ async fn tenant_detach_handler( } async fn tenant_load_handler( - request: Request, + mut request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; @@ -546,10 +553,18 @@ async fn tenant_load_handler( let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let maybe_body: Option = json_request_or_empty_body(&mut request).await?; + let state = get_state(&request); + + // The /load request is only usable when control_plane_api is not set. Once it is set, callers + // should always use /attach instead. 
+ let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?; + mgr::load_tenant( state.conf, tenant_id, + generation, state.broker_client.clone(), state.remote_storage.clone(), &ctx, @@ -851,6 +866,21 @@ pub fn html_response(status: StatusCode, data: String) -> Result, Ok(response) } +/// Helper for requests that may take a generation, which is mandatory +/// when control_plane_api is set, but otherwise defaults to Generation::none() +fn get_request_generation(state: &State, req_gen: Option) -> Result { + if state.conf.control_plane_api.is_some() { + req_gen + .map(Generation::new) + .ok_or(ApiError::BadRequest(anyhow!( + "generation attribute missing" + ))) + } else { + // Legacy mode: all tenants operate with no generation + Ok(Generation::none()) + } +} + async fn tenant_create_handler( mut request: Request, _cancel: CancellationToken, @@ -867,14 +897,17 @@ async fn tenant_create_handler( let tenant_conf = TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); - let state = get_state(&request); + let generation = get_request_generation(state, request_data.generation)?; + + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let new_tenant = mgr::create_tenant( state.conf, tenant_conf, target_tenant_id, + generation, state.broker_client.clone(), state.remote_storage.clone(), &ctx, diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index cb20caba1f..3049ad6a4e 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -3,6 +3,7 @@ pub mod basebackup; pub mod config; pub mod consumption_metrics; pub mod context; +mod control_plane_client; pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 02df8af51c..f0639844bd 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3381,7 +3381,7 @@ pub mod harness { pub tenant_conf: TenantConf, pub tenant_id: TenantId, pub generation: Generation, - remote_storage: GenericRemoteStorage, + pub remote_storage: GenericRemoteStorage, pub remote_fs_dir: PathBuf, } diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 7b05704e4f..75ffe09696 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -230,6 +230,23 @@ impl TimelineMetadata { pub fn pg_version(&self) -> u32 { self.body.pg_version } + + // Checksums make it awkward to build a valid instance by hand. This helper + // provides a TimelineMetadata with a valid checksum in its header. 
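The trick in the helper below is generic: serialize once so the header checksum gets computed, then parse those bytes back so the result is a self-consistent instance. A toy illustration of the same round-trip idea, using a trivial one-byte XOR checksum rather than the real TimelineMetadata format:

fn to_bytes(body: &[u8]) -> Vec<u8> {
    // header = one checksum byte over the body
    let checksum = body.iter().fold(0u8, |acc, b| acc ^ b);
    let mut out = vec![checksum];
    out.extend_from_slice(body);
    out
}

fn from_bytes(bytes: &[u8]) -> Result<Vec<u8>, &'static str> {
    let (checksum, body) = bytes.split_first().ok_or("empty")?;
    if *checksum == body.iter().fold(0u8, |acc, b| acc ^ b) {
        Ok(body.to_vec())
    } else {
        Err("checksum mismatch")
    }
}

fn main() {
    let valid = to_bytes(b"metadata");
    assert!(from_bytes(&valid).is_ok());
    assert!(from_bytes(b"\x00metadata").is_err()); // bad header byte
}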
+ #[cfg(test)] + pub fn example() -> Self { + let instance = Self::new( + "0/16960E8".parse::().unwrap(), + None, + None, + Lsn::from_hex("00000000").unwrap(), + Lsn::from_hex("00000000").unwrap(), + Lsn::from_hex("00000000").unwrap(), + 0, + ); + let bytes = instance.to_bytes().unwrap(); + Self::from_bytes(&bytes).unwrap() + } } impl<'de> Deserialize<'de> for TimelineMetadata { diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 72d150e0eb..b92c1f69c3 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -11,6 +11,7 @@ use anyhow::Context; use once_cell::sync::Lazy; use tokio::sync::RwLock; use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; use tracing::*; use remote_storage::GenericRemoteStorage; @@ -18,6 +19,7 @@ use utils::crashsafe; use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; +use crate::control_plane_client::ControlPlaneClient; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::TenantConfOpt; use crate::tenant::delete::DeleteTenantFlow; @@ -94,12 +96,21 @@ pub async fn init_tenant_mgr( conf: &'static PageServerConf, resources: TenantSharedResources, init_order: InitializationOrder, + cancel: CancellationToken, ) -> anyhow::Result<()> { // Scan local filesystem for attached tenants let tenants_dir = conf.tenants_path(); let mut tenants = HashMap::new(); + // If we are configured to use the control plane API, then it is the source of truth for what tenants to load. + let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) { + Some(client.re_attach().await?) + } else { + info!("Control plane API not configured, tenant generations are disabled"); + None + }; + let mut dir_entries = fs::read_dir(&tenants_dir) .await .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?; @@ -149,9 +160,53 @@ pub async fn init_tenant_mgr( continue; } + let tenant_id = match tenant_dir_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + { + Ok(id) => id, + Err(_) => { + warn!( + "Invalid tenant path (garbage in our repo directory?): {}", + tenant_dir_path.display() + ); + continue; + } + }; + + let generation = if let Some(generations) = &tenant_generations { + // We have a generation map: treat it as the authority for whether + // this tenant is really attached. 
+ if let Some(gen) = generations.get(&tenant_id) { + *gen + } else { + info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response"); + if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await { + error!( + "Failed to remove detached tenant directory '{}': {:?}", + tenant_dir_path.display(), + e + ); + } + continue; + } + } else { + // Legacy mode: no generation information, any tenant present + // on local disk may activate + info!( + "Starting tenant {} in legacy mode, no generation", + tenant_dir_path.display() + ); + Generation::none() + }; + match schedule_local_tenant_processing( conf, + tenant_id, &tenant_dir_path, + generation, resources.clone(), Some(init_order.clone()), &TENANTS, @@ -185,9 +240,12 @@ pub async fn init_tenant_mgr( Ok(()) } +#[allow(clippy::too_many_arguments)] pub(crate) fn schedule_local_tenant_processing( conf: &'static PageServerConf, + tenant_id: TenantId, tenant_path: &Path, + generation: Generation, resources: TenantSharedResources, init_order: Option, tenants: &'static tokio::sync::RwLock, @@ -208,15 +266,6 @@ pub(crate) fn schedule_local_tenant_processing( "Cannot load tenant from empty directory {tenant_path:?}" ); - let tenant_id = tenant_path - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .with_context(|| { - format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}") - })?; - let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); anyhow::ensure!( !conf.tenant_ignore_mark_file_path(&tenant_id).exists(), @@ -229,7 +278,7 @@ pub(crate) fn schedule_local_tenant_processing( match Tenant::spawn_attach( conf, tenant_id, - Generation::none(), + generation, resources.broker_client, tenants, remote_storage, @@ -253,13 +302,7 @@ pub(crate) fn schedule_local_tenant_processing( info!("tenant {tenant_id} is assumed to be loadable, starting load operation"); // Start loading the tenant into memory. It will initially be in Loading state. Tenant::spawn_load( - conf, - tenant_id, - Generation::none(), - resources, - init_order, - tenants, - ctx, + conf, tenant_id, generation, resources, init_order, tenants, ctx, ) }; Ok(tenant) @@ -383,6 +426,7 @@ pub async fn create_tenant( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, tenant_id: TenantId, + generation: Generation, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, @@ -400,7 +444,8 @@ pub async fn create_tenant( remote_storage, }; let created_tenant = - schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?; + schedule_local_tenant_processing(conf, tenant_id, &tenant_directory, + generation, tenant_resources, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. 
// See https://github.com/neondatabase/neon/issues/4233 @@ -548,6 +593,7 @@ async fn detach_tenant0( pub async fn load_tenant( conf: &'static PageServerConf, tenant_id: TenantId, + generation: Generation, broker_client: storage_broker::BrokerClientChannel, remote_storage: Option, ctx: &RequestContext, @@ -564,7 +610,7 @@ pub async fn load_tenant( broker_client, remote_storage, }; - let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx) + let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, generation, resources, None, &TENANTS, ctx) .with_context(|| { format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; @@ -628,6 +674,7 @@ pub async fn list_tenants() -> Result, TenantMapLis pub async fn attach_tenant( conf: &'static PageServerConf, tenant_id: TenantId, + generation: Generation, tenant_conf: TenantConfOpt, broker_client: storage_broker::BrokerClientChannel, remote_storage: GenericRemoteStorage, @@ -649,7 +696,7 @@ pub async fn attach_tenant( broker_client, remote_storage: Some(remote_storage), }; - let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?; + let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, generation, resources, None, &TENANTS, ctx)?; // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here. // See https://github.com/neondatabase/neon/issues/4233 diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 13f3fac41c..6f42b54ac2 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1430,6 +1430,30 @@ pub fn remote_index_path( .expect("Failed to construct path") } +/// Given the key of an index, parse out the generation part of the name +pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option { + let file_name = match path.get_path().file_name() { + Some(f) => f, + None => { + // Unexpected: we should be seeing index_part.json paths only + tracing::warn!("Malformed index key {}", path); + return None; + } + }; + + let file_name_str = match file_name.to_str() { + Some(s) => s, + None => { + tracing::warn!("Malformed index key {:?}", path); + return None; + } + }; + match file_name_str.split_once('-') { + Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix), + None => None, + } +} + /// Files on the remote storage are stored with paths, relative to the workdir. /// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path. /// @@ -1546,6 +1570,33 @@ mod tests { tenant_ctx: ctx, }) } + + /// Construct a RemoteTimelineClient in an arbitrary generation + fn build_client(&self, generation: Generation) -> Arc { + Arc::new(RemoteTimelineClient { + conf: self.harness.conf, + runtime: tokio::runtime::Handle::current(), + tenant_id: self.harness.tenant_id, + timeline_id: TIMELINE_ID, + generation, + storage_impl: self.harness.remote_storage.clone(), + upload_queue: Mutex::new(UploadQueue::Uninitialized), + metrics: Arc::new(RemoteTimelineClientMetrics::new( + &self.harness.tenant_id, + &TIMELINE_ID, + )), + }) + } + + /// A tracing::Span that satisfies remote_timeline_client methods that assert tenant_id + /// and timeline_id are present. 
+ fn span(&self) -> tracing::Span { + tracing::info_span!( + "test", + tenant_id = %self.harness.tenant_id, + timeline_id = %TIMELINE_ID + ) + } } // Test scheduling @@ -1565,12 +1616,16 @@ mod tests { // Schedule another deletion. Check that it's launched immediately. // Schedule index upload. Check that it's queued + let test_setup = TestSetup::new("upload_scheduling").await.unwrap(); + let span = test_setup.span(); + let _guard = span.enter(); + let TestSetup { harness, tenant: _tenant, timeline, tenant_ctx: _tenant_ctx, - } = TestSetup::new("upload_scheduling").await.unwrap(); + } = test_setup; let client = timeline.remote_client.as_ref().unwrap(); @@ -1818,4 +1873,106 @@ mod tests { }; assert_eq!(actual_c, expected_c); } + + async fn inject_index_part(test_state: &TestSetup, generation: Generation) -> IndexPart { + // An empty IndexPart, just sufficient to ensure deserialization will succeed + let example_metadata = TimelineMetadata::example(); + let example_index_part = IndexPart::new( + HashMap::new(), + example_metadata.disk_consistent_lsn(), + example_metadata, + ); + + let index_part_bytes = serde_json::to_vec(&example_index_part).unwrap(); + + let timeline_path = test_state.harness.timeline_path(&TIMELINE_ID); + let remote_timeline_dir = test_state.harness.remote_fs_dir.join( + timeline_path + .strip_prefix(&test_state.harness.conf.workdir) + .unwrap(), + ); + + std::fs::create_dir_all(remote_timeline_dir).expect("creating test dir should work"); + + let index_path = test_state.harness.remote_fs_dir.join( + remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(), + ); + eprintln!("Writing {}", index_path.display()); + std::fs::write(&index_path, index_part_bytes).unwrap(); + example_index_part + } + + /// Assert that when a RemoteTimelineclient in generation `get_generation` fetches its + /// index, the IndexPart returned is equal to `expected` + async fn assert_got_index_part( + test_state: &TestSetup, + get_generation: Generation, + expected: &IndexPart, + ) { + let client = test_state.build_client(get_generation); + + let download_r = client + .download_index_file() + .await + .expect("download should always succeed"); + assert!(matches!(download_r, MaybeDeletedIndexPart::IndexPart(_))); + match download_r { + MaybeDeletedIndexPart::IndexPart(index_part) => { + assert_eq!(&index_part, expected); + } + MaybeDeletedIndexPart::Deleted(_index_part) => panic!("Test doesn't set deleted_at"), + } + } + + #[tokio::test] + async fn index_part_download_simple() -> anyhow::Result<()> { + let test_state = TestSetup::new("index_part_download_simple").await.unwrap(); + let span = test_state.span(); + let _guard = span.enter(); + + // Simple case: we are in generation N, load the index from generation N - 1 + let generation_n = 5; + let injected = inject_index_part(&test_state, Generation::new(generation_n - 1)).await; + + assert_got_index_part(&test_state, Generation::new(generation_n), &injected).await; + + Ok(()) + } + + #[tokio::test] + async fn index_part_download_ordering() -> anyhow::Result<()> { + let test_state = TestSetup::new("index_part_download_ordering") + .await + .unwrap(); + + let span = test_state.span(); + let _guard = span.enter(); + + // A generation-less IndexPart exists in the bucket, we should find it + let generation_n = 5; + let injected_none = inject_index_part(&test_state, Generation::none()).await; + assert_got_index_part(&test_state, Generation::new(generation_n), &injected_none).await; + + // If a more recent-than-none generation 
exists, we should prefer to load that + let injected_1 = inject_index_part(&test_state, Generation::new(1)).await; + assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await; + + // If a more-recent-than-me generation exists, we should ignore it. + let _injected_10 = inject_index_part(&test_state, Generation::new(10)).await; + assert_got_index_part(&test_state, Generation::new(generation_n), &injected_1).await; + + // If a directly previous generation exists, _and_ an index exists in my own + // generation, I should prefer my own generation. + let _injected_prev = + inject_index_part(&test_state, Generation::new(generation_n - 1)).await; + let injected_current = inject_index_part(&test_state, Generation::new(generation_n)).await; + assert_got_index_part( + &test_state, + Generation::new(generation_n), + &injected_current, + ) + .await; + + Ok(()) + } } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index dc8d87b9e1..9863215529 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -24,7 +24,10 @@ use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use super::index::{IndexPart, LayerFileMetadata}; -use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES}; +use super::{ + parse_remote_index_path, remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, + FAILED_REMOTE_OP_RETRIES, +}; static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120); @@ -219,13 +222,13 @@ pub async fn list_remote_timelines( Ok(timeline_ids) } -pub(super) async fn download_index_part( +async fn do_download_index_part( storage: &GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, - generation: Generation, + index_generation: Generation, ) -> Result { - let remote_path = remote_index_path(tenant_id, timeline_id, generation); + let remote_path = remote_index_path(tenant_id, timeline_id, index_generation); let index_part_bytes = download_retry( || async { @@ -252,6 +255,105 @@ pub(super) async fn download_index_part( Ok(index_part) } +/// index_part.json objects are suffixed with a generation number, so we cannot +/// directly GET the latest index part without doing some probing. +/// +/// In this function we probe for the most recent index in a generation <= our current generation. +/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md +#[tracing::instrument(skip_all, fields(generation=?my_generation))] +pub(super) async fn download_index_part( + storage: &GenericRemoteStorage, + tenant_id: &TenantId, + timeline_id: &TimelineId, + my_generation: Generation, +) -> Result { + debug_assert_current_span_has_tenant_and_timeline_id(); + + if my_generation.is_none() { + // Operating without generations: just fetch the generation-less path + return do_download_index_part(storage, tenant_id, timeline_id, my_generation).await; + } + + // Stale case: If we were intentionally attached in a stale generation, there may already be a remote + // index in our generation. + // + // This is an optimization to avoid doing the listing for the general case below. 
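The selection rule used by the listing fallback further below can be modeled with plain strings: keys carry a hex generation suffix after a dash (as Generation::get_suffix produces), and we pick the newest index whose generation does not exceed ours. A sketch of the rule only, with made-up key names:

fn main() {
    let my_generation = 5u32;
    // Keys as they might appear in a listing of index_part.json* objects.
    let listed = [
        "index_part.json-00000001",
        "index_part.json-00000003",
        "index_part.json-0000000a",
    ];

    let best = listed
        .iter()
        .filter_map(|key| key.rsplit_once('-'))
        .filter_map(|(_, suffix)| u32::from_str_radix(suffix, 16).ok())
        .filter(|gen| *gen <= my_generation)
        .max();

    // Generation 0xa (10) is newer than us and ignored; 3 beats 1.
    assert_eq!(best, Some(3));
}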
+ let res = do_download_index_part(storage, tenant_id, timeline_id, my_generation).await; + match res { + Ok(index_part) => { + tracing::debug!( + "Found index_part from current generation (this is a stale attachment)" + ); + return Ok(index_part); + } + Err(DownloadError::NotFound) => {} + Err(e) => return Err(e), + }; + + // Typical case: the previous generation of this tenant was running healthily, and had uploaded + // and index part. We may safely start from this index without doing a listing, because: + // - We checked for current generation case above + // - generations > my_generation are to be ignored + // - any other indices that exist would have an older generation than `previous_gen`, and + // we want to find the most recent index from a previous generation. + // + // This is an optimization to avoid doing the listing for the general case below. + let res = + do_download_index_part(storage, tenant_id, timeline_id, my_generation.previous()).await; + match res { + Ok(index_part) => { + tracing::debug!("Found index_part from previous generation"); + return Ok(index_part); + } + Err(DownloadError::NotFound) => { + tracing::debug!( + "No index_part found from previous generation, falling back to listing" + ); + } + Err(e) => { + return Err(e); + } + } + + // General case/fallback: if there is no index at my_generation or prev_generation, then list all index_part.json + // objects, and select the highest one with a generation <= my_generation. + let index_prefix = remote_index_path(tenant_id, timeline_id, Generation::none()); + let indices = backoff::retry( + || async { storage.list_files(Some(&index_prefix)).await }, + |_| false, + FAILED_DOWNLOAD_WARN_THRESHOLD, + FAILED_REMOTE_OP_RETRIES, + "listing index_part files", + // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) + backoff::Cancel::new(CancellationToken::new(), || -> anyhow::Error { + unreachable!() + }), + ) + .await + .map_err(DownloadError::Other)?; + + // General case logic for which index to use: the latest index whose generation + // is <= our own. See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md + let max_previous_generation = indices + .into_iter() + .filter_map(parse_remote_index_path) + .filter(|g| g <= &my_generation) + .max(); + + match max_previous_generation { + Some(g) => { + tracing::debug!("Found index_part in generation {g:?}"); + do_download_index_part(storage, tenant_id, timeline_id, g).await + } + None => { + // Migration from legacy pre-generation state: we have a generation but no prior + // attached pageservers did. Try to load from a no-generation path. + tracing::info!("No index_part.json* found"); + do_download_index_part(storage, tenant_id, timeline_id, Generation::none()).await + } + } +} + /// Helper function to handle retries for a download operation. 
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index d7ff70a29c..7b6c6dbfad 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1724,11 +1724,18 @@ impl Timeline {
         for (name, decision) in decided {
             let decision = match decision {
                 Ok(UseRemote { local, remote }) => {
-                    path.push(name.file_name());
-                    init::cleanup_local_file_for_remote(&path, &local, &remote)?;
-                    path.pop();
-
-                    UseRemote { local, remote }
+                    // Remote is authoritative, but we may still choose to retain
+                    // the local file if the contents appear to match
+                    if local.file_size() == remote.file_size() {
+                        // Use the local file, but take the remote metadata so that we pick up
+                        // the correct generation.
+                        UseLocal(remote)
+                    } else {
+                        path.push(name.file_name());
+                        init::cleanup_local_file_for_remote(&path, &local, &remote)?;
+                        path.pop();
+                        UseRemote { local, remote }
+                    }
                 }
                 Ok(decision) => decision,
                 Err(FutureLayer { local }) => {
diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs
index 33effb4318..22976a514d 100644
--- a/pageserver/src/tenant/timeline/init.rs
+++ b/pageserver/src/tenant/timeline/init.rs
@@ -147,11 +147,7 @@ pub(super) fn reconcile(
             Err(FutureLayer { local })
         } else {
             Ok(match (local, remote) {
-                (Some(local), Some(remote)) if local != remote => {
-                    assert_eq!(local.generation, remote.generation);
-
-                    UseRemote { local, remote }
-                }
+                (Some(local), Some(remote)) if local != remote => UseRemote { local, remote },
                 (Some(x), Some(_)) => UseLocal(x),
                 (None, Some(x)) => Evicted(x),
                 (Some(x), None) => NeedsUpload(x),
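The two hunks above split one decision across two places: init.rs now reports UseRemote whenever local and remote metadata differ, and timeline.rs refines that into keeping the local file (under the authoritative remote metadata) when the file sizes match. Below is a rough Python model of the combined decision; it is a sketch under those assumptions, with invented field names, not the actual implementation.

from dataclasses import dataclass
from typing import Optional

@dataclass
class LayerMetadata:
    file_size: int
    generation: int

def reconcile_one(local: Optional[LayerMetadata], remote: Optional[LayerMetadata]) -> str:
    # Mirrors the match arms after this change; remote metadata is authoritative.
    if local is not None and remote is not None:
        if local == remote:
            return "UseLocal"
        if local.file_size == remote.file_size:
            # Same size on disk: keep the local bytes, but adopt the remote
            # metadata so the layer carries the correct generation.
            return f"UseLocal(remote, gen={remote.generation})"
        return "UseRemote"  # delete the local copy and re-download
    if remote is not None:
        return "Evicted"
    assert local is not None, "reconcile only sees layers that exist somewhere"
    return "NeedsUpload"

print(reconcile_one(LayerMetadata(8192, 1), LayerMetadata(8192, 2)))  # keep local bytes
print(reconcile_one(LayerMetadata(8192, 1), LayerMetadata(4096, 2)))  # UseRemote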
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 4ea1d79705..a8ca3b50a1 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -456,6 +456,7 @@ class NeonEnvBuilder:
         self.preserve_database_files = preserve_database_files
         self.initial_tenant = initial_tenant or TenantId.generate()
         self.initial_timeline = initial_timeline or TimelineId.generate()
+        self.enable_generations = False
         self.scrub_on_exit = False
         self.test_output_dir = test_output_dir
 
@@ -740,6 +741,9 @@ class NeonEnvBuilder:
                 sk.stop(immediate=True)
             self.env.pageserver.stop(immediate=True)
 
+            if self.env.attachment_service is not None:
+                self.env.attachment_service.stop(immediate=True)
+
             cleanup_error = None
 
             if self.scrub_on_exit:
@@ -802,6 +806,8 @@ class NeonEnv:
         the tenant id
     """
 
+    PAGESERVER_ID = 1
+
     def __init__(self, config: NeonEnvBuilder):
         self.repo_dir = config.repo_dir
         self.rust_log_override = config.rust_log_override
@@ -825,6 +831,14 @@ class NeonEnv:
         self.initial_tenant = config.initial_tenant
         self.initial_timeline = config.initial_timeline
 
+        if config.enable_generations:
+            attachment_service_port = self.port_distributor.get_port()
+            self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
+            self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
+        else:
+            self.control_plane_api = None
+            self.attachment_service = None
+
         # Create a config file corresponding to the options
         toml = textwrap.dedent(
             f"""
@@ -850,7 +864,7 @@ class NeonEnv:
         toml += textwrap.dedent(
             f"""
             [pageserver]
-            id=1
+            id={self.PAGESERVER_ID}
             listen_pg_addr = 'localhost:{pageserver_port.pg}'
             listen_http_addr = 'localhost:{pageserver_port.http}'
             pg_auth_type = '{pg_auth_type}'
@@ -857,6 +871,13 @@ class NeonEnv:
             """
         )
 
+        if self.control_plane_api is not None:
+            toml += textwrap.dedent(
+                f"""
+                control_plane_api = '{self.control_plane_api}'
+                """
+            )
+
         # Create a corresponding NeonPageserver object
         self.pageserver = NeonPageserver(
             self, port=pageserver_port, config_override=config.pageserver_config_override
@@ -904,6 +925,9 @@ class NeonEnv:
     def start(self):
         # Start up broker, pageserver and all safekeepers
         self.broker.try_start()
+
+        if self.attachment_service is not None:
+            self.attachment_service.start()
 
         self.pageserver.start()
         for safekeeper in self.safekeepers:
@@ -1331,6 +1355,16 @@ class NeonCli(AbstractNeonCli):
         res.check_returncode()
         return res
 
+    def attachment_service_start(self):
+        cmd = ["attachment_service", "start"]
+        return self.raw_cli(cmd)
+
+    def attachment_service_stop(self, immediate: bool):
+        cmd = ["attachment_service", "stop"]
+        if immediate:
+            cmd.extend(["-m", "immediate"])
+        return self.raw_cli(cmd)
+
     def pageserver_start(
         self,
         overrides: Tuple[str, ...] = (),
@@ -1512,6 +1546,35 @@ class ComputeCtl(AbstractNeonCli):
     COMMAND = "compute_ctl"
 
 
+class NeonAttachmentService:
+    def __init__(self, env: NeonEnv):
+        self.env = env
+        self.running = False
+
+    def start(self):
+        assert not self.running
+        self.env.neon_cli.attachment_service_start()
+        self.running = True
+        return self
+
+    def stop(self, immediate: bool = False) -> "NeonAttachmentService":
+        if self.running:
+            self.env.neon_cli.attachment_service_stop(immediate)
+            self.running = False
+        return self
+
+    def __enter__(self) -> "NeonAttachmentService":
+        return self
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ):
+        self.stop(immediate=True)
+
+
 class NeonPageserver(PgProtocol):
     """
     An object representing a running pageserver.
@@ -1675,6 +1738,26 @@ class NeonPageserver(PgProtocol):
 
         return None
 
+    def tenant_attach(
+        self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False
+    ):
+        """
+        Tenant attachment passes through here to acquire a generation number before proceeding
+        to call into the pageserver HTTP client.
+        """
+        if self.env.attachment_service is not None:
+            response = requests.post(
+                f"{self.env.control_plane_api}/attach_hook",
+                json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID},
+            )
+            response.raise_for_status()
+            generation = response.json()["gen"]
+        else:
+            generation = None
+
+        client = self.env.pageserver.http_client()
+        return client.tenant_attach(tenant_id, config, config_null, generation=generation)
+
 
 def append_pageserver_param_overrides(
     params_to_update: List[str],
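The tenant_attach wrapper above encodes the contract with the attachment service: POST to /attach_hook with a tenant and pageserver id, then read back "gen". Shown as literal JSON for clarity; the values are illustrative, and the tenant id travels as a hex string.

import json

# Request body sent by NeonPageserver.tenant_attach (values illustrative):
request = {"tenant_id": "0123456789abcdef0123456789abcdef", "pageserver_id": 1}

# Response body from the attachment service: the generation to attach in.
response = json.loads('{"gen": 2}')
generation = response["gen"]
assert isinstance(generation, int)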
+ """ + if self.env.attachment_service is not None: + response = requests.post( + f"{self.env.control_plane_api}/attach_hook", + json={"tenant_id": str(tenant_id), "pageserver_id": self.env.PAGESERVER_ID}, + ) + response.raise_for_status() + generation = response.json()["gen"] + else: + generation = None + + client = self.env.pageserver.http_client() + return client.tenant_attach(tenant_id, config, config_null, generation=generation) + def append_pageserver_param_overrides( params_to_update: List[str], diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index a179ebdd09..9373073abf 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -186,18 +186,25 @@ class PageserverHttpClient(requests.Session): return TenantId(new_tenant_id) def tenant_attach( - self, tenant_id: TenantId, config: None | Dict[str, Any] = None, config_null: bool = False + self, + tenant_id: TenantId, + config: None | Dict[str, Any] = None, + config_null: bool = False, + generation: Optional[int] = None, ): if config_null: assert config is None - body = "null" + body: Any = None else: # null-config is prohibited by the API config = config or {} - body = json.dumps({"config": config}) + body = {"config": config} + if generation is not None: + body.update({"generation": generation}) + res = self.post( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach", - data=body, + data=json.dumps(body), headers={"Content-Type": "application/json"}, ) self.verbose_error(res) diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index 567200ba1f..ad625df1cc 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -8,7 +8,9 @@ from fixtures.remote_storage import s3_storage # Test restarting page server, while safekeeper and compute node keep # running. -def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): +@pytest.mark.parametrize("generations", [True, False]) +def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool): + neon_env_builder.enable_generations = generations neon_env_builder.enable_remote_storage(remote_storage_kind=s3_storage()) neon_env_builder.enable_scrub_on_exit() diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py index 8b75d35200..3d9655cb87 100644 --- a/test_runner/regress/test_remote_storage.py +++ b/test_runner/regress/test_remote_storage.py @@ -52,9 +52,9 @@ from requests import ReadTimeout # # The tests are done for all types of remote storage pageserver supports. @pytest.mark.parametrize("remote_storage_kind", available_remote_storages()) +@pytest.mark.parametrize("generations", [True, False]) def test_remote_storage_backup_and_restore( - neon_env_builder: NeonEnvBuilder, - remote_storage_kind: RemoteStorageKind, + neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, generations: bool ): # Use this test to check more realistic SK ids: some etcd key parsing bugs were related, # and this test needs SK to write data to pageserver, so it will be visible @@ -64,6 +64,8 @@ def test_remote_storage_backup_and_restore( remote_storage_kind=remote_storage_kind, ) + neon_env_builder.enable_generations = generations + # Exercise retry code path by making all uploads and downloads fail for the # first time. The retries print INFO-messages to the log; we will check # that they are present after the test. 
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 8b75d35200..3d9655cb87 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -52,9 +52,9 @@ from requests import ReadTimeout
 #
 # The tests are done for all types of remote storage pageserver supports.
 @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
+@pytest.mark.parametrize("generations", [True, False])
 def test_remote_storage_backup_and_restore(
-    neon_env_builder: NeonEnvBuilder,
-    remote_storage_kind: RemoteStorageKind,
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, generations: bool
 ):
     # Use this test to check more realistic SK ids: some etcd key parsing bugs were related,
     # and this test needs SK to write data to pageserver, so it will be visible
@@ -64,6 +64,8 @@ def test_remote_storage_backup_and_restore(
         remote_storage_kind=remote_storage_kind,
     )
 
+    neon_env_builder.enable_generations = generations
+
     # Exercise retry code path by making all uploads and downloads fail for the
     # first time. The retries print INFO-messages to the log; we will check
     # that they are present after the test.
@@ -154,7 +156,7 @@ def test_remote_storage_backup_and_restore(
     # background task to load the tenant. In that background task,
     # listing the remote timelines will fail because of the failpoint,
     # and the tenant will be marked as Broken.
-    client.tenant_attach(tenant_id)
+    env.pageserver.tenant_attach(tenant_id)
 
     tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
     assert tenant_info["attachment_status"] == {
@@ -164,7 +166,7 @@ def test_remote_storage_backup_and_restore(
 
     # Ensure that even though the tenant is broken, we can't attach it again.
     with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):
-        client.tenant_attach(tenant_id)
+        env.pageserver.tenant_attach(tenant_id)
 
     # Restart again, this implicitly clears the failpoint.
     # test_remote_failures=1 remains active, though, as it's in the pageserver config.
@@ -182,7 +184,7 @@ def test_remote_storage_backup_and_restore(
     # Ensure that the pageserver remembers that the tenant was attaching, by
     # trying to attach it again. It should fail.
     with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state:"):
-        client.tenant_attach(tenant_id)
+        env.pageserver.tenant_attach(tenant_id)
 
     log.info("waiting for tenant to become active. this should be quick with on-demand download")
     wait_until_tenant_active(
@@ -362,7 +364,7 @@ def test_remote_storage_upload_queue_retries(
     env.pageserver.start()
     client = env.pageserver.http_client()
 
-    client.tenant_attach(tenant_id)
+    env.pageserver.tenant_attach(tenant_id)
 
     wait_until_tenant_active(client, tenant_id)
 
@@ -499,7 +501,7 @@ def test_remote_timeline_client_calls_started_metric(
     env.pageserver.start()
     client = env.pageserver.http_client()
 
-    client.tenant_attach(tenant_id)
+    env.pageserver.tenant_attach(tenant_id)
 
     wait_until_tenant_active(client, tenant_id)
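For any other regression test that should exercise both modes, the opt-in pattern above generalizes to a small template. The test name below is hypothetical and the sketch assumes the fixtures introduced in this patch:

import pytest
from fixtures.neon_fixtures import NeonEnvBuilder

@pytest.mark.parametrize("generations", [True, False])
def test_my_feature(neon_env_builder: NeonEnvBuilder, generations: bool):
    neon_env_builder.enable_generations = generations
    env = neon_env_builder.init_start()
    # ... drive the workload; route any re-attach through
    # env.pageserver.tenant_attach() so a generation is acquired first.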