mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Implement compute_ctl management API in Axum (#10099)
This is a refactor to create better abstractions related to our management server. It cleans up the code, and prepares everything for authorized communication to and from the control plane. Signed-off-by: Tristan Partin <tristan@neon.tech>
This commit is contained in:
105
Cargo.lock
generated
105
Cargo.lock
generated
@@ -718,13 +718,13 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "axum"
|
name = "axum"
|
||||||
version = "0.7.5"
|
version = "0.7.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
|
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"axum-core",
|
"axum-core",
|
||||||
"base64 0.21.1",
|
"base64 0.22.1",
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"http 1.1.0",
|
"http 1.1.0",
|
||||||
@@ -746,8 +746,8 @@ dependencies = [
|
|||||||
"sha1",
|
"sha1",
|
||||||
"sync_wrapper 1.0.1",
|
"sync_wrapper 1.0.1",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-tungstenite",
|
"tokio-tungstenite 0.24.0",
|
||||||
"tower",
|
"tower 0.5.2",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
@@ -1267,6 +1267,7 @@ dependencies = [
|
|||||||
"aws-config",
|
"aws-config",
|
||||||
"aws-sdk-kms",
|
"aws-sdk-kms",
|
||||||
"aws-sdk-s3",
|
"aws-sdk-s3",
|
||||||
|
"axum",
|
||||||
"base64 0.13.1",
|
"base64 0.13.1",
|
||||||
"bytes",
|
"bytes",
|
||||||
"camino",
|
"camino",
|
||||||
@@ -1277,7 +1278,7 @@ dependencies = [
|
|||||||
"fail",
|
"fail",
|
||||||
"flate2",
|
"flate2",
|
||||||
"futures",
|
"futures",
|
||||||
"hyper 0.14.30",
|
"http 1.1.0",
|
||||||
"metrics",
|
"metrics",
|
||||||
"nix 0.27.1",
|
"nix 0.27.1",
|
||||||
"notify",
|
"notify",
|
||||||
@@ -1303,6 +1304,8 @@ dependencies = [
|
|||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
|
"tower 0.5.2",
|
||||||
|
"tower-http",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-opentelemetry",
|
"tracing-opentelemetry",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
@@ -2720,7 +2723,7 @@ dependencies = [
|
|||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"socket2",
|
"socket2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower",
|
"tower 0.4.13",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
@@ -3260,9 +3263,9 @@ checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "matchit"
|
name = "matchit"
|
||||||
version = "0.8.2"
|
version = "0.8.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "540f1c43aed89909c0cc0cc604e3bb2f7e7a341a3728a9e6cfe760e733cd11ed"
|
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "md-5"
|
name = "md-5"
|
||||||
@@ -4758,7 +4761,7 @@ dependencies = [
|
|||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
"tokio-postgres2",
|
"tokio-postgres2",
|
||||||
"tokio-rustls 0.26.0",
|
"tokio-rustls 0.26.0",
|
||||||
"tokio-tungstenite",
|
"tokio-tungstenite 0.21.0",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
@@ -5186,7 +5189,7 @@ dependencies = [
|
|||||||
"async-trait",
|
"async-trait",
|
||||||
"getrandom 0.2.11",
|
"getrandom 0.2.11",
|
||||||
"http 1.1.0",
|
"http 1.1.0",
|
||||||
"matchit 0.8.2",
|
"matchit 0.8.4",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"reqwest-middleware",
|
"reqwest-middleware",
|
||||||
@@ -6800,7 +6803,19 @@ dependencies = [
|
|||||||
"futures-util",
|
"futures-util",
|
||||||
"log",
|
"log",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tungstenite",
|
"tungstenite 0.21.0",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-tungstenite"
|
||||||
|
version = "0.24.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"tungstenite 0.24.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -6881,7 +6896,7 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls 0.26.0",
|
"tokio-rustls 0.26.0",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tower",
|
"tower 0.4.13",
|
||||||
"tower-layer",
|
"tower-layer",
|
||||||
"tower-service",
|
"tower-service",
|
||||||
"tracing",
|
"tracing",
|
||||||
@@ -6922,16 +6937,49 @@ dependencies = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-layer"
|
name = "tower"
|
||||||
version = "0.3.2"
|
version = "0.5.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
|
checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
|
||||||
|
dependencies = [
|
||||||
|
"futures-core",
|
||||||
|
"futures-util",
|
||||||
|
"pin-project-lite",
|
||||||
|
"sync_wrapper 1.0.1",
|
||||||
|
"tokio",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-http"
|
||||||
|
version = "0.6.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags 2.4.1",
|
||||||
|
"bytes",
|
||||||
|
"http 1.1.0",
|
||||||
|
"http-body 1.0.0",
|
||||||
|
"pin-project-lite",
|
||||||
|
"tower-layer",
|
||||||
|
"tower-service",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tower-layer"
|
||||||
|
version = "0.3.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tower-service"
|
name = "tower-service"
|
||||||
version = "0.3.2"
|
version = "0.3.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
|
checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tracing"
|
name = "tracing"
|
||||||
@@ -7086,6 +7134,24 @@ dependencies = [
|
|||||||
"utf-8",
|
"utf-8",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tungstenite"
|
||||||
|
version = "0.24.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a"
|
||||||
|
dependencies = [
|
||||||
|
"byteorder",
|
||||||
|
"bytes",
|
||||||
|
"data-encoding",
|
||||||
|
"http 1.1.0",
|
||||||
|
"httparse",
|
||||||
|
"log",
|
||||||
|
"rand 0.8.5",
|
||||||
|
"sha1",
|
||||||
|
"thiserror",
|
||||||
|
"utf-8",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "twox-hash"
|
name = "twox-hash"
|
||||||
version = "1.6.3"
|
version = "1.6.3"
|
||||||
@@ -7867,7 +7933,8 @@ dependencies = [
|
|||||||
"tokio-util",
|
"tokio-util",
|
||||||
"toml_edit",
|
"toml_edit",
|
||||||
"tonic",
|
"tonic",
|
||||||
"tower",
|
"tower 0.4.13",
|
||||||
|
"tower 0.5.2",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-core",
|
"tracing-core",
|
||||||
"url",
|
"url",
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ aws-smithy-types = "1.2"
|
|||||||
aws-credential-types = "1.2.0"
|
aws-credential-types = "1.2.0"
|
||||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||||
aws-types = "1.3"
|
aws-types = "1.3"
|
||||||
axum = { version = "0.7.5", features = ["ws"] }
|
axum = { version = "0.7.9", features = ["ws"] }
|
||||||
base64 = "0.13.0"
|
base64 = "0.13.0"
|
||||||
bincode = "1.3"
|
bincode = "1.3"
|
||||||
bindgen = "0.70"
|
bindgen = "0.70"
|
||||||
@@ -187,7 +187,9 @@ tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
|||||||
toml = "0.8"
|
toml = "0.8"
|
||||||
toml_edit = "0.22"
|
toml_edit = "0.22"
|
||||||
tonic = {version = "0.12.3", features = ["tls", "tls-roots"]}
|
tonic = {version = "0.12.3", features = ["tls", "tls-roots"]}
|
||||||
tower-service = "0.3.2"
|
tower = { version = "0.5.2", default-features = false }
|
||||||
|
tower-http = { version = "0.6.2", features = ["request-id", "trace"] }
|
||||||
|
tower-service = "0.3.3"
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-error = "0.2"
|
tracing-error = "0.2"
|
||||||
tracing-opentelemetry = "0.27"
|
tracing-opentelemetry = "0.27"
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ aws-config.workspace = true
|
|||||||
aws-sdk-s3.workspace = true
|
aws-sdk-s3.workspace = true
|
||||||
aws-sdk-kms.workspace = true
|
aws-sdk-kms.workspace = true
|
||||||
anyhow.workspace = true
|
anyhow.workspace = true
|
||||||
|
axum = { workspace = true, features = [] }
|
||||||
camino.workspace = true
|
camino.workspace = true
|
||||||
chrono.workspace = true
|
chrono.workspace = true
|
||||||
cfg-if.workspace = true
|
cfg-if.workspace = true
|
||||||
@@ -22,7 +23,7 @@ clap.workspace = true
|
|||||||
fail.workspace = true
|
fail.workspace = true
|
||||||
flate2.workspace = true
|
flate2.workspace = true
|
||||||
futures.workspace = true
|
futures.workspace = true
|
||||||
hyper0 = { workspace = true, features = ["full"] }
|
http.workspace = true
|
||||||
metrics.workspace = true
|
metrics.workspace = true
|
||||||
nix.workspace = true
|
nix.workspace = true
|
||||||
notify.workspace = true
|
notify.workspace = true
|
||||||
@@ -37,6 +38,8 @@ serde_with.workspace = true
|
|||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
signal-hook.workspace = true
|
signal-hook.workspace = true
|
||||||
tar.workspace = true
|
tar.workspace = true
|
||||||
|
tower.workspace = true
|
||||||
|
tower-http.workspace = true
|
||||||
reqwest = { workspace = true, features = ["json"] }
|
reqwest = { workspace = true, features = ["json"] }
|
||||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||||
tokio-postgres.workspace = true
|
tokio-postgres.workspace = true
|
||||||
|
|||||||
@@ -60,7 +60,7 @@ use compute_tools::compute::{
|
|||||||
};
|
};
|
||||||
use compute_tools::configurator::launch_configurator;
|
use compute_tools::configurator::launch_configurator;
|
||||||
use compute_tools::extension_server::get_pg_version_string;
|
use compute_tools::extension_server::get_pg_version_string;
|
||||||
use compute_tools::http::api::launch_http_server;
|
use compute_tools::http::launch_http_server;
|
||||||
use compute_tools::logger::*;
|
use compute_tools::logger::*;
|
||||||
use compute_tools::monitor::launch_monitor;
|
use compute_tools::monitor::launch_monitor;
|
||||||
use compute_tools::params::*;
|
use compute_tools::params::*;
|
||||||
@@ -493,7 +493,10 @@ fn start_postgres(
|
|||||||
let mut pg = None;
|
let mut pg = None;
|
||||||
if !prestartup_failed {
|
if !prestartup_failed {
|
||||||
pg = match compute.start_compute() {
|
pg = match compute.start_compute() {
|
||||||
Ok(pg) => Some(pg),
|
Ok(pg) => {
|
||||||
|
info!(postmaster_pid = %pg.0.id(), "Postgres was started");
|
||||||
|
Some(pg)
|
||||||
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("could not start the compute node: {:#}", err);
|
error!("could not start the compute node: {:#}", err);
|
||||||
compute.set_failed_status(err);
|
compute.set_failed_status(err);
|
||||||
@@ -591,6 +594,8 @@ fn wait_postgres(pg: Option<PostgresHandle>) -> Result<WaitPostgresResult> {
|
|||||||
// propagate to Postgres and it will be shut down as well.
|
// propagate to Postgres and it will be shut down as well.
|
||||||
let mut exit_code = None;
|
let mut exit_code = None;
|
||||||
if let Some((mut pg, logs_handle)) = pg {
|
if let Some((mut pg, logs_handle)) = pg {
|
||||||
|
info!(postmaster_pid = %pg.id(), "Waiting for Postgres to exit");
|
||||||
|
|
||||||
let ecode = pg
|
let ecode = pg
|
||||||
.wait()
|
.wait()
|
||||||
.expect("failed to start waiting on Postgres process");
|
.expect("failed to start waiting on Postgres process");
|
||||||
|
|||||||
@@ -36,11 +36,11 @@ pub async fn get_dbs_and_roles(compute: &Arc<ComputeNode>) -> anyhow::Result<Cat
|
|||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum SchemaDumpError {
|
pub enum SchemaDumpError {
|
||||||
#[error("Database does not exist.")]
|
#[error("database does not exist")]
|
||||||
DatabaseDoesNotExist,
|
DatabaseDoesNotExist,
|
||||||
#[error("Failed to execute pg_dump.")]
|
#[error("failed to execute pg_dump")]
|
||||||
IO(#[from] std::io::Error),
|
IO(#[from] std::io::Error),
|
||||||
#[error("Unexpected error.")]
|
#[error("unexpected I/O error")]
|
||||||
Unexpected,
|
Unexpected,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,606 +0,0 @@
|
|||||||
use std::convert::Infallible;
|
|
||||||
use std::net::IpAddr;
|
|
||||||
use std::net::Ipv6Addr;
|
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
use crate::catalog::SchemaDumpError;
|
|
||||||
use crate::catalog::{get_database_schema, get_dbs_and_roles};
|
|
||||||
use crate::compute::forward_termination_signal;
|
|
||||||
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
|
|
||||||
use crate::installed_extensions;
|
|
||||||
use compute_api::requests::{ConfigurationRequest, ExtensionInstallRequest, SetRoleGrantsRequest};
|
|
||||||
use compute_api::responses::{
|
|
||||||
ComputeStatus, ComputeStatusResponse, ExtensionInstallResult, GenericAPIError,
|
|
||||||
SetRoleGrantsResponse,
|
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use hyper::header::CONTENT_TYPE;
|
|
||||||
use hyper::service::{make_service_fn, service_fn};
|
|
||||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
|
||||||
use metrics::proto::MetricFamily;
|
|
||||||
use metrics::Encoder;
|
|
||||||
use metrics::TextEncoder;
|
|
||||||
use tokio::task;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use tracing::{debug, error, info, warn};
|
|
||||||
use tracing_utils::http::OtelName;
|
|
||||||
use utils::failpoint_support::failpoints_handler;
|
|
||||||
use utils::http::error::ApiError;
|
|
||||||
use utils::http::request::must_get_query_param;
|
|
||||||
|
|
||||||
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
|
|
||||||
ComputeStatusResponse {
|
|
||||||
start_time: state.start_time,
|
|
||||||
tenant: state
|
|
||||||
.pspec
|
|
||||||
.as_ref()
|
|
||||||
.map(|pspec| pspec.tenant_id.to_string()),
|
|
||||||
timeline: state
|
|
||||||
.pspec
|
|
||||||
.as_ref()
|
|
||||||
.map(|pspec| pspec.timeline_id.to_string()),
|
|
||||||
status: state.status,
|
|
||||||
last_active: state.last_active,
|
|
||||||
error: state.error.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Service function to handle all available routes.
|
|
||||||
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
|
|
||||||
//
|
|
||||||
// NOTE: The URI path is currently included in traces. That's OK because
|
|
||||||
// it doesn't contain any variable parts or sensitive information. But
|
|
||||||
// please keep that in mind if you change the routing here.
|
|
||||||
//
|
|
||||||
match (req.method(), req.uri().path()) {
|
|
||||||
// Serialized compute state.
|
|
||||||
(&Method::GET, "/status") => {
|
|
||||||
debug!("serving /status GET request");
|
|
||||||
let state = compute.state.lock().unwrap();
|
|
||||||
let status_response = status_response_from_state(&state);
|
|
||||||
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Startup metrics in JSON format. Keep /metrics reserved for a possible
|
|
||||||
// future use for Prometheus metrics format.
|
|
||||||
(&Method::GET, "/metrics.json") => {
|
|
||||||
info!("serving /metrics.json GET request");
|
|
||||||
let metrics = compute.state.lock().unwrap().metrics.clone();
|
|
||||||
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Prometheus metrics
|
|
||||||
(&Method::GET, "/metrics") => {
|
|
||||||
debug!("serving /metrics GET request");
|
|
||||||
|
|
||||||
// When we call TextEncoder::encode() below, it will immediately
|
|
||||||
// return an error if a metric family has no metrics, so we need to
|
|
||||||
// preemptively filter out metric families with no metrics.
|
|
||||||
let metrics = installed_extensions::collect()
|
|
||||||
.into_iter()
|
|
||||||
.filter(|m| !m.get_metric().is_empty())
|
|
||||||
.collect::<Vec<MetricFamily>>();
|
|
||||||
|
|
||||||
let encoder = TextEncoder::new();
|
|
||||||
let mut buffer = vec![];
|
|
||||||
|
|
||||||
if let Err(err) = encoder.encode(&metrics, &mut buffer) {
|
|
||||||
let msg = format!("error handling /metrics request: {err}");
|
|
||||||
error!(msg);
|
|
||||||
return render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
match Response::builder()
|
|
||||||
.status(StatusCode::OK)
|
|
||||||
.header(CONTENT_TYPE, encoder.format_type())
|
|
||||||
.body(Body::from(buffer))
|
|
||||||
{
|
|
||||||
Ok(response) => response,
|
|
||||||
Err(err) => {
|
|
||||||
let msg = format!("error handling /metrics request: {err}");
|
|
||||||
error!(msg);
|
|
||||||
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Collect Postgres current usage insights
|
|
||||||
(&Method::GET, "/insights") => {
|
|
||||||
info!("serving /insights GET request");
|
|
||||||
let status = compute.get_status();
|
|
||||||
if status != ComputeStatus::Running {
|
|
||||||
let msg = format!("compute is not running, current status: {:?}", status);
|
|
||||||
error!(msg);
|
|
||||||
return Response::new(Body::from(msg));
|
|
||||||
}
|
|
||||||
|
|
||||||
let insights = compute.collect_insights().await;
|
|
||||||
Response::new(Body::from(insights))
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::POST, "/check_writability") => {
|
|
||||||
info!("serving /check_writability POST request");
|
|
||||||
let status = compute.get_status();
|
|
||||||
if status != ComputeStatus::Running {
|
|
||||||
let msg = format!(
|
|
||||||
"invalid compute status for check_writability request: {:?}",
|
|
||||||
status
|
|
||||||
);
|
|
||||||
error!(msg);
|
|
||||||
return Response::new(Body::from(msg));
|
|
||||||
}
|
|
||||||
|
|
||||||
let res = crate::checker::check_writability(compute).await;
|
|
||||||
match res {
|
|
||||||
Ok(_) => Response::new(Body::from("true")),
|
|
||||||
Err(e) => {
|
|
||||||
error!("check_writability failed: {}", e);
|
|
||||||
Response::new(Body::from(e.to_string()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::POST, "/extensions") => {
|
|
||||||
info!("serving /extensions POST request");
|
|
||||||
let status = compute.get_status();
|
|
||||||
if status != ComputeStatus::Running {
|
|
||||||
let msg = format!(
|
|
||||||
"invalid compute status for extensions request: {:?}",
|
|
||||||
status
|
|
||||||
);
|
|
||||||
error!(msg);
|
|
||||||
return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
|
|
||||||
}
|
|
||||||
|
|
||||||
let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
|
||||||
let request = serde_json::from_slice::<ExtensionInstallRequest>(&request).unwrap();
|
|
||||||
let res = compute
|
|
||||||
.install_extension(&request.extension, &request.database, request.version)
|
|
||||||
.await;
|
|
||||||
match res {
|
|
||||||
Ok(version) => render_json(Body::from(
|
|
||||||
serde_json::to_string(&ExtensionInstallResult {
|
|
||||||
extension: request.extension,
|
|
||||||
version,
|
|
||||||
})
|
|
||||||
.unwrap(),
|
|
||||||
)),
|
|
||||||
Err(e) => {
|
|
||||||
error!("install_extension failed: {}", e);
|
|
||||||
render_json_error(&e.to_string(), StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::GET, "/info") => {
|
|
||||||
let num_cpus = num_cpus::get_physical();
|
|
||||||
info!("serving /info GET request. num_cpus: {}", num_cpus);
|
|
||||||
Response::new(Body::from(
|
|
||||||
serde_json::json!({
|
|
||||||
"num_cpus": num_cpus,
|
|
||||||
})
|
|
||||||
.to_string(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept spec in JSON format and request compute configuration. If
|
|
||||||
// anything goes wrong after we set the compute status to `ConfigurationPending`
|
|
||||||
// and update compute state with new spec, we basically leave compute
|
|
||||||
// in the potentially wrong state. That said, it's control-plane's
|
|
||||||
// responsibility to watch compute state after reconfiguration request
|
|
||||||
// and to clean restart in case of errors.
|
|
||||||
(&Method::POST, "/configure") => {
|
|
||||||
info!("serving /configure POST request");
|
|
||||||
match handle_configure_request(req, compute).await {
|
|
||||||
Ok(msg) => Response::new(Body::from(msg)),
|
|
||||||
Err((msg, code)) => {
|
|
||||||
error!("error handling /configure request: {msg}");
|
|
||||||
render_json_error(&msg, code)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::POST, "/terminate") => {
|
|
||||||
info!("serving /terminate POST request");
|
|
||||||
match handle_terminate_request(compute).await {
|
|
||||||
Ok(()) => Response::new(Body::empty()),
|
|
||||||
Err((msg, code)) => {
|
|
||||||
error!("error handling /terminate request: {msg}");
|
|
||||||
render_json_error(&msg, code)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::GET, "/dbs_and_roles") => {
|
|
||||||
info!("serving /dbs_and_roles GET request",);
|
|
||||||
match get_dbs_and_roles(compute).await {
|
|
||||||
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
|
|
||||||
Err(_) => {
|
|
||||||
render_json_error("can't get dbs and roles", StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::GET, "/database_schema") => {
|
|
||||||
let database = match must_get_query_param(&req, "database") {
|
|
||||||
Err(e) => return e.into_response(),
|
|
||||||
Ok(database) => database,
|
|
||||||
};
|
|
||||||
info!("serving /database_schema GET request with database: {database}",);
|
|
||||||
match get_database_schema(compute, &database).await {
|
|
||||||
Ok(res) => render_plain(Body::wrap_stream(res)),
|
|
||||||
Err(SchemaDumpError::DatabaseDoesNotExist) => {
|
|
||||||
render_json_error("database does not exist", StatusCode::NOT_FOUND)
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("can't get schema dump: {}", e);
|
|
||||||
render_json_error("can't get schema dump", StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::POST, "/grants") => {
|
|
||||||
info!("serving /grants POST request");
|
|
||||||
let status = compute.get_status();
|
|
||||||
if status != ComputeStatus::Running {
|
|
||||||
let msg = format!(
|
|
||||||
"invalid compute status for set_role_grants request: {:?}",
|
|
||||||
status
|
|
||||||
);
|
|
||||||
error!(msg);
|
|
||||||
return render_json_error(&msg, StatusCode::PRECONDITION_FAILED);
|
|
||||||
}
|
|
||||||
|
|
||||||
let request = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
|
||||||
let request = serde_json::from_slice::<SetRoleGrantsRequest>(&request).unwrap();
|
|
||||||
|
|
||||||
let res = compute
|
|
||||||
.set_role_grants(
|
|
||||||
&request.database,
|
|
||||||
&request.schema,
|
|
||||||
&request.privileges,
|
|
||||||
&request.role,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
match res {
|
|
||||||
Ok(()) => render_json(Body::from(
|
|
||||||
serde_json::to_string(&SetRoleGrantsResponse {
|
|
||||||
database: request.database,
|
|
||||||
schema: request.schema,
|
|
||||||
role: request.role,
|
|
||||||
privileges: request.privileges,
|
|
||||||
})
|
|
||||||
.unwrap(),
|
|
||||||
)),
|
|
||||||
Err(e) => render_json_error(
|
|
||||||
&format!("could not grant role privileges to the schema: {e}"),
|
|
||||||
// TODO: can we filter on role/schema not found errors
|
|
||||||
// and return appropriate error code?
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the list of installed extensions
|
|
||||||
// currently only used in python tests
|
|
||||||
// TODO: call it from cplane
|
|
||||||
(&Method::GET, "/installed_extensions") => {
|
|
||||||
info!("serving /installed_extensions GET request");
|
|
||||||
let status = compute.get_status();
|
|
||||||
if status != ComputeStatus::Running {
|
|
||||||
let msg = format!(
|
|
||||||
"invalid compute status for extensions request: {:?}",
|
|
||||||
status
|
|
||||||
);
|
|
||||||
error!(msg);
|
|
||||||
return Response::new(Body::from(msg));
|
|
||||||
}
|
|
||||||
|
|
||||||
let conf = compute.get_conn_conf(None);
|
|
||||||
let res =
|
|
||||||
task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(res) => render_json(Body::from(serde_json::to_string(&res).unwrap())),
|
|
||||||
Err(e) => render_json_error(
|
|
||||||
&format!("could not get list of installed extensions: {}", e),
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
|
|
||||||
match failpoints_handler(req, CancellationToken::new()).await {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(ApiError::BadRequest(e)) => {
|
|
||||||
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// download extension files from remote extension storage on demand
|
|
||||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
|
||||||
info!("serving {:?} POST request", route);
|
|
||||||
info!("req.uri {:?}", req.uri());
|
|
||||||
|
|
||||||
// don't even try to download extensions
|
|
||||||
// if no remote storage is configured
|
|
||||||
if compute.ext_remote_storage.is_none() {
|
|
||||||
info!("no extensions remote storage configured");
|
|
||||||
let mut resp = Response::new(Body::from("no remote storage configured"));
|
|
||||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
||||||
return resp;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut is_library = false;
|
|
||||||
if let Some(params) = req.uri().query() {
|
|
||||||
info!("serving {:?} POST request with params: {}", route, params);
|
|
||||||
if params == "is_library=true" {
|
|
||||||
is_library = true;
|
|
||||||
} else {
|
|
||||||
let mut resp = Response::new(Body::from("Wrong request parameters"));
|
|
||||||
*resp.status_mut() = StatusCode::BAD_REQUEST;
|
|
||||||
return resp;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let filename = route.split('/').last().unwrap().to_string();
|
|
||||||
info!("serving /extension_server POST request, filename: {filename:?} is_library: {is_library}");
|
|
||||||
|
|
||||||
// get ext_name and path from spec
|
|
||||||
// don't lock compute_state for too long
|
|
||||||
let ext = {
|
|
||||||
let compute_state = compute.state.lock().unwrap();
|
|
||||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
|
||||||
let spec = &pspec.spec;
|
|
||||||
|
|
||||||
// debug only
|
|
||||||
info!("spec: {:?}", spec);
|
|
||||||
|
|
||||||
let remote_extensions = match spec.remote_extensions.as_ref() {
|
|
||||||
Some(r) => r,
|
|
||||||
None => {
|
|
||||||
info!("no remote extensions spec was provided");
|
|
||||||
let mut resp = Response::new(Body::from("no remote storage configured"));
|
|
||||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
||||||
return resp;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
remote_extensions.get_ext(
|
|
||||||
&filename,
|
|
||||||
is_library,
|
|
||||||
&compute.build_tag,
|
|
||||||
&compute.pgversion,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
match ext {
|
|
||||||
Ok((ext_name, ext_path)) => {
|
|
||||||
match compute.download_extension(ext_name, ext_path).await {
|
|
||||||
Ok(_) => Response::new(Body::from("OK")),
|
|
||||||
Err(e) => {
|
|
||||||
error!("extension download failed: {}", e);
|
|
||||||
let mut resp = Response::new(Body::from(e.to_string()));
|
|
||||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
||||||
resp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("extension download failed to find extension: {}", e);
|
|
||||||
let mut resp = Response::new(Body::from("failed to find file"));
|
|
||||||
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
||||||
resp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return the `404 Not Found` for any other routes.
|
|
||||||
_ => {
|
|
||||||
let mut not_found = Response::new(Body::from("404 Not Found"));
|
|
||||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
|
||||||
not_found
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_configure_request(
|
|
||||||
req: Request<Body>,
|
|
||||||
compute: &Arc<ComputeNode>,
|
|
||||||
) -> Result<String, (String, StatusCode)> {
|
|
||||||
if !compute.live_config_allowed {
|
|
||||||
return Err((
|
|
||||||
"live configuration is not allowed for this compute node".to_string(),
|
|
||||||
StatusCode::PRECONDITION_FAILED,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
|
||||||
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
|
|
||||||
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
|
|
||||||
let spec = request.spec;
|
|
||||||
|
|
||||||
let parsed_spec = match ParsedSpec::try_from(spec) {
|
|
||||||
Ok(ps) => ps,
|
|
||||||
Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
|
|
||||||
};
|
|
||||||
|
|
||||||
// XXX: wrap state update under lock in code blocks. Otherwise,
|
|
||||||
// we will try to `Send` `mut state` into the spawned thread
|
|
||||||
// bellow, which will cause error:
|
|
||||||
// ```
|
|
||||||
// error: future cannot be sent between threads safely
|
|
||||||
// ```
|
|
||||||
{
|
|
||||||
let mut state = compute.state.lock().unwrap();
|
|
||||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
|
||||||
let msg = format!(
|
|
||||||
"invalid compute status for configuration request: {:?}",
|
|
||||||
state.status.clone()
|
|
||||||
);
|
|
||||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
|
||||||
}
|
|
||||||
state.pspec = Some(parsed_spec);
|
|
||||||
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
|
|
||||||
drop(state);
|
|
||||||
info!("set new spec and notified waiters");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spawn a blocking thread to wait for compute to become Running.
|
|
||||||
// This is needed to do not block the main pool of workers and
|
|
||||||
// be able to serve other requests while some particular request
|
|
||||||
// is waiting for compute to finish configuration.
|
|
||||||
let c = compute.clone();
|
|
||||||
task::spawn_blocking(move || {
|
|
||||||
let mut state = c.state.lock().unwrap();
|
|
||||||
while state.status != ComputeStatus::Running {
|
|
||||||
state = c.state_changed.wait(state).unwrap();
|
|
||||||
info!(
|
|
||||||
"waiting for compute to become Running, current status: {:?}",
|
|
||||||
state.status
|
|
||||||
);
|
|
||||||
|
|
||||||
if state.status == ComputeStatus::Failed {
|
|
||||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
|
||||||
let msg = format!("compute configuration failed: {:?}", err);
|
|
||||||
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
|
|
||||||
// Return current compute state if everything went well.
|
|
||||||
let state = compute.state.lock().unwrap().clone();
|
|
||||||
let status_response = status_response_from_state(&state);
|
|
||||||
Ok(serde_json::to_string(&status_response).unwrap())
|
|
||||||
} else {
|
|
||||||
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
|
|
||||||
let error = GenericAPIError {
|
|
||||||
error: e.to_string(),
|
|
||||||
};
|
|
||||||
Response::builder()
|
|
||||||
.status(status)
|
|
||||||
.header(CONTENT_TYPE, "application/json")
|
|
||||||
.body(Body::from(serde_json::to_string(&error).unwrap()))
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn render_json(body: Body) -> Response<Body> {
|
|
||||||
Response::builder()
|
|
||||||
.header(CONTENT_TYPE, "application/json")
|
|
||||||
.body(body)
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn render_plain(body: Body) -> Response<Body> {
|
|
||||||
Response::builder()
|
|
||||||
.header(CONTENT_TYPE, "text/plain")
|
|
||||||
.body(body)
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
|
|
||||||
{
|
|
||||||
let mut state = compute.state.lock().unwrap();
|
|
||||||
if state.status == ComputeStatus::Terminated {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
|
||||||
let msg = format!(
|
|
||||||
"invalid compute status for termination request: {}",
|
|
||||||
state.status
|
|
||||||
);
|
|
||||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
|
||||||
}
|
|
||||||
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
|
|
||||||
drop(state);
|
|
||||||
}
|
|
||||||
|
|
||||||
forward_termination_signal();
|
|
||||||
info!("sent signal and notified waiters");
|
|
||||||
|
|
||||||
// Spawn a blocking thread to wait for compute to become Terminated.
|
|
||||||
// This is needed to do not block the main pool of workers and
|
|
||||||
// be able to serve other requests while some particular request
|
|
||||||
// is waiting for compute to finish configuration.
|
|
||||||
let c = compute.clone();
|
|
||||||
task::spawn_blocking(move || {
|
|
||||||
let mut state = c.state.lock().unwrap();
|
|
||||||
while state.status != ComputeStatus::Terminated {
|
|
||||||
state = c.state_changed.wait(state).unwrap();
|
|
||||||
info!(
|
|
||||||
"waiting for compute to become {}, current status: {:?}",
|
|
||||||
ComputeStatus::Terminated,
|
|
||||||
state.status
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap()?;
|
|
||||||
info!("terminated Postgres");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
|
|
||||||
#[tokio::main]
|
|
||||||
async fn serve(port: u16, state: Arc<ComputeNode>) {
|
|
||||||
// this usually binds to both IPv4 and IPv6 on linux
|
|
||||||
// see e.g. https://github.com/rust-lang/rust/pull/34440
|
|
||||||
let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
|
|
||||||
|
|
||||||
let make_service = make_service_fn(move |_conn| {
|
|
||||||
let state = state.clone();
|
|
||||||
async move {
|
|
||||||
Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
|
|
||||||
let state = state.clone();
|
|
||||||
async move {
|
|
||||||
Ok::<_, Infallible>(
|
|
||||||
// NOTE: We include the URI path in the string. It
|
|
||||||
// doesn't contain any variable parts or sensitive
|
|
||||||
// information in this API.
|
|
||||||
tracing_utils::http::tracing_handler(
|
|
||||||
req,
|
|
||||||
|req| routes(req, &state),
|
|
||||||
OtelName::UriPath,
|
|
||||||
)
|
|
||||||
.await,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
info!("starting HTTP server on {}", addr);
|
|
||||||
|
|
||||||
let server = Server::bind(&addr).serve(make_service);
|
|
||||||
|
|
||||||
// Run this server forever
|
|
||||||
if let Err(e) = server.await {
|
|
||||||
error!("server error: {}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
|
|
||||||
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
|
||||||
let state = Arc::clone(state);
|
|
||||||
|
|
||||||
Ok(thread::Builder::new()
|
|
||||||
.name("http-endpoint".into())
|
|
||||||
.spawn(move || serve(port, state))?)
|
|
||||||
}
|
|
||||||
48
compute_tools/src/http/extract/json.rs
Normal file
48
compute_tools/src/http/extract/json.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
async_trait,
|
||||||
|
extract::{rejection::JsonRejection, FromRequest, Request},
|
||||||
|
};
|
||||||
|
use compute_api::responses::GenericAPIError;
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
/// Custom `Json` extractor, so that we can format errors into
|
||||||
|
/// `JsonResponse<GenericAPIError>`.
|
||||||
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
|
pub(crate) struct Json<T>(pub T);
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<S, T> FromRequest<S> for Json<T>
|
||||||
|
where
|
||||||
|
axum::Json<T>: FromRequest<S, Rejection = JsonRejection>,
|
||||||
|
S: Send + Sync,
|
||||||
|
{
|
||||||
|
type Rejection = (StatusCode, axum::Json<GenericAPIError>);
|
||||||
|
|
||||||
|
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
|
||||||
|
match axum::Json::<T>::from_request(req, state).await {
|
||||||
|
Ok(value) => Ok(Self(value.0)),
|
||||||
|
Err(rejection) => Err((
|
||||||
|
rejection.status(),
|
||||||
|
axum::Json(GenericAPIError {
|
||||||
|
error: rejection.body_text().to_lowercase(),
|
||||||
|
}),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for Json<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for Json<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
7
compute_tools/src/http/extract/mod.rs
Normal file
7
compute_tools/src/http/extract/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
pub(crate) mod json;
|
||||||
|
pub(crate) mod path;
|
||||||
|
pub(crate) mod query;
|
||||||
|
|
||||||
|
pub(crate) use json::Json;
|
||||||
|
pub(crate) use path::Path;
|
||||||
|
pub(crate) use query::Query;
|
||||||
48
compute_tools/src/http/extract/path.rs
Normal file
48
compute_tools/src/http/extract/path.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
async_trait,
|
||||||
|
extract::{rejection::PathRejection, FromRequestParts},
|
||||||
|
};
|
||||||
|
use compute_api::responses::GenericAPIError;
|
||||||
|
use http::{request::Parts, StatusCode};
|
||||||
|
|
||||||
|
/// Custom `Path` extractor, so that we can format errors into
|
||||||
|
/// `JsonResponse<GenericAPIError>`.
|
||||||
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
|
pub(crate) struct Path<T>(pub T);
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<S, T> FromRequestParts<S> for Path<T>
|
||||||
|
where
|
||||||
|
axum::extract::Path<T>: FromRequestParts<S, Rejection = PathRejection>,
|
||||||
|
S: Send + Sync,
|
||||||
|
{
|
||||||
|
type Rejection = (StatusCode, axum::Json<GenericAPIError>);
|
||||||
|
|
||||||
|
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||||
|
match axum::extract::Path::<T>::from_request_parts(parts, state).await {
|
||||||
|
Ok(value) => Ok(Self(value.0)),
|
||||||
|
Err(rejection) => Err((
|
||||||
|
rejection.status(),
|
||||||
|
axum::Json(GenericAPIError {
|
||||||
|
error: rejection.body_text().to_ascii_lowercase(),
|
||||||
|
}),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for Path<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for Path<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
48
compute_tools/src/http/extract/query.rs
Normal file
48
compute_tools/src/http/extract/query.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use std::ops::{Deref, DerefMut};
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
async_trait,
|
||||||
|
extract::{rejection::QueryRejection, FromRequestParts},
|
||||||
|
};
|
||||||
|
use compute_api::responses::GenericAPIError;
|
||||||
|
use http::{request::Parts, StatusCode};
|
||||||
|
|
||||||
|
/// Custom `Query` extractor, so that we can format errors into
|
||||||
|
/// `JsonResponse<GenericAPIError>`.
|
||||||
|
#[derive(Debug, Clone, Copy, Default)]
|
||||||
|
pub(crate) struct Query<T>(pub T);
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<S, T> FromRequestParts<S> for Query<T>
|
||||||
|
where
|
||||||
|
axum::extract::Query<T>: FromRequestParts<S, Rejection = QueryRejection>,
|
||||||
|
S: Send + Sync,
|
||||||
|
{
|
||||||
|
type Rejection = (StatusCode, axum::Json<GenericAPIError>);
|
||||||
|
|
||||||
|
async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
|
||||||
|
match axum::extract::Query::<T>::from_request_parts(parts, state).await {
|
||||||
|
Ok(value) => Ok(Self(value.0)),
|
||||||
|
Err(rejection) => Err((
|
||||||
|
rejection.status(),
|
||||||
|
axum::Json(GenericAPIError {
|
||||||
|
error: rejection.body_text().to_ascii_lowercase(),
|
||||||
|
}),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Deref for Query<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DerefMut for Query<T> {
|
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||||
|
&mut self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1 +1,56 @@
|
|||||||
pub mod api;
|
use axum::{body::Body, response::Response};
|
||||||
|
use compute_api::responses::{ComputeStatus, GenericAPIError};
|
||||||
|
use http::{header::CONTENT_TYPE, StatusCode};
|
||||||
|
use serde::Serialize;
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
pub use server::launch_http_server;
|
||||||
|
|
||||||
|
mod extract;
|
||||||
|
mod routes;
|
||||||
|
mod server;
|
||||||
|
|
||||||
|
/// Convenience response builder for JSON responses
|
||||||
|
struct JsonResponse;
|
||||||
|
|
||||||
|
impl JsonResponse {
|
||||||
|
/// Helper for actually creating a response
|
||||||
|
fn create_response(code: StatusCode, body: impl Serialize) -> Response {
|
||||||
|
Response::builder()
|
||||||
|
.status(code)
|
||||||
|
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||||
|
.body(Body::from(serde_json::to_string(&body).unwrap()))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a successful error response
|
||||||
|
pub(self) fn success(code: StatusCode, body: impl Serialize) -> Response {
|
||||||
|
assert!({
|
||||||
|
let code = code.as_u16();
|
||||||
|
|
||||||
|
(200..300).contains(&code)
|
||||||
|
});
|
||||||
|
|
||||||
|
Self::create_response(code, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an error response
|
||||||
|
pub(self) fn error(code: StatusCode, error: impl ToString) -> Response {
|
||||||
|
assert!(code.as_u16() >= 400);
|
||||||
|
|
||||||
|
let message = error.to_string();
|
||||||
|
error!(message);
|
||||||
|
|
||||||
|
Self::create_response(code, &GenericAPIError { error: message })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an error response related to the compute being in an invalid state
|
||||||
|
pub(self) fn invalid_status(status: ComputeStatus) -> Response {
|
||||||
|
Self::create_response(
|
||||||
|
StatusCode::PRECONDITION_FAILED,
|
||||||
|
&GenericAPIError {
|
||||||
|
error: format!("invalid compute status: {status}"),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ paths:
|
|||||||
schema:
|
schema:
|
||||||
$ref: "#/components/schemas/ComputeMetrics"
|
$ref: "#/components/schemas/ComputeMetrics"
|
||||||
|
|
||||||
/metrics
|
/metrics:
|
||||||
get:
|
get:
|
||||||
tags:
|
tags:
|
||||||
- Info
|
- Info
|
||||||
|
|||||||
20
compute_tools/src/http/routes/check_writability.rs
Normal file
20
compute_tools/src/http/routes/check_writability.rs
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use compute_api::responses::ComputeStatus;
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::{checker::check_writability, compute::ComputeNode, http::JsonResponse};
|
||||||
|
|
||||||
|
/// Check that the compute is currently running.
|
||||||
|
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||||
|
let status = compute.get_status();
|
||||||
|
if status != ComputeStatus::Running {
|
||||||
|
return JsonResponse::invalid_status(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
match check_writability(&compute).await {
|
||||||
|
Ok(_) => JsonResponse::success(StatusCode::OK, true),
|
||||||
|
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||||
|
}
|
||||||
|
}
|
||||||
91
compute_tools/src/http/routes/configure.rs
Normal file
91
compute_tools/src/http/routes/configure.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use compute_api::{
|
||||||
|
requests::ConfigurationRequest,
|
||||||
|
responses::{ComputeStatus, ComputeStatusResponse},
|
||||||
|
};
|
||||||
|
use http::StatusCode;
|
||||||
|
use tokio::task;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
compute::{ComputeNode, ParsedSpec},
|
||||||
|
http::{extract::Json, JsonResponse},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Accept spec in JSON format and request compute configuration. If anything
|
||||||
|
// goes wrong after we set the compute status to `ConfigurationPending` and
|
||||||
|
// update compute state with new spec, we basically leave compute in the
|
||||||
|
// potentially wrong state. That said, it's control-plane's responsibility to
|
||||||
|
// watch compute state after reconfiguration request and to clean restart in
|
||||||
|
// case of errors.
|
||||||
|
pub(in crate::http) async fn configure(
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
request: Json<ConfigurationRequest>,
|
||||||
|
) -> Response {
|
||||||
|
if !compute.live_config_allowed {
|
||||||
|
return JsonResponse::error(
|
||||||
|
StatusCode::PRECONDITION_FAILED,
|
||||||
|
"live configuration is not allowed for this compute node".to_string(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let pspec = match ParsedSpec::try_from(request.spec.clone()) {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
|
||||||
|
};
|
||||||
|
|
||||||
|
// XXX: wrap state update under lock in a code block. Otherwise, we will try
|
||||||
|
// to `Send` `mut state` into the spawned thread bellow, which will cause
|
||||||
|
// the following rustc error:
|
||||||
|
//
|
||||||
|
// error: future cannot be sent between threads safely
|
||||||
|
{
|
||||||
|
let mut state = compute.state.lock().unwrap();
|
||||||
|
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
|
||||||
|
return JsonResponse::invalid_status(state.status);
|
||||||
|
}
|
||||||
|
|
||||||
|
state.pspec = Some(pspec);
|
||||||
|
state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed);
|
||||||
|
drop(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn a blocking thread to wait for compute to become Running. This is
|
||||||
|
// needed to do not block the main pool of workers and be able to serve
|
||||||
|
// other requests while some particular request is waiting for compute to
|
||||||
|
// finish configuration.
|
||||||
|
let c = compute.clone();
|
||||||
|
let completed = task::spawn_blocking(move || {
|
||||||
|
let mut state = c.state.lock().unwrap();
|
||||||
|
while state.status != ComputeStatus::Running {
|
||||||
|
state = c.state_changed.wait(state).unwrap();
|
||||||
|
info!(
|
||||||
|
"waiting for compute to become {}, current status: {}",
|
||||||
|
ComputeStatus::Running,
|
||||||
|
state.status
|
||||||
|
);
|
||||||
|
|
||||||
|
if state.status == ComputeStatus::Failed {
|
||||||
|
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||||
|
let msg = format!("compute configuration failed: {:?}", err);
|
||||||
|
return Err(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
if let Err(e) = completed {
|
||||||
|
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return current compute state if everything went well.
|
||||||
|
let state = compute.state.lock().unwrap().clone();
|
||||||
|
let body = ComputeStatusResponse::from(&state);
|
||||||
|
|
||||||
|
JsonResponse::success(StatusCode::OK, body)
|
||||||
|
}
|
||||||
34
compute_tools/src/http/routes/database_schema.rs
Normal file
34
compute_tools/src/http/routes/database_schema.rs
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{body::Body, extract::State, response::Response};
|
||||||
|
use http::{header::CONTENT_TYPE, StatusCode};
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
catalog::{get_database_schema, SchemaDumpError},
|
||||||
|
compute::ComputeNode,
|
||||||
|
http::{extract::Query, JsonResponse},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub(in crate::http) struct DatabaseSchemaParams {
|
||||||
|
database: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a schema dump of the requested database.
|
||||||
|
pub(in crate::http) async fn get_schema_dump(
|
||||||
|
params: Query<DatabaseSchemaParams>,
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
) -> Response {
|
||||||
|
match get_database_schema(&compute, ¶ms.database).await {
|
||||||
|
Ok(schema) => Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||||
|
.body(Body::from_stream(schema))
|
||||||
|
.unwrap(),
|
||||||
|
Err(SchemaDumpError::DatabaseDoesNotExist) => {
|
||||||
|
JsonResponse::error(StatusCode::NOT_FOUND, SchemaDumpError::DatabaseDoesNotExist)
|
||||||
|
}
|
||||||
|
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||||
|
}
|
||||||
|
}
|
||||||
16
compute_tools/src/http/routes/dbs_and_roles.rs
Normal file
16
compute_tools/src/http/routes/dbs_and_roles.rs
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::{catalog::get_dbs_and_roles, compute::ComputeNode, http::JsonResponse};
|
||||||
|
|
||||||
|
/// Get the databases and roles from the compute.
|
||||||
|
pub(in crate::http) async fn get_catalog_objects(
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
) -> Response {
|
||||||
|
match get_dbs_and_roles(&compute).await {
|
||||||
|
Ok(catalog_objects) => JsonResponse::success(StatusCode::OK, catalog_objects),
|
||||||
|
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||||
|
}
|
||||||
|
}
|
||||||
67
compute_tools/src/http/routes/extension_server.rs
Normal file
67
compute_tools/src/http/routes/extension_server.rs
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
extract::State,
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use http::StatusCode;
|
||||||
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
compute::ComputeNode,
|
||||||
|
http::{
|
||||||
|
extract::{Path, Query},
|
||||||
|
JsonResponse,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
pub(in crate::http) struct ExtensionServerParams {
|
||||||
|
is_library: Option<bool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Download a remote extension.
|
||||||
|
pub(in crate::http) async fn download_extension(
|
||||||
|
Path(filename): Path<String>,
|
||||||
|
params: Query<ExtensionServerParams>,
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
) -> Response {
|
||||||
|
// Don't even try to download extensions if no remote storage is configured
|
||||||
|
if compute.ext_remote_storage.is_none() {
|
||||||
|
return JsonResponse::error(
|
||||||
|
StatusCode::PRECONDITION_FAILED,
|
||||||
|
"remote storage is not configured",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let ext = {
|
||||||
|
let state = compute.state.lock().unwrap();
|
||||||
|
let pspec = state.pspec.as_ref().unwrap();
|
||||||
|
let spec = &pspec.spec;
|
||||||
|
|
||||||
|
let remote_extensions = match spec.remote_extensions.as_ref() {
|
||||||
|
Some(r) => r,
|
||||||
|
None => {
|
||||||
|
return JsonResponse::error(
|
||||||
|
StatusCode::CONFLICT,
|
||||||
|
"information about remote extensions is unavailable",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
remote_extensions.get_ext(
|
||||||
|
&filename,
|
||||||
|
params.is_library.unwrap_or(false),
|
||||||
|
&compute.build_tag,
|
||||||
|
&compute.pgversion,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
match ext {
|
||||||
|
Ok((ext_name, ext_path)) => match compute.download_extension(ext_name, ext_path).await {
|
||||||
|
Ok(_) => StatusCode::OK.into_response(),
|
||||||
|
Err(e) => JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e),
|
||||||
|
},
|
||||||
|
Err(e) => JsonResponse::error(StatusCode::NOT_FOUND, e),
|
||||||
|
}
|
||||||
|
}
|
||||||
45
compute_tools/src/http/routes/extensions.rs
Normal file
45
compute_tools/src/http/routes/extensions.rs
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use compute_api::{
|
||||||
|
requests::ExtensionInstallRequest,
|
||||||
|
responses::{ComputeStatus, ExtensionInstallResponse},
|
||||||
|
};
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
compute::ComputeNode,
|
||||||
|
http::{extract::Json, JsonResponse},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Install a extension.
|
||||||
|
pub(in crate::http) async fn install_extension(
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
request: Json<ExtensionInstallRequest>,
|
||||||
|
) -> Response {
|
||||||
|
let status = compute.get_status();
|
||||||
|
if status != ComputeStatus::Running {
|
||||||
|
return JsonResponse::invalid_status(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
match compute
|
||||||
|
.install_extension(
|
||||||
|
&request.extension,
|
||||||
|
&request.database,
|
||||||
|
request.version.to_string(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(version) => JsonResponse::success(
|
||||||
|
StatusCode::CREATED,
|
||||||
|
Some(ExtensionInstallResponse {
|
||||||
|
extension: request.extension.clone(),
|
||||||
|
version,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Err(e) => JsonResponse::error(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
format!("failed to install extension: {e}"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
35
compute_tools/src/http/routes/failpoints.rs
Normal file
35
compute_tools/src/http/routes/failpoints.rs
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
use axum::response::{IntoResponse, Response};
|
||||||
|
use http::StatusCode;
|
||||||
|
use tracing::info;
|
||||||
|
use utils::failpoint_support::{apply_failpoint, ConfigureFailpointsRequest};
|
||||||
|
|
||||||
|
use crate::http::{extract::Json, JsonResponse};
|
||||||
|
|
||||||
|
/// Configure failpoints for testing purposes.
|
||||||
|
pub(in crate::http) async fn configure_failpoints(
|
||||||
|
failpoints: Json<ConfigureFailpointsRequest>,
|
||||||
|
) -> Response {
|
||||||
|
if !fail::has_failpoints() {
|
||||||
|
return JsonResponse::error(
|
||||||
|
StatusCode::PRECONDITION_FAILED,
|
||||||
|
"Cannot manage failpoints because neon was compiled without failpoints support",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
for fp in &*failpoints {
|
||||||
|
info!("cfg failpoint: {} {}", fp.name, fp.actions);
|
||||||
|
|
||||||
|
// We recognize one extra "action" that's not natively recognized
|
||||||
|
// by the failpoints crate: exit, to immediately kill the process
|
||||||
|
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
|
||||||
|
|
||||||
|
if let Err(e) = cfg_result {
|
||||||
|
return JsonResponse::error(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
format!("failed to configure failpoints: {e}"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
StatusCode::OK.into_response()
|
||||||
|
}
|
||||||
48
compute_tools/src/http/routes/grants.rs
Normal file
48
compute_tools/src/http/routes/grants.rs
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use compute_api::{
|
||||||
|
requests::SetRoleGrantsRequest,
|
||||||
|
responses::{ComputeStatus, SetRoleGrantsResponse},
|
||||||
|
};
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
compute::ComputeNode,
|
||||||
|
http::{extract::Json, JsonResponse},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Add grants for a role.
|
||||||
|
pub(in crate::http) async fn add_grant(
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
request: Json<SetRoleGrantsRequest>,
|
||||||
|
) -> Response {
|
||||||
|
let status = compute.get_status();
|
||||||
|
if status != ComputeStatus::Running {
|
||||||
|
return JsonResponse::invalid_status(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
match compute
|
||||||
|
.set_role_grants(
|
||||||
|
&request.database,
|
||||||
|
&request.schema,
|
||||||
|
&request.privileges,
|
||||||
|
&request.role,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => JsonResponse::success(
|
||||||
|
StatusCode::CREATED,
|
||||||
|
Some(SetRoleGrantsResponse {
|
||||||
|
database: request.database.clone(),
|
||||||
|
schema: request.schema.clone(),
|
||||||
|
role: request.role.clone(),
|
||||||
|
privileges: request.privileges.clone(),
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
Err(e) => JsonResponse::error(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
format!("failed to grant role privileges to the schema: {e}"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
11
compute_tools/src/http/routes/info.rs
Normal file
11
compute_tools/src/http/routes/info.rs
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
use axum::response::Response;
|
||||||
|
use compute_api::responses::InfoResponse;
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::http::JsonResponse;
|
||||||
|
|
||||||
|
/// Get information about the physical characteristics about the compute.
|
||||||
|
pub(in crate::http) async fn get_info() -> Response {
|
||||||
|
let num_cpus = num_cpus::get_physical();
|
||||||
|
JsonResponse::success(StatusCode::OK, &InfoResponse { num_cpus })
|
||||||
|
}
|
||||||
18
compute_tools/src/http/routes/insights.rs
Normal file
18
compute_tools/src/http/routes/insights.rs
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use compute_api::responses::ComputeStatus;
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::{compute::ComputeNode, http::JsonResponse};
|
||||||
|
|
||||||
|
/// Collect current Postgres usage insights.
|
||||||
|
pub(in crate::http) async fn get_insights(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||||
|
let status = compute.get_status();
|
||||||
|
if status != ComputeStatus::Running {
|
||||||
|
return JsonResponse::invalid_status(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
let insights = compute.collect_insights().await;
|
||||||
|
JsonResponse::success(StatusCode::OK, insights)
|
||||||
|
}
|
||||||
33
compute_tools/src/http/routes/installed_extensions.rs
Normal file
33
compute_tools/src/http/routes/installed_extensions.rs
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use compute_api::responses::ComputeStatus;
|
||||||
|
use http::StatusCode;
|
||||||
|
use tokio::task;
|
||||||
|
|
||||||
|
use crate::{compute::ComputeNode, http::JsonResponse, installed_extensions};
|
||||||
|
|
||||||
|
/// Get a list of installed extensions.
|
||||||
|
pub(in crate::http) async fn get_installed_extensions(
|
||||||
|
State(compute): State<Arc<ComputeNode>>,
|
||||||
|
) -> Response {
|
||||||
|
let status = compute.get_status();
|
||||||
|
if status != ComputeStatus::Running {
|
||||||
|
return JsonResponse::invalid_status(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
let conf = compute.get_conn_conf(None);
|
||||||
|
let res = task::spawn_blocking(move || installed_extensions::get_installed_extensions(conf))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(installed_extensions) => {
|
||||||
|
JsonResponse::success(StatusCode::OK, Some(installed_extensions))
|
||||||
|
}
|
||||||
|
Err(e) => JsonResponse::error(
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
format!("failed to get list of installed extensions: {e}"),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
32
compute_tools/src/http/routes/metrics.rs
Normal file
32
compute_tools/src/http/routes/metrics.rs
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
use axum::{body::Body, response::Response};
|
||||||
|
use http::header::CONTENT_TYPE;
|
||||||
|
use http::StatusCode;
|
||||||
|
use metrics::proto::MetricFamily;
|
||||||
|
use metrics::Encoder;
|
||||||
|
use metrics::TextEncoder;
|
||||||
|
|
||||||
|
use crate::{http::JsonResponse, installed_extensions};
|
||||||
|
|
||||||
|
/// Expose Prometheus metrics.
|
||||||
|
pub(in crate::http) async fn get_metrics() -> Response {
|
||||||
|
// When we call TextEncoder::encode() below, it will immediately return an
|
||||||
|
// error if a metric family has no metrics, so we need to preemptively
|
||||||
|
// filter out metric families with no metrics.
|
||||||
|
let metrics = installed_extensions::collect()
|
||||||
|
.into_iter()
|
||||||
|
.filter(|m| !m.get_metric().is_empty())
|
||||||
|
.collect::<Vec<MetricFamily>>();
|
||||||
|
|
||||||
|
let encoder = TextEncoder::new();
|
||||||
|
let mut buffer = vec![];
|
||||||
|
|
||||||
|
if let Err(e) = encoder.encode(&metrics, &mut buffer) {
|
||||||
|
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.header(CONTENT_TYPE, encoder.format_type())
|
||||||
|
.body(Body::from(buffer))
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
12
compute_tools/src/http/routes/metrics_json.rs
Normal file
12
compute_tools/src/http/routes/metrics_json.rs
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{extract::State, response::Response};
|
||||||
|
use http::StatusCode;
|
||||||
|
|
||||||
|
use crate::{compute::ComputeNode, http::JsonResponse};
|
||||||
|
|
||||||
|
/// Get startup metrics.
|
||||||
|
pub(in crate::http) async fn get_metrics(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||||
|
let metrics = compute.state.lock().unwrap().metrics.clone();
|
||||||
|
JsonResponse::success(StatusCode::OK, metrics)
|
||||||
|
}
|
||||||
38
compute_tools/src/http/routes/mod.rs
Normal file
38
compute_tools/src/http/routes/mod.rs
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
use compute_api::responses::ComputeStatusResponse;
|
||||||
|
|
||||||
|
use crate::compute::ComputeState;
|
||||||
|
|
||||||
|
// One module per management API endpoint; each handler here is wired to a
// route in `http::server`.
pub(in crate::http) mod check_writability;
pub(in crate::http) mod configure;
pub(in crate::http) mod database_schema;
pub(in crate::http) mod dbs_and_roles;
pub(in crate::http) mod extension_server;
pub(in crate::http) mod extensions;
pub(in crate::http) mod failpoints;
pub(in crate::http) mod grants;
pub(in crate::http) mod info;
pub(in crate::http) mod insights;
pub(in crate::http) mod installed_extensions;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;
|
||||||
|
|
||||||
|
impl From<&ComputeState> for ComputeStatusResponse {
|
||||||
|
fn from(state: &ComputeState) -> Self {
|
||||||
|
ComputeStatusResponse {
|
||||||
|
start_time: state.start_time,
|
||||||
|
tenant: state
|
||||||
|
.pspec
|
||||||
|
.as_ref()
|
||||||
|
.map(|pspec| pspec.tenant_id.to_string()),
|
||||||
|
timeline: state
|
||||||
|
.pspec
|
||||||
|
.as_ref()
|
||||||
|
.map(|pspec| pspec.timeline_id.to_string()),
|
||||||
|
status: state.status,
|
||||||
|
last_active: state.last_active,
|
||||||
|
error: state.error.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
14
compute_tools/src/http/routes/status.rs
Normal file
14
compute_tools/src/http/routes/status.rs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
use std::{ops::Deref, sync::Arc};
|
||||||
|
|
||||||
|
use axum::{extract::State, http::StatusCode, response::Response};
|
||||||
|
use compute_api::responses::ComputeStatusResponse;
|
||||||
|
|
||||||
|
use crate::{compute::ComputeNode, http::JsonResponse};
|
||||||
|
|
||||||
|
/// Retrieve the state of the comute.
|
||||||
|
pub(in crate::http) async fn get_status(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||||
|
let state = compute.state.lock().unwrap();
|
||||||
|
let body = ComputeStatusResponse::from(state.deref());
|
||||||
|
|
||||||
|
JsonResponse::success(StatusCode::OK, body)
|
||||||
|
}
|
||||||
58
compute_tools/src/http/routes/terminate.rs
Normal file
58
compute_tools/src/http/routes/terminate.rs
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use axum::{
|
||||||
|
extract::State,
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
};
|
||||||
|
use compute_api::responses::ComputeStatus;
|
||||||
|
use http::StatusCode;
|
||||||
|
use tokio::task;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
compute::{forward_termination_signal, ComputeNode},
|
||||||
|
http::JsonResponse,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Terminate the compute.
|
||||||
|
pub(in crate::http) async fn terminate(State(compute): State<Arc<ComputeNode>>) -> Response {
|
||||||
|
{
|
||||||
|
let mut state = compute.state.lock().unwrap();
|
||||||
|
if state.status == ComputeStatus::Terminated {
|
||||||
|
return StatusCode::CREATED.into_response();
|
||||||
|
}
|
||||||
|
|
||||||
|
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
|
||||||
|
return JsonResponse::invalid_status(state.status);
|
||||||
|
}
|
||||||
|
|
||||||
|
state.set_status(ComputeStatus::TerminationPending, &compute.state_changed);
|
||||||
|
drop(state);
|
||||||
|
}
|
||||||
|
|
||||||
|
forward_termination_signal();
|
||||||
|
info!("sent signal and notified waiters");
|
||||||
|
|
||||||
|
// Spawn a blocking thread to wait for compute to become Terminated.
|
||||||
|
// This is needed to do not block the main pool of workers and
|
||||||
|
// be able to serve other requests while some particular request
|
||||||
|
// is waiting for compute to finish configuration.
|
||||||
|
let c = compute.clone();
|
||||||
|
task::spawn_blocking(move || {
|
||||||
|
let mut state = c.state.lock().unwrap();
|
||||||
|
while state.status != ComputeStatus::Terminated {
|
||||||
|
state = c.state_changed.wait(state).unwrap();
|
||||||
|
info!(
|
||||||
|
"waiting for compute to become {}, current status: {:?}",
|
||||||
|
ComputeStatus::Terminated,
|
||||||
|
state.status
|
||||||
|
);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
info!("terminated Postgres");
|
||||||
|
|
||||||
|
StatusCode::OK.into_response()
|
||||||
|
}
|
||||||
165
compute_tools/src/http/server.rs
Normal file
165
compute_tools/src/http/server.rs
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
use std::{
|
||||||
|
net::{IpAddr, Ipv6Addr, SocketAddr},
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU64, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
thread,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use axum::{
|
||||||
|
response::{IntoResponse, Response},
|
||||||
|
routing::{get, post},
|
||||||
|
Router,
|
||||||
|
};
|
||||||
|
use http::StatusCode;
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
use tower::ServiceBuilder;
|
||||||
|
use tower_http::{
|
||||||
|
request_id::{MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer},
|
||||||
|
trace::TraceLayer,
|
||||||
|
};
|
||||||
|
use tracing::{debug, error, info, Span};
|
||||||
|
|
||||||
|
use super::routes::{
|
||||||
|
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
|
||||||
|
grants, info as info_route, insights, installed_extensions, metrics, metrics_json, status,
|
||||||
|
terminate,
|
||||||
|
};
|
||||||
|
use crate::compute::ComputeNode;
|
||||||
|
|
||||||
|
async fn handle_404() -> Response {
|
||||||
|
StatusCode::NOT_FOUND.into_response()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
struct ComputeMakeRequestId(Arc<AtomicU64>);
|
||||||
|
|
||||||
|
impl MakeRequestId for ComputeMakeRequestId {
|
||||||
|
fn make_request_id<B>(
|
||||||
|
&mut self,
|
||||||
|
_request: &http::Request<B>,
|
||||||
|
) -> Option<tower_http::request_id::RequestId> {
|
||||||
|
let request_id = self
|
||||||
|
.0
|
||||||
|
.fetch_add(1, Ordering::SeqCst)
|
||||||
|
.to_string()
|
||||||
|
.parse()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Some(RequestId::new(request_id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run the HTTP server and wait on it forever.
///
/// Creates its own Tokio runtime via `#[tokio::main]` because it is invoked
/// from a dedicated OS thread (see `launch_http_server`). Binding or serving
/// errors are logged and the function returns, ending that thread.
#[tokio::main]
async fn serve(port: u16, compute: Arc<ComputeNode>) {
    // Header written by SetRequestIdLayer and echoed back by
    // PropagateRequestIdLayer below.
    const X_REQUEST_ID: &str = "x-request-id";

    let mut app = Router::new()
        .route("/check_writability", post(check_writability::is_writable))
        .route("/configure", post(configure::configure))
        .route("/database_schema", get(database_schema::get_schema_dump))
        .route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
        .route(
            "/extension_server/*filename",
            post(extension_server::download_extension),
        )
        .route("/extensions", post(extensions::install_extension))
        .route("/grants", post(grants::add_grant))
        .route("/info", get(info_route::get_info))
        .route("/insights", get(insights::get_insights))
        .route(
            "/installed_extensions",
            get(installed_extensions::get_installed_extensions),
        )
        .route("/metrics", get(metrics::get_metrics))
        .route("/metrics.json", get(metrics_json::get_metrics))
        .route("/status", get(status::get_status))
        .route("/terminate", post(terminate::terminate))
        .fallback(handle_404)
        .layer(
            ServiceBuilder::new()
                .layer(SetRequestIdLayer::x_request_id(
                    ComputeMakeRequestId::default(),
                ))
                .layer(
                    TraceLayer::new_for_http()
                        .on_request(|request: &http::Request<_>, _span: &Span| {
                            // NOTE(review): these unwraps assume SetRequestIdLayer
                            // has already attached x-request-id by the time the
                            // trace hook runs — confirm layer ordering.
                            let request_id = request
                                .headers()
                                .get(X_REQUEST_ID)
                                .unwrap()
                                .to_str()
                                .unwrap();

                            // /metrics is polled frequently; log it at debug to
                            // keep the info log quiet.
                            match request.uri().path() {
                                "/metrics" => {
                                    debug!(%request_id, "{} {}", request.method(), request.uri())
                                }
                                _ => info!(%request_id, "{} {}", request.method(), request.uri()),
                            };
                        })
                        .on_response(
                            |response: &http::Response<_>, latency: Duration, _span: &Span| {
                                let request_id = response
                                    .headers()
                                    .get(X_REQUEST_ID)
                                    .unwrap()
                                    .to_str()
                                    .unwrap();

                                info!(
                                    %request_id,
                                    code = response.status().as_u16(),
                                    latency = latency.as_millis()
                                )
                            },
                        ),
                )
                .layer(PropagateRequestIdLayer::x_request_id()),
        )
        .with_state(compute);

    // Add in any testing support
    if cfg!(feature = "testing") {
        use super::routes::failpoints;

        app = app.route("/failpoints", post(failpoints::configure_failpoints))
    }

    // This usually binds to both IPv4 and IPv6 on Linux, see
    // https://github.com/rust-lang/rust/pull/34440 for more information
    let addr = SocketAddr::new(IpAddr::from(Ipv6Addr::UNSPECIFIED), port);
    let listener = match TcpListener::bind(&addr).await {
        Ok(listener) => listener,
        Err(e) => {
            error!(
                "failed to bind the compute_ctl HTTP server to port {}: {}",
                port, e
            );
            return;
        }
    };

    // Prefer the resolved local address (includes the actual port when 0 was
    // requested); fall back to the requested port if it cannot be read.
    if let Ok(local_addr) = listener.local_addr() {
        info!("compute_ctl HTTP server listening on {}", local_addr);
    } else {
        info!("compute_ctl HTTP server listening on port {}", port);
    }

    if let Err(e) = axum::serve(listener, app).await {
        error!("compute_ctl HTTP server error: {}", e);
    }
}
|
||||||
|
|
||||||
|
/// Launch a separate HTTP server thread and return its `JoinHandle`.
|
||||||
|
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
|
||||||
|
let state = Arc::clone(state);
|
||||||
|
|
||||||
|
Ok(thread::Builder::new()
|
||||||
|
.name("http-server".into())
|
||||||
|
.spawn(move || serve(port, state))?)
|
||||||
|
}
|
||||||
@@ -3,8 +3,6 @@
|
|||||||
#![deny(unsafe_code)]
|
#![deny(unsafe_code)]
|
||||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||||
|
|
||||||
extern crate hyper0 as hyper;
|
|
||||||
|
|
||||||
pub mod checker;
|
pub mod checker;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod configurator;
|
pub mod configurator;
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ use crate::local_env::LocalEnv;
|
|||||||
use crate::postgresql_conf::PostgresConf;
|
use crate::postgresql_conf::PostgresConf;
|
||||||
use crate::storage_controller::StorageController;
|
use crate::storage_controller::StorageController;
|
||||||
|
|
||||||
use compute_api::responses::{ComputeState, ComputeStatus};
|
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
|
||||||
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
|
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
|
||||||
|
|
||||||
// contents of a endpoint.json file
|
// contents of a endpoint.json file
|
||||||
@@ -739,7 +739,7 @@ impl Endpoint {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Call the /status HTTP API
|
// Call the /status HTTP API
|
||||||
pub async fn get_status(&self) -> Result<ComputeState> {
|
pub async fn get_status(&self) -> Result<ComputeStatusResponse> {
|
||||||
let client = reqwest::Client::new();
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
let response = client
|
let response = client
|
||||||
|
|||||||
@@ -15,6 +15,17 @@ pub struct GenericAPIError {
|
|||||||
pub error: String,
|
pub error: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Response payload for the compute info endpoint.
#[derive(Debug, Clone, Serialize)]
pub struct InfoResponse {
    /// Number of CPUs available to the compute.
    pub num_cpus: usize,
}

/// Response payload for a successful extension installation request.
#[derive(Debug, Clone, Serialize)]
pub struct ExtensionInstallResponse {
    /// Name of the extension that was installed.
    pub extension: PgIdent,
    /// Version of the extension that was installed.
    pub version: ExtVersion,
}
||||||
|
|
||||||
/// Response of the /status API
|
/// Response of the /status API
|
||||||
#[derive(Serialize, Debug, Deserialize)]
|
#[derive(Serialize, Debug, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
@@ -28,16 +39,6 @@ pub struct ComputeStatusResponse {
|
|||||||
pub error: Option<String>,
|
pub error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize)]
|
|
||||||
#[serde(rename_all = "snake_case")]
|
|
||||||
pub struct ComputeState {
|
|
||||||
pub status: ComputeStatus,
|
|
||||||
/// Timestamp of the last Postgres activity
|
|
||||||
#[serde(serialize_with = "rfc3339_serialize")]
|
|
||||||
pub last_active: Option<DateTime<Utc>>,
|
|
||||||
pub error: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
|
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum ComputeStatus {
|
pub enum ComputeStatus {
|
||||||
@@ -78,7 +79,7 @@ impl Display for ComputeStatus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
|
pub fn rfc3339_serialize<S>(x: &Option<DateTime<Utc>>, s: S) -> Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -91,7 +91,8 @@ tokio-stream = { version = "0.1", features = ["net"] }
|
|||||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||||
toml_edit = { version = "0.22", features = ["serde"] }
|
toml_edit = { version = "0.22", features = ["serde"] }
|
||||||
tonic = { version = "0.12", features = ["tls-roots"] }
|
tonic = { version = "0.12", features = ["tls-roots"] }
|
||||||
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "util"] }
|
tower-9fbad63c4bcf4a8f = { package = "tower", version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "util"] }
|
||||||
|
tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["log", "make", "util"] }
|
||||||
tracing = { version = "0.1", features = ["log"] }
|
tracing = { version = "0.1", features = ["log"] }
|
||||||
tracing-core = { version = "0.1" }
|
tracing-core = { version = "0.1" }
|
||||||
url = { version = "2", features = ["serde"] }
|
url = { version = "2", features = ["serde"] }
|
||||||
|
|||||||
Reference in New Issue
Block a user