Compare commits

...

3 Commits

Author SHA1 Message Date
Konstantin Knizhnik
c7f6ace947 Update compute_tools/src/compute.rs
Co-authored-by: Alexey Kondratov <kondratov.aleksey@gmail.com>
2025-04-17 17:40:54 +03:00
Konstantin Knizhnik
b3b504c969 Fix formatting 2025-04-16 23:19:05 +03:00
Konstantin Knizhnik
72e0fd704a Track last_active_query and activity kind in ComputeStatusResponse 2025-04-16 21:09:24 +03:00
4 changed files with 47 additions and 8 deletions

View File

@@ -11,7 +11,9 @@ use std::{env, fs};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus};
use compute_api::responses::{
ActivityKind, ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
};
@@ -132,6 +134,10 @@ pub struct ComputeState {
/// Timestamp of the last Postgres activity. It could be `None` if
/// compute wasn't used since start.
pub last_active: Option<DateTime<Utc>>,
/// Timestamp of the last client's activity. Unlike `last_active` it doesn't take into account
/// baclkground activity: autovacuum, LR,...
pub last_active_query: Option<DateTime<Utc>>,
pub last_activity_kind: Option<ActivityKind>,
pub error: Option<String>,
/// Compute spec. This can be received from the CLI or - more likely -
@@ -159,6 +165,8 @@ impl ComputeState {
start_time: Utc::now(),
status: ComputeStatus::Empty,
last_active: None,
last_active_query: None,
last_activity_kind: None,
error: None,
pspec: None,
startup_span: None,
@@ -1688,13 +1696,22 @@ impl ComputeNode {
}
/// Update the `last_active` in the shared state, but ensure that it's a more recent one.
pub fn update_last_active(&self, last_active: Option<DateTime<Utc>>) {
pub fn update_last_active(
&self,
last_active: Option<DateTime<Utc>>,
activity_kind: ActivityKind,
) {
let mut state = self.state.lock().unwrap();
// NB: `Some(<DateTime>)` is always greater than `None`.
if last_active > state.last_active {
state.last_active = last_active;
debug!("set the last compute activity time to: {:?}", last_active);
}
if activity_kind == ActivityKind::Query && last_active > state.last_active_query {
state.last_active_query = last_active;
debug!("set the last user's activity time to: {:?}", last_active);
}
state.last_activity_kind = Some(activity_kind);
}
// Look for core dumps and collect backtraces.

View File

@@ -30,6 +30,8 @@ impl From<&ComputeState> for ComputeStatusResponse {
.map(|pspec| pspec.timeline_id.to_string()),
status: state.status,
last_active: state.last_active,
last_active_query: state.last_active_query,
last_activity_kind: state.last_activity_kind,
error: state.error.clone(),
}
}

View File

@@ -3,7 +3,7 @@ use std::thread;
use std::time::Duration;
use chrono::{DateTime, Utc};
use compute_api::responses::ComputeStatus;
use compute_api::responses::{ActivityKind, ComputeStatus};
use compute_api::spec::ComputeFeature;
use postgres::{Client, NoTls};
use tracing::{debug, error, info, warn};
@@ -91,7 +91,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
if detected_activity {
// Update the last active time and continue, we don't need to
// check backends state change.
compute.update_last_active(Some(Utc::now()));
compute.update_last_active(Some(Utc::now()), ActivityKind::Query);
continue;
}
}
@@ -109,7 +109,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
// This helps us to discover new sessions, that did nothing yet.
match get_backends_state_change(cli) {
Ok(last_active) => {
compute.update_last_active(last_active);
compute.update_last_active(last_active, ActivityKind::Query);
}
Err(e) => {
error!("could not get backends state change: {}", e);
@@ -125,7 +125,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_ws) => {
if num_ws > 0 {
compute.update_last_active(Some(Utc::now()));
compute.update_last_active(
Some(Utc::now()),
ActivityKind::LogicalReplication,
);
continue;
}
}
@@ -150,7 +153,10 @@ fn watch_compute_activity(compute: &ComputeNode) {
Ok(row) => match row.try_get::<&str, i64>("count") {
Ok(num_subscribers) => {
if num_subscribers > 0 {
compute.update_last_active(Some(Utc::now()));
compute.update_last_active(
Some(Utc::now()),
ActivityKind::LogicalReplication,
);
continue;
}
}
@@ -175,7 +181,8 @@ fn watch_compute_activity(compute: &ComputeNode) {
Ok(r) => match r.try_get::<&str, i64>("count") {
Ok(num_workers) => {
if num_workers > 0 {
compute.update_last_active(Some(Utc::now()));
compute
.update_last_active(Some(Utc::now()), ActivityKind::Autovacuum);
continue;
}
}

View File

@@ -56,9 +56,22 @@ pub struct ComputeStatusResponse {
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: Option<DateTime<Utc>>,
pub last_active_query: Option<DateTime<Utc>>,
pub last_activity_kind: Option<ActivityKind>,
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ActivityKind {
// Client's query is executed
Query,
// Logical replication is active (subscription or publication)
LogicalReplication,
// Autovacuum is active
Autovacuum,
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {