Compare commits

..

4 Commits

Author SHA1 Message Date
Anastasia Lubennikova
6ac7188c82 fix test timeout logic 2024-10-16 15:48:56 +01:00
Anastasia Lubennikova
2575be6dec remove redundant labels 2024-10-15 14:34:26 +01:00
Anastasia Lubennikova
e7d9f9245d Review fixes:
- do not use default collector,  collect only one metric;
- remove unit test, same behavior is covered by python test;
- code cleanup
2024-10-15 12:36:45 +01:00
Anastasia Lubennikova
b78e140fa8 Add installed_extensions prometheus metric
and add /metrics endpoint to compute_ctl to expose such metrics
2024-10-14 10:32:56 +01:00
11 changed files with 140 additions and 91 deletions

View File

@@ -1,41 +0,0 @@
name: Report Workflow Stats
on:
workflow_run:
workflows:
- Add `external` label to issues and PRs created by external users
- Benchmarking
- Build and Test
- Build and Test Locally
- Build build-tools image
- Check Permissions
- Check build-tools image
- Check neon with extra platform builds
- Cloud Regression Test
- Create Release Branch
- Handle `approved-for-ci-run` label
- Lint GitHub Workflows
- Notify Slack channel about upcoming release
- Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
- Pin build-tools image
- Prepare benchmarking databases by restoring dumps
- Push images to ACR
- Test Postgres client libraries
- Trigger E2E Tests
- cleanup caches by a branch
types: [completed]
jobs:
gh-workflow-stats:
name: Github Workflow Stats
runs-on: ubuntu-22.04
permissions:
actions: read
steps:
- name: Export GH Workflow Stats
uses: fedordikarev/gh-workflow-stats-action@v0.1.2
with:
DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
DB_TABLE: "gh_workflow_stats_neon"
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GH_RUN_ID: ${{ github.event.workflow_run.id }}

3
Cargo.lock generated
View File

@@ -1228,12 +1228,15 @@ dependencies = [
"flate2",
"futures",
"hyper 0.14.30",
"metrics",
"nix 0.27.1",
"notify",
"num_cpus",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"postgres",
"prometheus",
"regex",
"remote_storage",
"reqwest 0.12.4",

View File

@@ -18,9 +18,11 @@ clap.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
metrics.workspace = true
nix.workspace = true
notify.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
opentelemetry.workspace = true
opentelemetry_sdk.workspace = true
postgres.workspace = true
@@ -39,6 +41,7 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
thiserror.workspace = true
url.workspace = true
prometheus.workspace = true
compute_api.workspace = true
utils.workspace = true

View File

@@ -1121,6 +1121,7 @@ impl ComputeNode {
self.pg_reload_conf()?;
}
self.post_apply_config()?;
self.get_installed_extensions()?;
}
let startup_end_time = Utc::now();
@@ -1489,20 +1490,22 @@ LIMIT 100",
pub fn get_installed_extensions(&self) -> Result<()> {
let connstr = self.connstr.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create runtime");
let result = rt
.block_on(crate::installed_extensions::get_installed_extensions(
connstr,
))
.expect("failed to get installed extensions");
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
info!(
"{}",
serde_json::to_string(&result).expect("failed to serialize extensions list")
);
});
Ok(())
}

View File

