Compare commits

...

1 Commits

Author SHA1 Message Date
Arseny Sher
b3d88a5e3e compute_ctl: log ttid everywhere.
Makes it easy to find compute logs by timeline id, and the Loki agent can now
make a label from it.

pairs with
https://github.com/neondatabase/cloud/pull/7259
2023-10-06 01:02:41 +03:00
3 changed files with 28 additions and 11 deletions

View File

@@ -44,7 +44,7 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use chrono::Utc; use chrono::Utc;
use clap::Arg; use clap::Arg;
use tracing::{error, info}; use tracing::{error, info, info_span};
use url::Url; use url::Url;
use compute_api::responses::ComputeStatus; use compute_api::responses::ComputeStatus;
@@ -57,6 +57,7 @@ use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor; use compute_tools::monitor::launch_monitor;
use compute_tools::params::*; use compute_tools::params::*;
use compute_tools::spec::*; use compute_tools::spec::*;
use utils::id::TenantTimelineId;
// this is an arbitrary build tag. Fine as a default / for testing purposes // this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var // in-case of not-set environment var
@@ -249,11 +250,20 @@ fn main() -> Result<()> {
state.status = ComputeStatus::Init; state.status = ComputeStatus::Init;
compute.state_changed.notify_all(); compute.state_changed.notify_all();
let pspec = state.pspec.as_ref().expect("spec must be set");
let ttid = TenantTimelineId {
tenant_id: pspec.tenant_id,
timeline_id: pspec.timeline_id,
};
drop(state); drop(state);
// Log ttid everywhere for easier log identification (e.g. the Loki agent can
// create a label from it).
let _guard = info_span!("", ttid = %ttid).entered();
// Launch remaining service threads // Launch remaining service threads
let _monitor_handle = launch_monitor(&compute); let _monitor_handle = launch_monitor(&compute, ttid);
let _configurator_handle = launch_configurator(&compute); let _configurator_handle = launch_configurator(&compute, ttid);
// Start Postgres // Start Postgres
let mut delay_exit = false; let mut delay_exit = false;

View File

@@ -4,11 +4,13 @@ use std::thread;
use tracing::{error, info, instrument}; use tracing::{error, info, instrument};
use compute_api::responses::ComputeStatus; use compute_api::responses::ComputeStatus;
use utils::id::TenantTimelineId;
use crate::compute::ComputeNode; use crate::compute::ComputeNode;
#[instrument(skip_all)] // Log ttid everywhere
fn configurator_main_loop(compute: &Arc<ComputeNode>) { #[instrument(name = "", fields(ttid = %ttid), skip_all)]
fn configurator_main_loop(compute: &Arc<ComputeNode>, ttid: TenantTimelineId) {
info!("waiting for reconfiguration requests"); info!("waiting for reconfiguration requests");
loop { loop {
let state = compute.state.lock().unwrap(); let state = compute.state.lock().unwrap();
@@ -41,13 +43,16 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
} }
} }
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> { pub fn launch_configurator(
compute: &Arc<ComputeNode>,
ttid: TenantTimelineId,
) -> thread::JoinHandle<()> {
let compute = Arc::clone(compute); let compute = Arc::clone(compute);
thread::Builder::new() thread::Builder::new()
.name("compute-configurator".into()) .name("compute-configurator".into())
.spawn(move || { .spawn(move || {
configurator_main_loop(&compute); configurator_main_loop(&compute, ttid);
info!("configurator thread is exited"); info!("configurator thread is exited");
}) })
.expect("cannot launch configurator thread") .expect("cannot launch configurator thread")

View File

@@ -3,7 +3,8 @@ use std::{thread, time::Duration};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use postgres::{Client, NoTls}; use postgres::{Client, NoTls};
use tracing::{debug, info}; use tracing::{debug, info, instrument};
use utils::id::TenantTimelineId;
use crate::compute::ComputeNode; use crate::compute::ComputeNode;
@@ -12,7 +13,8 @@ const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
// Spin in a loop and figure out the last activity time in the Postgres. // Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out. // Then update it in the shared state. This function never errors out.
// XXX: the only expected panic is at `RwLock` unwrap(). // XXX: the only expected panic is at `RwLock` unwrap().
fn watch_compute_activity(compute: &ComputeNode) { #[instrument(name = "", fields(ttid = %ttid), skip_all)]
fn watch_compute_activity(compute: &ComputeNode, ttid: TenantTimelineId) {
// Suppose that `connstr` doesn't change // Suppose that `connstr` doesn't change
let connstr = compute.connstr.as_str(); let connstr = compute.connstr.as_str();
// Define `client` outside of the loop to reuse existing connection if it's active. // Define `client` outside of the loop to reuse existing connection if it's active.
@@ -103,11 +105,11 @@ fn watch_compute_activity(compute: &ComputeNode) {
} }
/// Launch a separate compute monitor thread and return its `JoinHandle`. /// Launch a separate compute monitor thread and return its `JoinHandle`.
pub fn launch_monitor(state: &Arc<ComputeNode>) -> thread::JoinHandle<()> { pub fn launch_monitor(state: &Arc<ComputeNode>, ttid: TenantTimelineId) -> thread::JoinHandle<()> {
let state = Arc::clone(state); let state = Arc::clone(state);
thread::Builder::new() thread::Builder::new()
.name("compute-monitor".into()) .name("compute-monitor".into())
.spawn(move || watch_compute_activity(&state)) .spawn(move || watch_compute_activity(&state, ttid))
.expect("cannot launch compute monitor thread") .expect("cannot launch compute monitor thread")
} }