Move compute_ctl structs used in HTTP API and spec file to separate crate.

This is in preparation of using compute_ctl to launch postgres nodes
in the neon_local control plane. And seems like a good idea to
separate the public interfaces anyway.

One non-mechanical change here is that the 'metrics' field is moved
under the Mutex, instead of using atomics. We were not using atomics
for performance but for convenience here, and it seems more clear to
not use atomics in the model for the HTTP response type.
This commit is contained in:
Heikki Linnakangas
2023-04-09 21:52:28 +03:00
parent 818e341af0
commit f0b2e076d9
18 changed files with 271 additions and 210 deletions

View File

@@ -3,9 +3,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ComputeStatus};
use crate::http::requests::ConfigurationRequest;
use crate::http::responses::{ComputeStatusResponse, GenericAPIError};
use crate::compute::{ComputeNode, ComputeState};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
@@ -16,6 +16,16 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state.tenant.clone(),
timeline: state.timeline.clone(),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),
}
}
// Service function to handle all available routes.
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
//
@@ -28,8 +38,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = ComputeStatusResponse::from(state.clone());
let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
}
@@ -37,7 +46,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
Response::new(Body::from(serde_json::to_string(&compute.metrics).unwrap()))
let metrics = compute.state.lock().unwrap().metrics.clone();
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}
// Collect Postgres current usage insights
@@ -162,7 +172,7 @@ async fn handle_configure_request(
);
if state.status == ComputeStatus::Failed {
let err = state.error.clone().unwrap_or("unknown error".to_string());
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
@@ -175,7 +185,7 @@ async fn handle_configure_request(
// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
let status_response = ComputeStatusResponse::from(state);
let status_response = status_response_from_state(&state);
Ok(serde_json::to_string(&status_response).unwrap())
} else {
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))

View File

@@ -1,3 +1 @@
pub mod api;
pub mod requests;
pub mod responses;

View File

@@ -1,11 +0,0 @@
use serde::Deserialize;
use crate::spec::ComputeSpec;
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
#[derive(Deserialize, Debug)]
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}

View File

@@ -1,40 +0,0 @@
use serde::{Serialize, Serializer};
use chrono::{DateTime, Utc};
use crate::compute::{ComputeState, ComputeStatus};
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub tenant: String,
pub timeline: String,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
impl From<ComputeState> for ComputeStatusResponse {
fn from(state: ComputeState) -> Self {
ComputeStatusResponse {
tenant: state.tenant,
timeline: state.timeline,
status: state.status,
last_active: state.last_active,
error: state.error,
}
}
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
x.to_rfc3339().serialize(s)
}