@@ -9,6 +9,7 @@ use crate::catalog::SchemaDumpError;
use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use crate::installed_extensions;
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
@@ -16,6 +17,8 @@ use anyhow::Result;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
@@ -62,6 +65,28 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}
// Prometheus metrics
(&Method::GET, "/metrics") => {
debug!("serving /metrics GET request");
let mut buffer = vec![];
let metrics = installed_extensions::collect();
let encoder = TextEncoder::new();
encoder.encode(&metrics, &mut buffer).unwrap();
match Response::builder()
.status(StatusCode::OK)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
{
Ok(response) => response,
Err(err) => {
let msg = format!("error handling /metrics request: {err}");
error!(msg);
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// Collect Postgres current usage insights
(&Method::GET, "/insights") => {
info!("serving /insights GET request");

View File

@@ -37,6 +37,21 @@ paths:
schema:
$ref: "#/components/schemas/ComputeMetrics"
/metrics
get:
tags:
- Info
summary: Get compute node metrics in
description: ""
operationId: getComputeMetrics
responses:
200:
description: ComputeMetrics
content:
text/plain:
schema:
type: string
description: Metrics in text format.
/insights:
get:
tags:

View File

@@ -1,4 +1,5 @@
use compute_api::responses::{InstalledExtension, InstalledExtensions};
use metrics::proto::MetricFamily;
use std::collections::HashMap;
use std::collections::HashSet;
use url::Url;
@@ -7,6 +8,10 @@ use anyhow::Result;
use postgres::{Client, NoTls};
use tokio::task;
use metrics::core::Collector;
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
use once_cell::sync::Lazy;
/// We don't reuse get_existing_dbs() just for code clarity
/// and to make database listing query here more explicit.
///
@@ -72,9 +77,40 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
}
}
Ok(InstalledExtensions {
let res = InstalledExtensions {
extensions: extensions_map.values().cloned().collect(),
})
};
// set the prometheus metrics
for ext in res.extensions.iter() {
let versions = {
let mut vec: Vec<_> = ext.versions.iter().cloned().collect();
vec.sort();
vec.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(",")
};
INSTALLED_EXTENSIONS
.with_label_values(&[&ext.extname, &versions])
.set(ext.n_databases as u64);
}
Ok(res)
})
.await?
}
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"installed_extensions",
"Number of databases where extension is installed, versions passed as label",
&["extension_name", "versions"]
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
INSTALLED_EXTENSIONS.collect()
}

View File

@@ -28,3 +28,8 @@ class EndpointHttpClient(requests.Session):
res = self.get(f"http://localhost:{self.port}/installed_extensions")
res.raise_for_status()
return res.json()
def metrics(self):
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text

View File

@@ -24,20 +24,9 @@ https://docs.pytest.org/en/6.2.x/logging.html
# log format is specified in pytest.ini file
LOGGING = {
"version": 1,
"filters": {
"wzfilter": {
"()": "fixtures.log_helper_internal.WerkzeugNoiseFilter",
},
},
"loggers": {
"root": {"level": "INFO"},
"root.safekeeper_async": {"level": "INFO"}, # a lot of logs on DEBUG level
# Use a custom filter to make werkzeug's messages less verbose.
"werkzeug": {
"filters": ["wzfilter"],
"level": "INFO",
},
},
}

View File

@@ -1,24 +0,0 @@
# These are logically part of in log_helper.py, but need to be in a
# different file because these get loaded from the logging config
# file. If you try to included these in log_helper.py directly, you
# get an error about circular dependency.
import re
class WerkzeugNoiseFilter(object):
"""Moto server that we use for mocking S3 uses werkzeug, which
logs all HTTP operations. It constructs log messages like this:
127.0.0.1 - - [08/Oct/2024 12:43:46] "PUT /bucket-name/path?x-id=PutObject HTTP/1.1" 200 -
The IP address is not interesting in tests, as it's always just
127.0.0.1. And the timestamp is redundant with the timestamp we
print for all log messages anyway, with millisecond precision.
Unfortunately those are "etched" in the message, and cannot be
overriden by setting a custom formatter. To reduce the noise in
the test output, this filter removes those fields from the log
messages.
"""
def filter(self, logRecord):
logRecord.msg = re.sub(r'127\.0\.0\.1 - - \[.+\] (".*".*)', r'\1', logRecord.msg)
return True

View File

@@ -1,5 +1,7 @@
import time
from logging import info
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import NeonEnv
@@ -85,3 +87,36 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
assert ext["n_databases"] == 2
ext["versions"].sort()
assert ext["versions"] == ["1.2", "1.3"]
# check that /metrics endpoint is available
# ensure that we see the metric before and after restart
res = client.metrics()
info("Metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"})
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2
endpoint.stop()
endpoint.start()
timeout = 5
while timeout > 0:
try:
res = client.metrics()
timeout = -1
except Exception as e:
info("failed to get metrics, assume they are not collected yet: %s", e)
time.sleep(1)
timeout -= 1
continue
info("After restart metrics: %s", res)
m = parse_metrics(res)
neon_m = m.query_all(
"installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"}
)
assert len(neon_m) == 1
for sample in neon_m:
assert sample.value == 2