Compare commits

...

3 Commits

Author SHA1 Message Date
Vadim Kharitonov
04481ef500 polish Joonas's comment 2023-07-26 17:28:26 +02:00
Vadim Kharitonov
0551410c8a Update compute_tools/src/http/api.rs
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2023-07-26 15:35:14 +02:00
Vadim Kharitonov
07da78c056 Remove references to Arc states in compute_ctl 2023-07-26 15:35:11 +02:00
5 changed files with 60 additions and 61 deletions

View File

@@ -188,7 +188,7 @@ fn main() -> Result<()> {
// Launch the HTTP service first, so that we are able to serve control-plane
// requests while configuration is still in progress.
let _http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
launch_http_server(http_port, compute.clone()).expect("cannot launch http endpoint thread");
if !spec_set {
// No spec provided, hang waiting for it.
@@ -223,8 +223,8 @@ fn main() -> Result<()> {
drop(state);
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute);
let _configurator_handle = launch_configurator(&compute);
let _monitor_handle = launch_monitor(compute.clone());
let _configurator_handle = launch_configurator(compute.clone());
// Start Postgres
let mut delay_exit = false;

View File

@@ -1,3 +1,5 @@
use std::sync::Arc;
use anyhow::{anyhow, Result};
use tokio_postgres::NoTls;
use tracing::{error, instrument};
@@ -8,7 +10,7 @@ use crate::compute::ComputeNode;
/// that we can actually write some data in this particular timeline.
/// Create table if it's missing.
#[instrument(skip_all)]
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
pub async fn check_writability(compute: Arc<ComputeNode>) -> Result<()> {
// Connect to the database.
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
if client.is_closed() {

View File

@@ -8,46 +8,46 @@ use compute_api::responses::ComputeStatus;
use crate::compute::ComputeNode;
#[instrument(skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
fn configurator_main_loop(compute: Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let state = compute.state.lock().unwrap();
let mut state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(state);
match state.status {
ComputeStatus::ConfigurationPending => {
info!("got configuration request");
state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
let new_status = if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
ComputeStatus::Failed
} else {
info!("compute node configured");
ComputeStatus::Running
};
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", state.status);
ComputeStatus::Failed => {
info!("compute node is in Failed state, exiting");
break;
}
_ => info!("woken up for compute status: {:?}, sleeping", state.status),
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute);
pub fn launch_configurator(compute: Arc<ComputeNode>) -> thread::JoinHandle<()> {
thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
configurator_main_loop(compute);
info!("configurator thread is exited");
})
.expect("cannot launch configurator thread")

View File

@@ -34,7 +34,7 @@ fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
}
// Service function to handle all available routes.
async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
async fn routes(req: Request<Body>, compute: Arc<ComputeNode>) -> Response<Body> {
//
// NOTE: The URI path is currently included in traces. That's OK because
// it doesn't contain any variable parts or sensitive information. But
@@ -132,7 +132,7 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
async fn handle_configure_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,
compute: Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
if !compute.live_config_allowed {
return Err((
@@ -142,8 +142,7 @@ async fn handle_configure_request(
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
if let Ok(request) = serde_json::from_slice::<ConfigurationRequest>(&body_bytes) {
let spec = request.spec;
let parsed_spec = match ParsedSpec::try_from(spec) {
@@ -177,27 +176,29 @@ async fn handle_configure_request(
// This is needed so that we do not block the main pool of workers and
// are able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Running {
state = c.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
state.status
);
{
let compute = compute.clone();
task::spawn_blocking(move || {
let mut state = compute.state.lock().unwrap();
while state.status != ComputeStatus::Running {
state = compute.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
state.status
);
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
}
}
Ok(())
})
.await
.unwrap()?;
Ok(())
})
.await
.unwrap()?;
}
// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
@@ -235,7 +236,7 @@ async fn serve(port: u16, state: Arc<ComputeNode>) {
// information in this API.
tracing_utils::http::tracing_handler(
req,
|req| routes(req, &state),
|req| routes(req, state.clone()),
OtelName::UriPath,
)
.await,
@@ -256,9 +257,7 @@ async fn serve(port: u16, state: Arc<ComputeNode>) {
}
/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
pub fn launch_http_server(port: u16, state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
pub fn launch_http_server(port: u16, state: Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
Ok(thread::Builder::new()
.name("http-endpoint".into())
.spawn(move || serve(port, state))?)

View File

@@ -12,7 +12,7 @@ const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
// Spin in a loop and figure out the last activity time in Postgres.
// Then update it in the shared state. This function never errors out.
// XXX: the only expected panic is at `RwLock` unwrap().
fn watch_compute_activity(compute: &ComputeNode) {
fn watch_compute_activity(compute: Arc<ComputeNode>) {
// Suppose that `connstr` doesn't change
let connstr = compute.connstr.as_str();
// Define `client` outside of the loop to reuse existing connection if it's active.
@@ -104,11 +104,9 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
/// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
let state = Arc::clone(state);
pub fn launch_monitor(state: Arc<ComputeNode>) -> thread::JoinHandle<()> {
thread::Builder::new()
.name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state))
.spawn(move || watch_compute_activity(state))
.expect("cannot launch compute monitor thread")
}