diff --git a/Cargo.lock b/Cargo.lock index 9e0e343996..44143fa0da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -718,13 +718,13 @@ dependencies = [ [[package]] name = "axum" -version = "0.7.5" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", - "base64 0.21.1", + "base64 0.22.1", "bytes", "futures-util", "http 1.1.0", @@ -746,8 +746,8 @@ dependencies = [ "sha1", "sync_wrapper 1.0.1", "tokio", - "tokio-tungstenite", - "tower", + "tokio-tungstenite 0.24.0", + "tower 0.5.2", "tower-layer", "tower-service", "tracing", @@ -1267,6 +1267,7 @@ dependencies = [ "aws-config", "aws-sdk-kms", "aws-sdk-s3", + "axum", "base64 0.13.1", "bytes", "camino", @@ -1277,7 +1278,7 @@ dependencies = [ "fail", "flate2", "futures", - "hyper 0.14.30", + "http 1.1.0", "metrics", "nix 0.27.1", "notify", @@ -1303,6 +1304,8 @@ dependencies = [ "tokio-postgres", "tokio-stream", "tokio-util", + "tower 0.5.2", + "tower-http", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -2720,7 +2723,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -3260,9 +3263,9 @@ checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" [[package]] name = "matchit" -version = "0.8.2" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "540f1c43aed89909c0cc0cc604e3bb2f7e7a341a3728a9e6cfe760e733cd11ed" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" [[package]] name = "md-5" @@ -4758,7 +4761,7 @@ dependencies = [ "tokio-postgres", "tokio-postgres2", "tokio-rustls 0.26.0", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tokio-util", "tracing", "tracing-subscriber", @@ -5186,7 +5189,7 @@ dependencies = [ "async-trait", "getrandom 0.2.11", "http 1.1.0", - "matchit 0.8.2", + "matchit 0.8.4", "opentelemetry", "reqwest", "reqwest-middleware", @@ -6800,7 +6803,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", ] [[package]] @@ -6881,7 +6896,7 @@ dependencies = [ "tokio", "tokio-rustls 0.26.0", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -6922,16 +6937,49 @@ dependencies = [ ] [[package]] -name = "tower-layer" -version = "0.3.2" +name = "tower" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 1.0.1", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" +dependencies = [ + "bitflags 2.4.1", + "bytes", + "http 1.1.0", + "http-body 1.0.0", + "pin-project-lite", + "tower-layer", + 
"tower-service", + "tracing", + "uuid", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -7086,6 +7134,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -7867,7 +7933,8 @@ dependencies = [ "tokio-util", "toml_edit", "tonic", - "tower", + "tower 0.4.13", + "tower 0.5.2", "tracing", "tracing-core", "url", diff --git a/Cargo.toml b/Cargo.toml index 197808d5ae..39898e1c8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ aws-smithy-types = "1.2" aws-credential-types = "1.2.0" aws-sigv4 = { version = "1.2", features = ["sign-http"] } aws-types = "1.3" -axum = { version = "0.7.5", features = ["ws"] } +axum = { version = "0.7.9", features = ["ws"] } base64 = "0.13.0" bincode = "1.3" bindgen = "0.70" @@ -187,7 +187,9 @@ tokio-util = { version = "0.7.10", features = ["io", "rt"] } toml = "0.8" toml_edit = "0.22" tonic = {version = "0.12.3", features = ["tls", "tls-roots"]} -tower-service = "0.3.2" +tower = { version = "0.5.2", default-features = false } +tower-http = { version = "0.6.2", features = ["request-id", "trace"] } +tower-service = "0.3.3" tracing = "0.1" tracing-error = "0.2" tracing-opentelemetry = "0.27" diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 9525b27818..33892813c4 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -15,6 +15,7 @@ aws-config.workspace = true aws-sdk-s3.workspace = true aws-sdk-kms.workspace = true anyhow.workspace = true +axum = { workspace = true, features = [] } camino.workspace = true chrono.workspace = true cfg-if.workspace = true @@ -22,7 +23,7 @@ clap.workspace = true fail.workspace = true flate2.workspace = true futures.workspace = true -hyper0 = { workspace = true, features = ["full"] } +http.workspace = true metrics.workspace = true nix.workspace = true notify.workspace = true @@ -37,6 +38,8 @@ serde_with.workspace = true serde_json.workspace = true signal-hook.workspace = true tar.workspace = true +tower.workspace = true +tower-http.workspace = true reqwest = { workspace = true, features = ["json"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tokio-postgres.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 6ede5fdceb..04432ad0f3 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -60,7 +60,7 @@ use compute_tools::compute::{ }; use compute_tools::configurator::launch_configurator; use compute_tools::extension_server::get_pg_version_string; -use compute_tools::http::api::launch_http_server; +use compute_tools::http::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use 
compute_tools::params::*; @@ -493,7 +493,10 @@ fn start_postgres( let mut pg = None; if !prestartup_failed { pg = match compute.start_compute() { - Ok(pg) => Some(pg), + Ok(pg) => { + info!(postmaster_pid = %pg.0.id(), "Postgres was started"); + Some(pg) + } Err(err) => { error!("could not start the compute node: {:#}", err); compute.set_failed_status(err); @@ -591,6 +594,8 @@ fn wait_postgres(pg: Option) -> Result { // propagate to Postgres and it will be shut down as well. let mut exit_code = None; if let Some((mut pg, logs_handle)) = pg { + info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit"); + let ecode = pg .wait() .expect("failed to start waiting on Postgres process"); diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 72198a9479..4a297cfacf 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -36,11 +36,11 @@ pub async fn get_dbs_and_roles(compute: &Arc) -> anyhow::Result ComputeStatusResponse { - ComputeStatusResponse { - start_time: state.start_time, - tenant: state - .pspec - .as_ref() - .map(|pspec| pspec.tenant_id.to_string()), - timeline: state - .pspec - .as_ref() - .map(|pspec| pspec.timeline_id.to_string()), - status: state.status, - last_active: state.last_active, - error: state.error.clone(), - } -} - -// Service function to handle all available routes. -async fn routes(req: Request, compute: &Arc) -> Response { - // - // NOTE: The URI path is currently included in traces. That's OK because - // it doesn't contain any variable parts or sensitive information. But - // please keep that in mind if you change the routing here. - // - match (req.method(), req.uri().path()) { - // Serialized compute state. - (&Method::GET, "/status") => { - debug!("serving /status GET request"); - let state = compute.state.lock().unwrap(); - let status_response = status_response_from_state(&state); - Response::new(Body::from(serde_json::to_string(&status_response).unwrap())) - } - - // Startup metrics in JSON format. Keep /metrics reserved for a possible - // future use for Prometheus metrics format. - (&Method::GET, "/metrics.json") => { - info!("serving /metrics.json GET request"); - let metrics = compute.state.lock().unwrap().metrics.clone(); - Response::new(Body::from(serde_json::to_string(&metrics).unwrap())) - } - - // Prometheus metrics - (&Method::GET, "/metrics") => { - debug!("serving /metrics GET request"); - - // When we call TextEncoder::encode() below, it will immediately - // return an error if a metric family has no metrics, so we need to - // preemptively filter out metric families with no metrics. 
- let metrics = installed_extensions::collect() - .into_iter() - .filter(|m| !m.get_metric().is_empty()) - .collect::>(); - - let encoder = TextEncoder::new(); - let mut buffer = vec![]; - - if let Err(err) = encoder.encode(&metrics, &mut buffer) { - let msg = format!("error handling /metrics request: {err}"); - error!(msg); - return render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR); - } - - match Response::builder() - .status(StatusCode::OK) - .header(CONTENT_TYPE, encoder.format_type()) - .body(Body::from(buffer)) - { - Ok(response) => response, - Err(err) => { - let msg = format!("error handling /metrics request: {err}"); - error!(msg); - render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR) - } - } - } - // Collect Postgres current usage insights - (&Method::GET, "/insights") => { - info!("serving /insights GET request"); - let status = compute.get_status(); - if status != ComputeStatus::Running { - let msg = format!("compute is not running, current status: {:?}", status); - error!(msg); - return Response::new(Body::from(msg)); - } - - let insights = compute.collect_insights().await; - Response::new(Body::from(insights)) - } - - (&Method::POST, "/check_writability") => { - info!("serving /check_writability POST request"); - let status = compute.get_status(); - if status != ComputeStatus::Running { - let msg = format!( - "invalid compute status for check_writability request: {:?}", - status - ); - error!(msg); - return Response::new(Body::from(msg)); - } - - let res = crate::checker::check_writability(compute).await; - match res { - Ok(_) => Response::new(Body::from("true")), - Err(e) => { - error!("check_writability failed: {}", e); - Response::new(Body::from(e.to_string())) - } - } - } - - (&Method::POST, "/extensions") => { - info!("serving /extensions POST request"); - let status = compute.get_status(); - if status != ComputeStatus::Running { - let msg = format!( - "invalid compute status for extensions request: {:?}", - status - ); - error!(msg); - return render_json_error(&msg, StatusCode::PRECONDITION_FAILED); - } - - let request = hyper::body::to_bytes(req.into_body()).await.unwrap(); - let request = serde_json::from_slice::(&request).unwrap(); - let res = compute - .install_extension(&request.extension, &request.database, request.version) - .await; - match res { - Ok(version) => render_json(Body::from( - serde_json::to_string(&ExtensionInstallResult { - extension: request.extension, - version, - }) - .unwrap(), - )), - Err(e) => { - error!("install_extension failed: {}", e); - render_json_error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR) - } - } - } - - (&Method::GET, "/info") => { - let num_cpus = num_cpus::get_physical(); - info!("serving /info GET request. num_cpus: {}", num_cpus); - Response::new(Body::from( - serde_json::json!({ - "num_cpus": num_cpus, - }) - .to_string(), - )) - } - - // Accept spec in JSON format and request compute configuration. If - // anything goes wrong after we set the compute status to `ConfigurationPending` - // and update compute state with new spec, we basically leave compute - // in the potentially wrong state. That said, it's control-plane's - // responsibility to watch compute state after reconfiguration request - // and to clean restart in case of errors. 
- (&Method::POST, "/configure") => { - info!("serving /configure POST request"); - match handle_configure_request(req, compute).await { - Ok(msg) => Response::new(Body::from(msg)), - Err((msg, code)) => { - error!("error handling /configure request: {msg}"); - render_json_error(&msg, code) - } - } - } - - (&Method::POST, "/terminate") => { - info!("serving /terminate POST request"); - match handle_terminate_request(compute).await { - Ok(()) => Response::new(Body::empty()), - Err((msg, code)) => { - error!("error handling /terminate request: {msg}"); - render_json_error(&msg, code) - } - } - } - - (&Method::GET, "/dbs_and_roles") => { - info!("serving /dbs_and_roles GET request",); - match get_dbs_and_roles(compute).await { - Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())), - Err(_) => { - render_json_error("can't get dbs and roles", StatusCode::INTERNAL_SERVER_ERROR) - } - } - } - - (&Method::GET, "/database_schema") => { - let database = match must_get_query_param(&req, "database") { - Err(e) => return e.into_response(), - Ok(database) => database, - }; - info!("serving /database_schema GET request with database: {database}",); - match get_database_schema(compute, &database).await { - Ok(res) => render_plain(Body::wrap_stream(res)), - Err(SchemaDumpError::DatabaseDoesNotExist) => { - render_json_error("database does not exist", StatusCode::NOT_FOUND) - } - Err(e) => { - error!("can't get schema dump: {}", e); - render_json_error("can't get schema dump", StatusCode::INTERNAL_SERVER_ERROR) - } - } - } - - (&Method::POST, "/grants") => { - info!("serving /grants POST request"); - let status = compute.get_status(); - if status != ComputeStatus::Running { - let msg = format!( - "invalid compute status for set_role_grants request: {:?}", - status - ); - error!(msg); - return render_json_error(&msg, StatusCode::PRECONDITION_FAILED); - } - - let request = hyper::body::to_bytes(req.into_body()).await.unwrap(); - let request = serde_json::from_slice::(&request).unwrap(); - - let res = compute - .set_role_grants( - &request.database, - &request.schema, - &request.privileges, - &request.role, - ) - .await; - match res { - Ok(()) => render_json(Body::from( - serde_json::to_string(&SetRoleGrantsResponse { - database: request.database, - schema: request.schema, - role: request.role, - privileges: request.privileges, - }) - .unwrap(), - )), - Err(e) => render_json_error( - &format!("could not grant role privileges to the schema: {e}"), - // TODO: can we filter on role/schema not found errors - // and return appropriate error code? 
- StatusCode::INTERNAL_SERVER_ERROR, - ), - } - } - - // get the list of installed extensions - // currently only used in python tests - // TODO: call it from cplane - (&Method::GET, "/installed_extensions") => { - info!("serving /installed_extensions GET request"); - let status = compute.get_status(); - if status != ComputeStatus::Running { - let msg = format!( - "invalid compute status for extensions request: {:?}", - status - ); - error!(msg); - return Response::new(Body::from(msg)); - } - - let conf = compute.get_conn_conf(None); - let res = - task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf)) - .await - .unwrap(); - - match res { - Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())), - Err(e) => render_json_error( - &format!("could not get list of installed extensions: {}", e), - StatusCode::INTERNAL_SERVER_ERROR, - ), - } - } - - (&Method::POST, "/failpoints") if cfg!(feature = "testing") => { - match failpoints_handler(req, CancellationToken::new()).await { - Ok(r) => r, - Err(ApiError::BadRequest(e)) => { - render_json_error(&e.to_string(), StatusCode::BAD_REQUEST) - } - Err(_) => { - render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR) - } - } - } - - // download extension files from remote extension storage on demand - (&Method::POST, route) if route.starts_with("/extension_server/") => { - info!("serving {:?} POST request", route); - info!("req.uri {:?}", req.uri()); - - // don't even try to download extensions - // if no remote storage is configured - if compute.ext_remote_storage.is_none() { - info!("no extensions remote storage configured"); - let mut resp = Response::new(Body::from("no remote storage configured")); - *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return resp; - } - - let mut is_library = false; - if let Some(params) = req.uri().query() { - info!("serving {:?} POST request with params: {}", route, params); - if params == "is_library=true" { - is_library = true; - } else { - let mut resp = Response::new(Body::from("Wrong request parameters")); - *resp.status_mut() = StatusCode::BAD_REQUEST; - return resp; - } - } - let filename = route.split('/').last().unwrap().to_string(); - info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}"); - - // get ext_name and path from spec - // don't lock compute_state for too long - let ext = { - let compute_state = compute.state.lock().unwrap(); - let pspec = compute_state.pspec.as_ref().expect("spec must be set"); - let spec = &pspec.spec; - - // debug only - info!("spec: {:?}", spec); - - let remote_extensions = match spec.remote_extensions.as_ref() { - Some(r) => r, - None => { - info!("no remote extensions spec was provided"); - let mut resp = Response::new(Body::from("no remote storage configured")); - *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return resp; - } - }; - - remote_extensions.get_ext( - &filename, - is_library, - &compute.build_tag, - &compute.pgversion, - ) - }; - - match ext { - Ok((ext_name, ext_path)) => { - match compute.download_extension(ext_name, ext_path).await { - Ok(_) => Response::new(Body::from("OK")), - Err(e) => { - error!("extension download failed: {}", e); - let mut resp = Response::new(Body::from(e.to_string())); - *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - resp - } - } - } - Err(e) => { - warn!("extension download failed to find extension: {}", e); - let mut resp = Response::new(Body::from("failed to find file")); - 
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - resp - } - } - } - - // Return the `404 Not Found` for any other routes. - _ => { - let mut not_found = Response::new(Body::from("404 Not Found")); - *not_found.status_mut() = StatusCode::NOT_FOUND; - not_found - } - } -} - -async fn handle_configure_request( - req: Request, - compute: &Arc, -) -> Result { - if !compute.live_config_allowed { - return Err(( - "live configuration is not allowed for this compute node".to_string(), - StatusCode::PRECONDITION_FAILED, - )); - } - - let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); - let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); - if let Ok(request) = serde_json::from_str::(&spec_raw) { - let spec = request.spec; - - let parsed_spec = match ParsedSpec::try_from(spec) { - Ok(ps) => ps, - Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)), - }; - - // XXX: wrap state update under lock in code blocks. Otherwise, - // we will try to `Send` `mut state` into the spawned thread - // bellow, which will cause error: - // ``` - // error: future cannot be sent between threads safely - // ``` - { - let mut state = compute.state.lock().unwrap(); - if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { - let msg = format!( - "invalid compute status for configuration request: {:?}", - state.status.clone() - ); - return Err((msg, StatusCode::PRECONDITION_FAILED)); - } - state.pspec = Some(parsed_spec); - state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed); - drop(state); - info!("set new spec and notified waiters"); - } - - // Spawn a blocking thread to wait for compute to become Running. - // This is needed to do not block the main pool of workers and - // be able to serve other requests while some particular request - // is waiting for compute to finish configuration. - let c = compute.clone(); - task::spawn_blocking(move || { - let mut state = c.state.lock().unwrap(); - while state.status != ComputeStatus::Running { - state = c.state_changed.wait(state).unwrap(); - info!( - "waiting for compute to become Running, current status: {:?}", - state.status - ); - - if state.status == ComputeStatus::Failed { - let err = state.error.as_ref().map_or("unknown error", |x| x); - let msg = format!("compute configuration failed: {:?}", err); - return Err((msg, StatusCode::INTERNAL_SERVER_ERROR)); - } - } - - Ok(()) - }) - .await - .unwrap()?; - - // Return current compute state if everything went well. 
- let state = compute.state.lock().unwrap().clone(); - let status_response = status_response_from_state(&state); - Ok(serde_json::to_string(&status_response).unwrap()) - } else { - Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST)) - } -} - -fn render_json_error(e: &str, status: StatusCode) -> Response { - let error = GenericAPIError { - error: e.to_string(), - }; - Response::builder() - .status(status) - .header(CONTENT_TYPE, "application/json") - .body(Body::from(serde_json::to_string(&error).unwrap())) - .unwrap() -} - -fn render_json(body: Body) -> Response { - Response::builder() - .header(CONTENT_TYPE, "application/json") - .body(body) - .unwrap() -} - -fn render_plain(body: Body) -> Response { - Response::builder() - .header(CONTENT_TYPE, "text/plain") - .body(body) - .unwrap() -} - -async fn handle_terminate_request(compute: &Arc) -> Result<(), (String, StatusCode)> { - { - let mut state = compute.state.lock().unwrap(); - if state.status == ComputeStatus::Terminated { - return Ok(()); - } - if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { - let msg = format!( - "invalid compute status for termination request: {}", - state.status - ); - return Err((msg, StatusCode::PRECONDITION_FAILED)); - } - state.set_status(ComputeStatus::TerminationPending, &compute.state_changed); - drop(state); - } - - forward_termination_signal(); - info!("sent signal and notified waiters"); - - // Spawn a blocking thread to wait for compute to become Terminated. - // This is needed to do not block the main pool of workers and - // be able to serve other requests while some particular request - // is waiting for compute to finish configuration. - let c = compute.clone(); - task::spawn_blocking(move || { - let mut state = c.state.lock().unwrap(); - while state.status != ComputeStatus::Terminated { - state = c.state_changed.wait(state).unwrap(); - info!( - "waiting for compute to become {}, current status: {:?}", - ComputeStatus::Terminated, - state.status - ); - } - - Ok(()) - }) - .await - .unwrap()?; - info!("terminated Postgres"); - Ok(()) -} - -// Main Hyper HTTP server function that runs it and blocks waiting on it forever. -#[tokio::main] -async fn serve(port: u16, state: Arc) { - // this usually binds to both IPv4 and IPv6 on linux - // see e.g. https://github.com/rust-lang/rust/pull/34440 - let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port); - - let make_service = make_service_fn(move |_conn| { - let state = state.clone(); - async move { - Ok::<_, Infallible>(service_fn(move |req: Request| { - let state = state.clone(); - async move { - Ok::<_, Infallible>( - // NOTE: We include the URI path in the string. It - // doesn't contain any variable parts or sensitive - // information in this API. - tracing_utils::http::tracing_handler( - req, - |req| routes(req, &state), - OtelName::UriPath, - ) - .await, - ) - } - })) - } - }); - - info!("starting HTTP server on {}", addr); - - let server = Server::bind(&addr).serve(make_service); - - // Run this server forever - if let Err(e) = server.await { - error!("server error: {}", e); - } -} - -/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`. -pub fn launch_http_server(port: u16, state: &Arc) -> Result> { - let state = Arc::clone(state); - - Ok(thread::Builder::new() - .name("http-endpoint".into()) - .spawn(move || serve(port, state))?) 
-} diff --git a/compute_tools/src/http/extract/json.rs b/compute_tools/src/http/extract/json.rs new file mode 100644 index 0000000000..41f13625ad --- /dev/null +++ b/compute_tools/src/http/extract/json.rs @@ -0,0 +1,48 @@ +use std::ops::{Deref, DerefMut}; + +use axum::{ + async_trait, + extract::{rejection::JsonRejection, FromRequest, Request}, +}; +use compute_api::responses::GenericAPIError; +use http::StatusCode; + +/// Custom `Json` extractor, so that we can format errors into +/// `JsonResponse`. +#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct Json(pub T); + +#[async_trait] +impl FromRequest for Json +where + axum::Json: FromRequest, + S: Send + Sync, +{ + type Rejection = (StatusCode, axum::Json); + + async fn from_request(req: Request, state: &S) -> Result { + match axum::Json::::from_request(req, state).await { + Ok(value) => Ok(Self(value.0)), + Err(rejection) => Err(( + rejection.status(), + axum::Json(GenericAPIError { + error: rejection.body_text().to_lowercase(), + }), + )), + } + } +} + +impl Deref for Json { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Json { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/compute_tools/src/http/extract/mod.rs b/compute_tools/src/http/extract/mod.rs new file mode 100644 index 0000000000..1b690e444d --- /dev/null +++ b/compute_tools/src/http/extract/mod.rs @@ -0,0 +1,7 @@ +pub(crate) mod json; +pub(crate) mod path; +pub(crate) mod query; + +pub(crate) use json::Json; +pub(crate) use path::Path; +pub(crate) use query::Query; diff --git a/compute_tools/src/http/extract/path.rs b/compute_tools/src/http/extract/path.rs new file mode 100644 index 0000000000..95edc657f2 --- /dev/null +++ b/compute_tools/src/http/extract/path.rs @@ -0,0 +1,48 @@ +use std::ops::{Deref, DerefMut}; + +use axum::{ + async_trait, + extract::{rejection::PathRejection, FromRequestParts}, +}; +use compute_api::responses::GenericAPIError; +use http::{request::Parts, StatusCode}; + +/// Custom `Path` extractor, so that we can format errors into +/// `JsonResponse`. +#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct Path(pub T); + +#[async_trait] +impl FromRequestParts for Path +where + axum::extract::Path: FromRequestParts, + S: Send + Sync, +{ + type Rejection = (StatusCode, axum::Json); + + async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { + match axum::extract::Path::::from_request_parts(parts, state).await { + Ok(value) => Ok(Self(value.0)), + Err(rejection) => Err(( + rejection.status(), + axum::Json(GenericAPIError { + error: rejection.body_text().to_ascii_lowercase(), + }), + )), + } + } +} + +impl Deref for Path { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Path { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/compute_tools/src/http/extract/query.rs b/compute_tools/src/http/extract/query.rs new file mode 100644 index 0000000000..a1f1b0cef0 --- /dev/null +++ b/compute_tools/src/http/extract/query.rs @@ -0,0 +1,48 @@ +use std::ops::{Deref, DerefMut}; + +use axum::{ + async_trait, + extract::{rejection::QueryRejection, FromRequestParts}, +}; +use compute_api::responses::GenericAPIError; +use http::{request::Parts, StatusCode}; + +/// Custom `Query` extractor, so that we can format errors into +/// `JsonResponse`. 
+#[derive(Debug, Clone, Copy, Default)] +pub(crate) struct Query(pub T); + +#[async_trait] +impl FromRequestParts for Query +where + axum::extract::Query: FromRequestParts, + S: Send + Sync, +{ + type Rejection = (StatusCode, axum::Json); + + async fn from_request_parts(parts: &mut Parts, state: &S) -> Result { + match axum::extract::Query::::from_request_parts(parts, state).await { + Ok(value) => Ok(Self(value.0)), + Err(rejection) => Err(( + rejection.status(), + axum::Json(GenericAPIError { + error: rejection.body_text().to_ascii_lowercase(), + }), + )), + } + } +} + +impl Deref for Query { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for Query { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index e5fdf85eed..a596bea504 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -1 +1,56 @@ -pub mod api; +use axum::{body::Body, response::Response}; +use compute_api::responses::{ComputeStatus, GenericAPIError}; +use http::{header::CONTENT_TYPE, StatusCode}; +use serde::Serialize; +use tracing::error; + +pub use server::launch_http_server; + +mod extract; +mod routes; +mod server; + +/// Convenience response builder for JSON responses +struct JsonResponse; + +impl JsonResponse { + /// Helper for actually creating a response + fn create_response(code: StatusCode, body: impl Serialize) -> Response { + Response::builder() + .status(code) + .header(CONTENT_TYPE.as_str(), "application/json") + .body(Body::from(serde_json::to_string(&body).unwrap())) + .unwrap() + } + + /// Create a successful error response + pub(self) fn success(code: StatusCode, body: impl Serialize) -> Response { + assert!({ + let code = code.as_u16(); + + (200..300).contains(&code) + }); + + Self::create_response(code, body) + } + + /// Create an error response + pub(self) fn error(code: StatusCode, error: impl ToString) -> Response { + assert!(code.as_u16() >= 400); + + let message = error.to_string(); + error!(message); + + Self::create_response(code, &GenericAPIError { error: message }) + } + + /// Create an error response related to the compute being in an invalid state + pub(self) fn invalid_status(status: ComputeStatus) -> Response { + Self::create_response( + StatusCode::PRECONDITION_FAILED, + &GenericAPIError { + error: format!("invalid compute status: {status}"), + }, + ) + } +} diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 24a67cac71..50319cdd85 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -37,7 +37,7 @@ paths: schema: $ref: "#/components/schemas/ComputeMetrics" - /metrics + /metrics: get: tags: - Info diff --git a/compute_tools/src/http/routes/check_writability.rs b/compute_tools/src/http/routes/check_writability.rs new file mode 100644 index 0000000000..d7feb055e9 --- /dev/null +++ b/compute_tools/src/http/routes/check_writability.rs @@ -0,0 +1,20 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use compute_api::responses::ComputeStatus; +use http::StatusCode; + +use crate::{checker::check_writability, compute::ComputeNode, http::JsonResponse}; + +/// Check that the compute is currently running. 
+pub(in crate::http) async fn is_writable(State(compute): State>) -> Response { + let status = compute.get_status(); + if status != ComputeStatus::Running { + return JsonResponse::invalid_status(status); + } + + match check_writability(&compute).await { + Ok(_) => JsonResponse::success(StatusCode::OK, true), + Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e), + } +} diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs new file mode 100644 index 0000000000..2546cbc344 --- /dev/null +++ b/compute_tools/src/http/routes/configure.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use compute_api::{ + requests::ConfigurationRequest, + responses::{ComputeStatus, ComputeStatusResponse}, +}; +use http::StatusCode; +use tokio::task; +use tracing::info; + +use crate::{ + compute::{ComputeNode, ParsedSpec}, + http::{extract::Json, JsonResponse}, +}; + +// Accept spec in JSON format and request compute configuration. If anything +// goes wrong after we set the compute status to `ConfigurationPending` and +// update compute state with new spec, we basically leave compute in the +// potentially wrong state. That said, it's control-plane's responsibility to +// watch compute state after reconfiguration request and to clean restart in +// case of errors. +pub(in crate::http) async fn configure( + State(compute): State>, + request: Json, +) -> Response { + if !compute.live_config_allowed { + return JsonResponse::error( + StatusCode::PRECONDITION_FAILED, + "live configuration is not allowed for this compute node".to_string(), + ); + } + + let pspec = match ParsedSpec::try_from(request.spec.clone()) { + Ok(p) => p, + Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e), + }; + + // XXX: wrap state update under lock in a code block. Otherwise, we will try + // to `Send` `mut state` into the spawned thread bellow, which will cause + // the following rustc error: + // + // error: future cannot be sent between threads safely + { + let mut state = compute.state.lock().unwrap(); + if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { + return JsonResponse::invalid_status(state.status); + } + + state.pspec = Some(pspec); + state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed); + drop(state); + } + + // Spawn a blocking thread to wait for compute to become Running. This is + // needed to do not block the main pool of workers and be able to serve + // other requests while some particular request is waiting for compute to + // finish configuration. + let c = compute.clone(); + let completed = task::spawn_blocking(move || { + let mut state = c.state.lock().unwrap(); + while state.status != ComputeStatus::Running { + state = c.state_changed.wait(state).unwrap(); + info!( + "waiting for compute to become {}, current status: {}", + ComputeStatus::Running, + state.status + ); + + if state.status == ComputeStatus::Failed { + let err = state.error.as_ref().map_or("unknown error", |x| x); + let msg = format!("compute configuration failed: {:?}", err); + return Err(msg); + } + } + + Ok(()) + }) + .await + .unwrap(); + + if let Err(e) = completed { + return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e); + } + + // Return current compute state if everything went well. 
+ let state = compute.state.lock().unwrap().clone(); + let body = ComputeStatusResponse::from(&state); + + JsonResponse::success(StatusCode::OK, body) +} diff --git a/compute_tools/src/http/routes/database_schema.rs b/compute_tools/src/http/routes/database_schema.rs new file mode 100644 index 0000000000..fd716272dc --- /dev/null +++ b/compute_tools/src/http/routes/database_schema.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use axum::{body::Body, extract::State, response::Response}; +use http::{header::CONTENT_TYPE, StatusCode}; +use serde::Deserialize; + +use crate::{ + catalog::{get_database_schema, SchemaDumpError}, + compute::ComputeNode, + http::{extract::Query, JsonResponse}, +}; + +#[derive(Debug, Clone, Deserialize)] +pub(in crate::http) struct DatabaseSchemaParams { + database: String, +} + +/// Get a schema dump of the requested database. +pub(in crate::http) async fn get_schema_dump( + params: Query, + State(compute): State>, +) -> Response { + match get_database_schema(&compute, ¶ms.database).await { + Ok(schema) => Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE.as_str(), "application/json") + .body(Body::from_stream(schema)) + .unwrap(), + Err(SchemaDumpError::DatabaseDoesNotExist) => { + JsonResponse::error(StatusCode::NOT_FOUND, SchemaDumpError::DatabaseDoesNotExist) + } + Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e), + } +} diff --git a/compute_tools/src/http/routes/dbs_and_roles.rs b/compute_tools/src/http/routes/dbs_and_roles.rs new file mode 100644 index 0000000000..4843c3fab4 --- /dev/null +++ b/compute_tools/src/http/routes/dbs_and_roles.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use http::StatusCode; + +use crate::{catalog::get_dbs_and_roles, compute::ComputeNode, http::JsonResponse}; + +/// Get the databases and roles from the compute. +pub(in crate::http) async fn get_catalog_objects( + State(compute): State>, +) -> Response { + match get_dbs_and_roles(&compute).await { + Ok(catalog_objects) => JsonResponse::success(StatusCode::OK, catalog_objects), + Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e), + } +} diff --git a/compute_tools/src/http/routes/extension_server.rs b/compute_tools/src/http/routes/extension_server.rs new file mode 100644 index 0000000000..ee5bc675ba --- /dev/null +++ b/compute_tools/src/http/routes/extension_server.rs @@ -0,0 +1,67 @@ +use std::sync::Arc; + +use axum::{ + extract::State, + response::{IntoResponse, Response}, +}; +use http::StatusCode; +use serde::Deserialize; + +use crate::{ + compute::ComputeNode, + http::{ + extract::{Path, Query}, + JsonResponse, + }, +}; + +#[derive(Debug, Clone, Deserialize)] +pub(in crate::http) struct ExtensionServerParams { + is_library: Option, +} + +/// Download a remote extension. 
+pub(in crate::http) async fn download_extension( + Path(filename): Path, + params: Query, + State(compute): State>, +) -> Response { + // Don't even try to download extensions if no remote storage is configured + if compute.ext_remote_storage.is_none() { + return JsonResponse::error( + StatusCode::PRECONDITION_FAILED, + "remote storage is not configured", + ); + } + + let ext = { + let state = compute.state.lock().unwrap(); + let pspec = state.pspec.as_ref().unwrap(); + let spec = &pspec.spec; + + let remote_extensions = match spec.remote_extensions.as_ref() { + Some(r) => r, + None => { + return JsonResponse::error( + StatusCode::CONFLICT, + "information about remote extensions is unavailable", + ); + } + }; + + remote_extensions.get_ext( + &filename, + params.is_library.unwrap_or(false), + &compute.build_tag, + &compute.pgversion, + ) + }; + + match ext { + Ok((ext_name, ext_path)) => match compute.download_extension(ext_name, ext_path).await { + Ok(_) => StatusCode::OK.into_response(), + Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e), + }, + Err(e) => JsonResponse::error(StatusCode::NOT_FOUND, e), + } +} diff --git a/compute_tools/src/http/routes/extensions.rs b/compute_tools/src/http/routes/extensions.rs new file mode 100644 index 0000000000..1fc03b9109 --- /dev/null +++ b/compute_tools/src/http/routes/extensions.rs @@ -0,0 +1,45 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use compute_api::{ + requests::ExtensionInstallRequest, + responses::{ComputeStatus, ExtensionInstallResponse}, +}; +use http::StatusCode; + +use crate::{ + compute::ComputeNode, + http::{extract::Json, JsonResponse}, +}; + +/// Install a extension. +pub(in crate::http) async fn install_extension( + State(compute): State>, + request: Json, +) -> Response { + let status = compute.get_status(); + if status != ComputeStatus::Running { + return JsonResponse::invalid_status(status); + } + + match compute + .install_extension( + &request.extension, + &request.database, + request.version.to_string(), + ) + .await + { + Ok(version) => JsonResponse::success( + StatusCode::CREATED, + Some(ExtensionInstallResponse { + extension: request.extension.clone(), + version, + }), + ), + Err(e) => JsonResponse::error( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to install extension: {e}"), + ), + } +} diff --git a/compute_tools/src/http/routes/failpoints.rs b/compute_tools/src/http/routes/failpoints.rs new file mode 100644 index 0000000000..2ec4511676 --- /dev/null +++ b/compute_tools/src/http/routes/failpoints.rs @@ -0,0 +1,35 @@ +use axum::response::{IntoResponse, Response}; +use http::StatusCode; +use tracing::info; +use utils::failpoint_support::{apply_failpoint, ConfigureFailpointsRequest}; + +use crate::http::{extract::Json, JsonResponse}; + +/// Configure failpoints for testing purposes. 
+pub(in crate::http) async fn configure_failpoints( + failpoints: Json, +) -> Response { + if !fail::has_failpoints() { + return JsonResponse::error( + StatusCode::PRECONDITION_FAILED, + "Cannot manage failpoints because neon was compiled without failpoints support", + ); + } + + for fp in &*failpoints { + info!("cfg failpoint: {} {}", fp.name, fp.actions); + + // We recognize one extra "action" that's not natively recognized + // by the failpoints crate: exit, to immediately kill the process + let cfg_result = apply_failpoint(&fp.name, &fp.actions); + + if let Err(e) = cfg_result { + return JsonResponse::error( + StatusCode::BAD_REQUEST, + format!("failed to configure failpoints: {e}"), + ); + } + } + + StatusCode::OK.into_response() +} diff --git a/compute_tools/src/http/routes/grants.rs b/compute_tools/src/http/routes/grants.rs new file mode 100644 index 0000000000..3f67f011e5 --- /dev/null +++ b/compute_tools/src/http/routes/grants.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use compute_api::{ + requests::SetRoleGrantsRequest, + responses::{ComputeStatus, SetRoleGrantsResponse}, +}; +use http::StatusCode; + +use crate::{ + compute::ComputeNode, + http::{extract::Json, JsonResponse}, +}; + +/// Add grants for a role. +pub(in crate::http) async fn add_grant( + State(compute): State>, + request: Json, +) -> Response { + let status = compute.get_status(); + if status != ComputeStatus::Running { + return JsonResponse::invalid_status(status); + } + + match compute + .set_role_grants( + &request.database, + &request.schema, + &request.privileges, + &request.role, + ) + .await + { + Ok(()) => JsonResponse::success( + StatusCode::CREATED, + Some(SetRoleGrantsResponse { + database: request.database.clone(), + schema: request.schema.clone(), + role: request.role.clone(), + privileges: request.privileges.clone(), + }), + ), + Err(e) => JsonResponse::error( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to grant role privileges to the schema: {e}"), + ), + } +} diff --git a/compute_tools/src/http/routes/info.rs b/compute_tools/src/http/routes/info.rs new file mode 100644 index 0000000000..32d6fea74c --- /dev/null +++ b/compute_tools/src/http/routes/info.rs @@ -0,0 +1,11 @@ +use axum::response::Response; +use compute_api::responses::InfoResponse; +use http::StatusCode; + +use crate::http::JsonResponse; + +/// Get information about the physical characteristics about the compute. +pub(in crate::http) async fn get_info() -> Response { + let num_cpus = num_cpus::get_physical(); + JsonResponse::success(StatusCode::OK, &InfoResponse { num_cpus }) +} diff --git a/compute_tools/src/http/routes/insights.rs b/compute_tools/src/http/routes/insights.rs new file mode 100644 index 0000000000..6b03a461c3 --- /dev/null +++ b/compute_tools/src/http/routes/insights.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use compute_api::responses::ComputeStatus; +use http::StatusCode; + +use crate::{compute::ComputeNode, http::JsonResponse}; + +/// Collect current Postgres usage insights. 
+pub(in crate::http) async fn get_insights(State(compute): State>) -> Response { + let status = compute.get_status(); + if status != ComputeStatus::Running { + return JsonResponse::invalid_status(status); + } + + let insights = compute.collect_insights().await; + JsonResponse::success(StatusCode::OK, insights) +} diff --git a/compute_tools/src/http/routes/installed_extensions.rs b/compute_tools/src/http/routes/installed_extensions.rs new file mode 100644 index 0000000000..db74a6b195 --- /dev/null +++ b/compute_tools/src/http/routes/installed_extensions.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use compute_api::responses::ComputeStatus; +use http::StatusCode; +use tokio::task; + +use crate::{compute::ComputeNode, http::JsonResponse, installed_extensions}; + +/// Get a list of installed extensions. +pub(in crate::http) async fn get_installed_extensions( + State(compute): State>, +) -> Response { + let status = compute.get_status(); + if status != ComputeStatus::Running { + return JsonResponse::invalid_status(status); + } + + let conf = compute.get_conn_conf(None); + let res = task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf)) + .await + .unwrap(); + + match res { + Ok(installed_extensions) => { + JsonResponse::success(StatusCode::OK, Some(installed_extensions)) + } + Err(e) => JsonResponse::error( + StatusCode::INTERNAL_SERVER_ERROR, + format!("failed to get list of installed extensions: {e}"), + ), + } +} diff --git a/compute_tools/src/http/routes/metrics.rs b/compute_tools/src/http/routes/metrics.rs new file mode 100644 index 0000000000..40d71b5de7 --- /dev/null +++ b/compute_tools/src/http/routes/metrics.rs @@ -0,0 +1,32 @@ +use axum::{body::Body, response::Response}; +use http::header::CONTENT_TYPE; +use http::StatusCode; +use metrics::proto::MetricFamily; +use metrics::Encoder; +use metrics::TextEncoder; + +use crate::{http::JsonResponse, installed_extensions}; + +/// Expose Prometheus metrics. +pub(in crate::http) async fn get_metrics() -> Response { + // When we call TextEncoder::encode() below, it will immediately return an + // error if a metric family has no metrics, so we need to preemptively + // filter out metric families with no metrics. + let metrics = installed_extensions::collect() + .into_iter() + .filter(|m| !m.get_metric().is_empty()) + .collect::>(); + + let encoder = TextEncoder::new(); + let mut buffer = vec![]; + + if let Err(e) = encoder.encode(&metrics, &mut buffer) { + return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e); + } + + Response::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, encoder.format_type()) + .body(Body::from(buffer)) + .unwrap() +} diff --git a/compute_tools/src/http/routes/metrics_json.rs b/compute_tools/src/http/routes/metrics_json.rs new file mode 100644 index 0000000000..0709db5011 --- /dev/null +++ b/compute_tools/src/http/routes/metrics_json.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; + +use axum::{extract::State, response::Response}; +use http::StatusCode; + +use crate::{compute::ComputeNode, http::JsonResponse}; + +/// Get startup metrics. 
+pub(in crate::http) async fn get_metrics(State(compute): State>) -> Response { + let metrics = compute.state.lock().unwrap().metrics.clone(); + JsonResponse::success(StatusCode::OK, metrics) +} diff --git a/compute_tools/src/http/routes/mod.rs b/compute_tools/src/http/routes/mod.rs new file mode 100644 index 0000000000..3efa1153ad --- /dev/null +++ b/compute_tools/src/http/routes/mod.rs @@ -0,0 +1,38 @@ +use compute_api::responses::ComputeStatusResponse; + +use crate::compute::ComputeState; + +pub(in crate::http) mod check_writability; +pub(in crate::http) mod configure; +pub(in crate::http) mod database_schema; +pub(in crate::http) mod dbs_and_roles; +pub(in crate::http) mod extension_server; +pub(in crate::http) mod extensions; +pub(in crate::http) mod failpoints; +pub(in crate::http) mod grants; +pub(in crate::http) mod info; +pub(in crate::http) mod insights; +pub(in crate::http) mod installed_extensions; +pub(in crate::http) mod metrics; +pub(in crate::http) mod metrics_json; +pub(in crate::http) mod status; +pub(in crate::http) mod terminate; + +impl From<&ComputeState> for ComputeStatusResponse { + fn from(state: &ComputeState) -> Self { + ComputeStatusResponse { + start_time: state.start_time, + tenant: state + .pspec + .as_ref() + .map(|pspec| pspec.tenant_id.to_string()), + timeline: state + .pspec + .as_ref() + .map(|pspec| pspec.timeline_id.to_string()), + status: state.status, + last_active: state.last_active, + error: state.error.clone(), + } + } +} diff --git a/compute_tools/src/http/routes/status.rs b/compute_tools/src/http/routes/status.rs new file mode 100644 index 0000000000..d64d53a58f --- /dev/null +++ b/compute_tools/src/http/routes/status.rs @@ -0,0 +1,14 @@ +use std::{ops::Deref, sync::Arc}; + +use axum::{extract::State, http::StatusCode, response::Response}; +use compute_api::responses::ComputeStatusResponse; + +use crate::{compute::ComputeNode, http::JsonResponse}; + +/// Retrieve the state of the comute. +pub(in crate::http) async fn get_status(State(compute): State>) -> Response { + let state = compute.state.lock().unwrap(); + let body = ComputeStatusResponse::from(state.deref()); + + JsonResponse::success(StatusCode::OK, body) +} diff --git a/compute_tools/src/http/routes/terminate.rs b/compute_tools/src/http/routes/terminate.rs new file mode 100644 index 0000000000..7acd84f236 --- /dev/null +++ b/compute_tools/src/http/routes/terminate.rs @@ -0,0 +1,58 @@ +use std::sync::Arc; + +use axum::{ + extract::State, + response::{IntoResponse, Response}, +}; +use compute_api::responses::ComputeStatus; +use http::StatusCode; +use tokio::task; +use tracing::info; + +use crate::{ + compute::{forward_termination_signal, ComputeNode}, + http::JsonResponse, +}; + +/// Terminate the compute. +pub(in crate::http) async fn terminate(State(compute): State>) -> Response { + { + let mut state = compute.state.lock().unwrap(); + if state.status == ComputeStatus::Terminated { + return StatusCode::CREATED.into_response(); + } + + if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { + return JsonResponse::invalid_status(state.status); + } + + state.set_status(ComputeStatus::TerminationPending, &compute.state_changed); + drop(state); + } + + forward_termination_signal(); + info!("sent signal and notified waiters"); + + // Spawn a blocking thread to wait for compute to become Terminated. 
+ // This is needed to do not block the main pool of workers and + // be able to serve other requests while some particular request + // is waiting for compute to finish configuration. + let c = compute.clone(); + task::spawn_blocking(move || { + let mut state = c.state.lock().unwrap(); + while state.status != ComputeStatus::Terminated { + state = c.state_changed.wait(state).unwrap(); + info!( + "waiting for compute to become {}, current status: {:?}", + ComputeStatus::Terminated, + state.status + ); + } + }) + .await + .unwrap(); + + info!("terminated Postgres"); + + StatusCode::OK.into_response() +} diff --git a/compute_tools/src/http/server.rs b/compute_tools/src/http/server.rs new file mode 100644 index 0000000000..33d4b489a0 --- /dev/null +++ b/compute_tools/src/http/server.rs @@ -0,0 +1,165 @@ +use std::{ + net::{IpAddr, Ipv6Addr, SocketAddr}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + thread, + time::Duration, +}; + +use anyhow::Result; +use axum::{ + response::{IntoResponse, Response}, + routing::{get, post}, + Router, +}; +use http::StatusCode; +use tokio::net::TcpListener; +use tower::ServiceBuilder; +use tower_http::{ + request_id::{MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer}, + trace::TraceLayer, +}; +use tracing::{debug, error, info, Span}; + +use super::routes::{ + check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions, + grants, info as info_route, insights, installed_extensions, metrics, metrics_json, status, + terminate, +}; +use crate::compute::ComputeNode; + +async fn handle_404() -> Response { + StatusCode::NOT_FOUND.into_response() +} + +#[derive(Clone, Default)] +struct ComputeMakeRequestId(Arc); + +impl MakeRequestId for ComputeMakeRequestId { + fn make_request_id( + &mut self, + _request: &http::Request, + ) -> Option { + let request_id = self + .0 + .fetch_add(1, Ordering::SeqCst) + .to_string() + .parse() + .unwrap(); + + Some(RequestId::new(request_id)) + } +} + +/// Run the HTTP server and wait on it forever. 
+#[tokio::main] +async fn serve(port: u16, compute: Arc) { + const X_REQUEST_ID: &str = "x-request-id"; + + let mut app = Router::new() + .route("/check_writability", post(check_writability::is_writable)) + .route("/configure", post(configure::configure)) + .route("/database_schema", get(database_schema::get_schema_dump)) + .route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects)) + .route( + "/extension_server/*filename", + post(extension_server::download_extension), + ) + .route("/extensions", post(extensions::install_extension)) + .route("/grants", post(grants::add_grant)) + .route("/info", get(info_route::get_info)) + .route("/insights", get(insights::get_insights)) + .route( + "/installed_extensions", + get(installed_extensions::get_installed_extensions), + ) + .route("/metrics", get(metrics::get_metrics)) + .route("/metrics.json", get(metrics_json::get_metrics)) + .route("/status", get(status::get_status)) + .route("/terminate", post(terminate::terminate)) + .fallback(handle_404) + .layer( + ServiceBuilder::new() + .layer(SetRequestIdLayer::x_request_id( + ComputeMakeRequestId::default(), + )) + .layer( + TraceLayer::new_for_http() + .on_request(|request: &http::Request<_>, _span: &Span| { + let request_id = request + .headers() + .get(X_REQUEST_ID) + .unwrap() + .to_str() + .unwrap(); + + match request.uri().path() { + "/metrics" => { + debug!(%request_id, "{} {}", request.method(), request.uri()) + } + _ => info!(%request_id, "{} {}", request.method(), request.uri()), + }; + }) + .on_response( + |response: &http::Response<_>, latency: Duration, _span: &Span| { + let request_id = response + .headers() + .get(X_REQUEST_ID) + .unwrap() + .to_str() + .unwrap(); + + info!( + %request_id, + code = response.status().as_u16(), + latency = latency.as_millis() + ) + }, + ), + ) + .layer(PropagateRequestIdLayer::x_request_id()), + ) + .with_state(compute); + + // Add in any testing support + if cfg!(feature = "testing") { + use super::routes::failpoints; + + app = app.route("/failpoints", post(failpoints::configure_failpoints)) + } + + // This usually binds to both IPv4 and IPv6 on Linux, see + // https://github.com/rust-lang/rust/pull/34440 for more information + let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port); + let listener = match TcpListener::bind(&addr).await { + Ok(listener) => listener, + Err(e) => { + error!( + "failed to bind the compute_ctl HTTP server to port {}: {}", + port, e + ); + return; + } + }; + + if let Ok(local_addr) = listener.local_addr() { + info!("compute_ctl HTTP server listening on {}", local_addr); + } else { + info!("compute_ctl HTTP server listening on port {}", port); + } + + if let Err(e) = axum::serve(listener, app).await { + error!("compute_ctl HTTP server error: {}", e); + } +} + +/// Launch a separate HTTP server thread and return its `JoinHandle`. +pub fn launch_http_server(port: u16, state: &Arc) -> Result> { + let state = Arc::clone(state); + + Ok(thread::Builder::new() + .name("http-server".into()) + .spawn(move || serve(port, state))?) 
+} diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index ee4cf2dfa5..12fea4e61a 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -3,8 +3,6 @@ #![deny(unsafe_code)] #![deny(clippy::undocumented_unsafe_blocks)] -extern crate hyper0 as hyper; - pub mod checker; pub mod config; pub mod configurator; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 5e47ec4811..b8027abf7c 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -62,7 +62,7 @@ use crate::local_env::LocalEnv; use crate::postgresql_conf::PostgresConf; use crate::storage_controller::StorageController; -use compute_api::responses::{ComputeState, ComputeStatus}; +use compute_api::responses::{ComputeStatus, ComputeStatusResponse}; use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec}; // contents of a endpoint.json file @@ -739,7 +739,7 @@ impl Endpoint { } // Call the /status HTTP API - pub async fn get_status(&self) -> Result { + pub async fn get_status(&self) -> Result { let client = reqwest::Client::new(); let response = client diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 0d65f6a38d..9ce605089b 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -15,6 +15,17 @@ pub struct GenericAPIError { pub error: String, } +#[derive(Debug, Clone, Serialize)] +pub struct InfoResponse { + pub num_cpus: usize, +} + +#[derive(Debug, Clone, Serialize)] +pub struct ExtensionInstallResponse { + pub extension: PgIdent, + pub version: ExtVersion, +} + /// Response of the /status API #[derive(Serialize, Debug, Deserialize)] #[serde(rename_all = "snake_case")] @@ -28,16 +39,6 @@ pub struct ComputeStatusResponse { pub error: Option, } -#[derive(Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] -pub struct ComputeState { - pub status: ComputeStatus, - /// Timestamp of the last Postgres activity - #[serde(serialize_with = "rfc3339_serialize")] - pub last_active: Option>, - pub error: Option, -} - #[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum ComputeStatus { @@ -78,7 +79,7 @@ impl Display for ComputeStatus { } } -fn rfc3339_serialize(x: &Option>, s: S) -> Result +pub fn rfc3339_serialize(x: &Option>, s: S) -> Result where S: Serializer, { diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 33bdc25785..0ffeeead18 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -91,7 +91,8 @@ tokio-stream = { version = "0.1", features = ["net"] } tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] } toml_edit = { version = "0.22", features = ["serde"] } tonic = { version = "0.12", features = ["tls-roots"] } -tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "util"] } +tower-9fbad63c4bcf4a8f = { package = "tower", version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] } +tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["log", "make", "util"] } tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } url = { version = "2", features = ["serde"] }
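
Note: the new `tower` 0.5 direct dependency (with `util` enabled via workspace-hack) also makes the axum `Router` easy to exercise in-process with `tower::ServiceExt::oneshot`. The sketch below is not part of this diff; the route and handler are stand-ins rather than the real compute_ctl routes, and it assumes tokio's test macros are available.

use axum::{body::Body, http::{Request, StatusCode}, routing::get, Router};
use tower::ServiceExt; // for `oneshot`

#[tokio::test]
async fn router_smoke_test() {
    // A stand-in route; the real server wires /status to status::get_status.
    let app = Router::new().route("/status", get(|| async { "ok" }));

    // Drive the router directly as a tower Service, without binding a socket.
    let response = app
        .oneshot(Request::builder().uri("/status").body(Body::empty()).unwrap())
        .await
        .unwrap();

    assert_eq!(response.status(), StatusCode::OK);
}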
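Note: the `XXX` comment in the new configure.rs (about `mut state` being sent into the spawned thread) refers to a general async Rust constraint: a `std::sync::MutexGuard` held across an `.await` makes the handler future `!Send`. A minimal illustration of the scoping pattern used there, not code from the PR:

use std::sync::{Arc, Mutex};

// Dropping the guard in an inner block before awaiting keeps the future Send.
async fn update_then_wait(state: Arc<Mutex<u32>>) {
    {
        let mut guard = state.lock().unwrap();
        *guard += 1;
    } // guard dropped here; nothing !Send is held across the .await below

    tokio::task::spawn_blocking(|| {
        // e.g. block on a Condvar until a status change, as configure() does
    })
    .await
    .unwrap();
}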
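Note: since control_plane's `Endpoint::get_status` now returns `ComputeStatusResponse` directly, a caller of the new axum server deserializes the `/status` body into that type. A hypothetical client-side sketch (not in this PR); the port and error handling are placeholders:

use compute_api::responses::ComputeStatusResponse;

async fn fetch_status(port: u16) -> anyhow::Result<ComputeStatusResponse> {
    let url = format!("http://127.0.0.1:{port}/status");
    // Requires reqwest's `json` feature, which the workspace already enables.
    let response = reqwest::get(url).await?.error_for_status()?;
    Ok(response.json::<ComputeStatusResponse>().await?)
}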