mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-03 12:10:36 +00:00
Compare commits
4 Commits
reduce-mot
...
installed_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ac7188c82 | ||
|
|
2575be6dec | ||
|
|
e7d9f9245d | ||
|
|
b78e140fa8 |
41
.github/workflows/report-workflow-stats.yml
vendored
41
.github/workflows/report-workflow-stats.yml
vendored
@@ -1,41 +0,0 @@
|
||||
name: Report Workflow Stats
|
||||
|
||||
on:
|
||||
workflow_run:
|
||||
workflows:
|
||||
- Add `external` label to issues and PRs created by external users
|
||||
- Benchmarking
|
||||
- Build and Test
|
||||
- Build and Test Locally
|
||||
- Build build-tools image
|
||||
- Check Permissions
|
||||
- Check build-tools image
|
||||
- Check neon with extra platform builds
|
||||
- Cloud Regression Test
|
||||
- Create Release Branch
|
||||
- Handle `approved-for-ci-run` label
|
||||
- Lint GitHub Workflows
|
||||
- Notify Slack channel about upcoming release
|
||||
- Periodic pagebench performance test on dedicated EC2 machine in eu-central-1 region
|
||||
- Pin build-tools image
|
||||
- Prepare benchmarking databases by restoring dumps
|
||||
- Push images to ACR
|
||||
- Test Postgres client libraries
|
||||
- Trigger E2E Tests
|
||||
- cleanup caches by a branch
|
||||
types: [completed]
|
||||
|
||||
jobs:
|
||||
gh-workflow-stats:
|
||||
name: Github Workflow Stats
|
||||
runs-on: ubuntu-22.04
|
||||
permissions:
|
||||
actions: read
|
||||
steps:
|
||||
- name: Export GH Workflow Stats
|
||||
uses: fedordikarev/gh-workflow-stats-action@v0.1.2
|
||||
with:
|
||||
DB_URI: ${{ secrets.GH_REPORT_STATS_DB_RW_CONNSTR }}
|
||||
DB_TABLE: "gh_workflow_stats_neon"
|
||||
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
GH_RUN_ID: ${{ github.event.workflow_run.id }}
|
||||
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -1228,12 +1228,15 @@ dependencies = [
|
||||
"flate2",
|
||||
"futures",
|
||||
"hyper 0.14.30",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
"notify",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"opentelemetry_sdk",
|
||||
"postgres",
|
||||
"prometheus",
|
||||
"regex",
|
||||
"remote_storage",
|
||||
"reqwest 0.12.4",
|
||||
|
||||
@@ -18,9 +18,11 @@ clap.workspace = true
|
||||
flate2.workspace = true
|
||||
futures.workspace = true
|
||||
hyper0 = { workspace = true, features = ["full"] }
|
||||
metrics.workspace = true
|
||||
nix.workspace = true
|
||||
notify.workspace = true
|
||||
num_cpus.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry.workspace = true
|
||||
opentelemetry_sdk.workspace = true
|
||||
postgres.workspace = true
|
||||
@@ -39,6 +41,7 @@ tracing-subscriber.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
thiserror.workspace = true
|
||||
url.workspace = true
|
||||
prometheus.workspace = true
|
||||
|
||||
compute_api.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
@@ -1121,6 +1121,7 @@ impl ComputeNode {
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
self.post_apply_config()?;
|
||||
self.get_installed_extensions()?;
|
||||
}
|
||||
|
||||
let startup_end_time = Utc::now();
|
||||
@@ -1489,20 +1490,22 @@ LIMIT 100",
|
||||
pub fn get_installed_extensions(&self) -> Result<()> {
|
||||
let connstr = self.connstr.clone();
|
||||
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create runtime");
|
||||
let result = rt
|
||||
.block_on(crate::installed_extensions::get_installed_extensions(
|
||||
connstr,
|
||||
))
|
||||
.expect("failed to get installed extensions");
|
||||
thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to create runtime");
|
||||
let result = rt
|
||||
.block_on(crate::installed_extensions::get_installed_extensions(
|
||||
connstr,
|
||||
))
|
||||
.expect("failed to get installed extensions");
|
||||
|
||||
info!(
|
||||
"{}",
|
||||
serde_json::to_string(&result).expect("failed to serialize extensions list")
|
||||
);
|
||||
info!(
|
||||
"{}",
|
||||
serde_json::to_string(&result).expect("failed to serialize extensions list")
|
||||
);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::catalog::SchemaDumpError;
|
||||
use crate::catalog::{get_database_schema, get_dbs_and_roles};
|
||||
use crate::compute::forward_termination_signal;
|
||||
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use crate::installed_extensions;
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
|
||||
|
||||
@@ -16,6 +17,8 @@ use anyhow::Result;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server, StatusCode};
|
||||
use metrics::Encoder;
|
||||
use metrics::TextEncoder;
|
||||
use tokio::task;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use tracing_utils::http::OtelName;
|
||||
@@ -62,6 +65,28 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
|
||||
}
|
||||
|
||||
// Prometheus metrics
|
||||
(&Method::GET, "/metrics") => {
|
||||
debug!("serving /metrics GET request");
|
||||
|
||||
let mut buffer = vec![];
|
||||
let metrics = installed_extensions::collect();
|
||||
let encoder = TextEncoder::new();
|
||||
encoder.encode(&metrics, &mut buffer).unwrap();
|
||||
|
||||
match Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.header(CONTENT_TYPE, encoder.format_type())
|
||||
.body(Body::from(buffer))
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(err) => {
|
||||
let msg = format!("error handling /metrics request: {err}");
|
||||
error!(msg);
|
||||
render_json_error(&msg, StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Collect Postgres current usage insights
|
||||
(&Method::GET, "/insights") => {
|
||||
info!("serving /insights GET request");
|
||||
|
||||
@@ -37,6 +37,21 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ComputeMetrics"
|
||||
|
||||
/metrics
|
||||
get:
|
||||
tags:
|
||||
- Info
|
||||
summary: Get compute node metrics in
|
||||
description: ""
|
||||
operationId: getComputeMetrics
|
||||
responses:
|
||||
200:
|
||||
description: ComputeMetrics
|
||||
content:
|
||||
text/plain:
|
||||
schema:
|
||||
type: string
|
||||
description: Metrics in text format.
|
||||
/insights:
|
||||
get:
|
||||
tags:
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use compute_api::responses::{InstalledExtension, InstalledExtensions};
|
||||
use metrics::proto::MetricFamily;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use url::Url;
|
||||
@@ -7,6 +8,10 @@ use anyhow::Result;
|
||||
use postgres::{Client, NoTls};
|
||||
use tokio::task;
|
||||
|
||||
use metrics::core::Collector;
|
||||
use metrics::{register_uint_gauge_vec, UIntGaugeVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
/// We don't reuse get_existing_dbs() just for code clarity
|
||||
/// and to make database listing query here more explicit.
|
||||
///
|
||||
@@ -72,9 +77,40 @@ pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtension
|
||||
}
|
||||
}
|
||||
|
||||
Ok(InstalledExtensions {
|
||||
let res = InstalledExtensions {
|
||||
extensions: extensions_map.values().cloned().collect(),
|
||||
})
|
||||
};
|
||||
|
||||
// set the prometheus metrics
|
||||
for ext in res.extensions.iter() {
|
||||
let versions = {
|
||||
let mut vec: Vec<_> = ext.versions.iter().cloned().collect();
|
||||
vec.sort();
|
||||
vec.iter()
|
||||
.map(|x| x.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
};
|
||||
|
||||
INSTALLED_EXTENSIONS
|
||||
.with_label_values(&[&ext.extname, &versions])
|
||||
.set(ext.n_databases as u64);
|
||||
}
|
||||
|
||||
Ok(res)
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
||||
static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"installed_extensions",
|
||||
"Number of databases where extension is installed, versions passed as label",
|
||||
&["extension_name", "versions"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub fn collect() -> Vec<MetricFamily> {
|
||||
INSTALLED_EXTENSIONS.collect()
|
||||
}
|
||||
|
||||
@@ -28,3 +28,8 @@ class EndpointHttpClient(requests.Session):
|
||||
res = self.get(f"http://localhost:{self.port}/installed_extensions")
|
||||
res.raise_for_status()
|
||||
return res.json()
|
||||
|
||||
def metrics(self):
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
res.raise_for_status()
|
||||
return res.text
|
||||
|
||||
@@ -24,20 +24,9 @@ https://docs.pytest.org/en/6.2.x/logging.html
|
||||
# log format is specified in pytest.ini file
|
||||
LOGGING = {
|
||||
"version": 1,
|
||||
"filters": {
|
||||
"wzfilter": {
|
||||
"()": "fixtures.log_helper_internal.WerkzeugNoiseFilter",
|
||||
},
|
||||
},
|
||||
"loggers": {
|
||||
"root": {"level": "INFO"},
|
||||
"root.safekeeper_async": {"level": "INFO"}, # a lot of logs on DEBUG level
|
||||
|
||||
# Use a custom filter to make werkzeug's messages less verbose.
|
||||
"werkzeug": {
|
||||
"filters": ["wzfilter"],
|
||||
"level": "INFO",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -1,24 +0,0 @@
|
||||
# These are logically part of in log_helper.py, but need to be in a
|
||||
# different file because these get loaded from the logging config
|
||||
# file. If you try to included these in log_helper.py directly, you
|
||||
# get an error about circular dependency.
|
||||
import re
|
||||
|
||||
class WerkzeugNoiseFilter(object):
|
||||
"""Moto server that we use for mocking S3 uses werkzeug, which
|
||||
logs all HTTP operations. It constructs log messages like this:
|
||||
|
||||
127.0.0.1 - - [08/Oct/2024 12:43:46] "PUT /bucket-name/path?x-id=PutObject HTTP/1.1" 200 -
|
||||
|
||||
The IP address is not interesting in tests, as it's always just
|
||||
127.0.0.1. And the timestamp is redundant with the timestamp we
|
||||
print for all log messages anyway, with millisecond precision.
|
||||
Unfortunately those are "etched" in the message, and cannot be
|
||||
overriden by setting a custom formatter. To reduce the noise in
|
||||
the test output, this filter removes those fields from the log
|
||||
messages.
|
||||
"""
|
||||
|
||||
def filter(self, logRecord):
|
||||
logRecord.msg = re.sub(r'127\.0\.0\.1 - - \[.+\] (".*".*)', r'\1', logRecord.msg)
|
||||
return True
|
||||
@@ -1,5 +1,7 @@
|
||||
import time
|
||||
from logging import info
|
||||
|
||||
from fixtures.metrics import parse_metrics
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
@@ -85,3 +87,36 @@ def test_installed_extensions(neon_simple_env: NeonEnv):
|
||||
assert ext["n_databases"] == 2
|
||||
ext["versions"].sort()
|
||||
assert ext["versions"] == ["1.2", "1.3"]
|
||||
|
||||
# check that /metrics endpoint is available
|
||||
# ensure that we see the metric before and after restart
|
||||
res = client.metrics()
|
||||
info("Metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all("installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"})
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 2
|
||||
|
||||
endpoint.stop()
|
||||
endpoint.start()
|
||||
|
||||
timeout = 5
|
||||
while timeout > 0:
|
||||
try:
|
||||
res = client.metrics()
|
||||
timeout = -1
|
||||
except Exception as e:
|
||||
info("failed to get metrics, assume they are not collected yet: %s", e)
|
||||
time.sleep(1)
|
||||
timeout -= 1
|
||||
continue
|
||||
|
||||
info("After restart metrics: %s", res)
|
||||
m = parse_metrics(res)
|
||||
neon_m = m.query_all(
|
||||
"installed_extensions", {"extension_name": "neon", "versions": "1.2,1.3"}
|
||||
)
|
||||
assert len(neon_m) == 1
|
||||
for sample in neon_m:
|
||||
assert sample.value == 2
|
||||
|
||||
Reference in New Issue
Block a user