diff --git a/control_plane/src/attachment_service.rs b/control_plane/src/attachment_service.rs new file mode 100644 index 0000000000..d401518614 --- /dev/null +++ b/control_plane/src/attachment_service.rs @@ -0,0 +1,56 @@ +use std::{path::PathBuf, process::Child}; + +use crate::{background_process, local_env::LocalEnv}; + +pub struct AttachmentService { + env: LocalEnv, + listen: String, + path: PathBuf, +} + +const COMMAND: &str = "attachment_service"; + +impl AttachmentService { + pub fn from_env(env: &LocalEnv) -> Self { + let path = env.base_data_dir.join("attachments.json"); + + // Makes no sense to construct this if pageservers aren't going to use it: assume + // pageservers have control plane API set + let listen_url = env.pageserver.control_plane_api.clone().unwrap(); + + let listen = format!( + "{}:{}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + ); + + Self { + env: env.clone(), + path, + listen, + } + } + + fn pid_file(&self) -> PathBuf { + self.env.base_data_dir.join("attachment_service.pid") + } + + pub fn start(&self) -> anyhow::Result { + let path_str = self.path.to_string_lossy(); + + background_process::start_process( + COMMAND, + &self.env.base_data_dir, + &self.env.attachment_service_bin(), + ["-l", &self.listen, "-p", &path_str], + [], + background_process::InitialPidFile::Create(&self.pid_file()), + // TODO: a real status check + || Ok(true), + ) + } + + pub fn stop(&self, immediate: bool) -> anyhow::Result<()> { + background_process::stop_process(immediate, COMMAND, &self.pid_file()) + } +} diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs index a98b3be99b..103b721915 100644 --- a/control_plane/src/bin/attachment_service.rs +++ b/control_plane/src/bin/attachment_service.rs @@ -12,6 +12,7 @@ use pageserver_api::control_api::*; use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use std::{collections::HashMap, sync::Arc}; +use utils::logging::{self, LogFormat}; use utils::{ http::{ @@ -218,7 +219,17 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder anyhow::Result<()> { + logging::init( + LogFormat::Plain, + logging::TracingErrorLayerEnablement::Disabled, + )?; + let args = Cli::parse(); + tracing::info!( + "Starting, state at {}, listening on {}", + args.path.to_string_lossy(), + args.listen + ); let persistent_state = PersistentState::load_or_new(&args.path).await; @@ -229,6 +240,7 @@ async fn main() -> anyhow::Result<()> { let service = utils::http::RouterService::new(router).unwrap(); let server = hyper::Server::from_tcp(http_listener)?.serve(service); + tracing::info!("Serving on {0}", args.listen.as_str()); server.await?; Ok(()) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 2af79bed90..7dc659cd6c 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use compute_api::spec::ComputeMode; +use control_plane::attachment_service::AttachmentService; use control_plane::endpoint::ComputeControlPlane; use control_plane::local_env::LocalEnv; use control_plane::pageserver::PageServerNode; @@ -43,6 +44,8 @@ project_git_version!(GIT_VERSION); const DEFAULT_PG_VERSION: &str = "15"; +const DEFAULT_PAGESERVER_CONTROL_PLANE_API: &str = "http://127.0.0.1:1234/"; + fn default_conf() -> String { format!( r#" @@ -56,11 +59,13 @@ listen_pg_addr = '{DEFAULT_PAGESERVER_PG_ADDR}' listen_http_addr = '{DEFAULT_PAGESERVER_HTTP_ADDR}' pg_auth_type = '{trust_auth}' http_auth_type = '{trust_auth}' +control_plane_api = '{DEFAULT_PAGESERVER_CONTROL_PLANE_API}' [[safekeepers]] id = {DEFAULT_SAFEKEEPER_ID} pg_port = {DEFAULT_SAFEKEEPER_PG_PORT} http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT} + "#, trust_auth = AuthType::Trust, ) @@ -107,6 +112,7 @@ fn main() -> Result<()> { "start" => handle_start_all(sub_args, &env), "stop" => handle_stop_all(sub_args, &env), "pageserver" => handle_pageserver(sub_args, &env), + "attachment_service" => handle_attachment_service(sub_args, &env), "safekeeper" => handle_safekeeper(sub_args, &env), "endpoint" => handle_endpoint(sub_args, &env), "pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"), @@ -827,6 +833,33 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul Ok(()) } +fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let svc = AttachmentService::from_env(env); + match sub_match.subcommand() { + Some(("start", _start_match)) => { + if let Err(e) = svc.start() { + eprintln!("start failed: {e}"); + exit(1); + } + } + + Some(("stop", stop_match)) => { + let immediate = stop_match + .get_one::("stop-mode") + .map(|s| s.as_str()) + == Some("immediate"); + + if let Err(e) = svc.stop(immediate) { + eprintln!("stop failed: {}", e); + exit(1); + } + } + Some((sub_name, _)) => bail!("Unexpected attachment_service subcommand '{}'", sub_name), + None => bail!("no attachment_service subcommand provided"), + } + Ok(()) +} + fn get_safekeeper(env: &local_env::LocalEnv, id: NodeId) -> Result { if let Some(node) = env.safekeepers.iter().find(|node| node.id == id) { Ok(SafekeeperNode::from_env(env, node)) @@ -907,6 +940,13 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow broker::start_broker_process(env)?; + let attachment_service = AttachmentService::from_env(env); + if let Err(e) = attachment_service.start() { + eprintln!("attachment_service start failed: {:#}", e); + try_stop_all(env, true); + exit(1); + } + let pageserver = PageServerNode::from_env(env); if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) { eprintln!("pageserver {} start failed: {:#}", env.pageserver.id, e); @@ -965,6 +1005,11 @@ fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { if let Err(e) = broker::stop_broker_process(env) { eprintln!("neon broker stop failed: {e:#}"); } + + let attachment_service = AttachmentService::from_env(env); + if let Err(e) = attachment_service.stop(immediate) { + eprintln!("attachment service stop failed: {e:#}"); + } } fn cli() -> Command { @@ -1148,6 +1193,14 @@ fn cli() -> Command { .arg(stop_mode_arg.clone())) .subcommand(Command::new("restart").about("Restart local pageserver").arg(pageserver_config_args.clone())) ) + .subcommand( + Command::new("attachment_service") + .arg_required_else_help(true) + .about("Manage attachment_service") + .subcommand(Command::new("start").about("Start local pageserver").arg(pageserver_config_args.clone())) + .subcommand(Command::new("stop").about("Stop local pageserver") + .arg(stop_mode_arg.clone())) + ) .subcommand( Command::new("safekeeper") .arg_required_else_help(true) diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index a773b8dcc3..7592880402 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -7,6 +7,7 @@ // local installations. // +pub mod attachment_service; mod background_process; pub mod broker; pub mod endpoint; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 9e42c2e333..0215ab1bb5 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -118,6 +118,9 @@ pub struct PageServerConf { // auth type used for the PG and HTTP ports pub pg_auth_type: AuthType, pub http_auth_type: AuthType, + + // Control plane location + pub control_plane_api: Option, } impl Default for PageServerConf { @@ -128,6 +131,7 @@ impl Default for PageServerConf { listen_http_addr: String::new(), pg_auth_type: AuthType::Trust, http_auth_type: AuthType::Trust, + control_plane_api: None, } } } @@ -202,6 +206,10 @@ impl LocalEnv { self.neon_distrib_dir.join("pageserver") } + pub fn attachment_service_bin(&self) -> PathBuf { + self.neon_distrib_dir.join("attachment_service") + } + pub fn safekeeper_bin(&self) -> PathBuf { self.neon_distrib_dir.join("safekeeper") } diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index c24ea7f717..c674b982c2 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -126,6 +126,13 @@ impl PageServerNode { broker_endpoint_param, ]; + if let Some(control_plane_api) = &self.env.pageserver.control_plane_api { + overrides.push(format!( + "control_plane_api='{}'", + control_plane_api.as_str() + )); + } + if self.env.pageserver.http_auth_type != AuthType::Trust || self.env.pageserver.pg_auth_type != AuthType::Trust {