mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 11:00:38 +00:00
Add /configure_telemetry API endpoint (#11117)
Work on https://github.com/neondatabase/cloud/issues/23721 and https://github.com/neondatabase/cloud/issues/23714 Depends on https://github.com/neondatabase/neon/pull/11111 - Add `/configure_telemetry` API endpoint - Support second rsyslog configuration for Postgres logs export - Enable logs export when compute feature is enabled and configure Postgres to send logs to syslog I have used `/configure_telemetry` name because in the future I see it also being used for configuring a `pg_tracing` extension to export traces. Let me know if you'd rather have these APIs separate. In this case we can rename it to `/configure_rsyslog`.
This commit is contained in:
committed by
GitHub
parent
fdf04d4d81
commit
db30e1669c
@@ -39,6 +39,13 @@ commands:
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
|
||||
# Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
|
||||
# We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
|
||||
# use a different path for the socket. The symlink actually points to our custom path.
|
||||
- name: rsyslogd-socket-symlink
|
||||
user: root
|
||||
sysvInitAction: sysinit
|
||||
shell: "ln -s /var/db/postgres/rsyslogpipe /dev/log"
|
||||
- name: rsyslogd
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
@@ -77,6 +84,9 @@ files:
|
||||
# compute_ctl will rewrite this file with the actual configuration, if needed.
|
||||
- filename: compute_rsyslog.conf
|
||||
content: |
|
||||
# Syslock.Name specifies a non-default pipe location that is writeable for the postgres user.
|
||||
module(load="imuxsock" SysSock.Name="/var/db/postgres/rsyslogpipe") # provides support for local system logging
|
||||
|
||||
*.* /dev/null
|
||||
$IncludeConfig /etc/rsyslog.d/*.conf
|
||||
build: |
|
||||
|
||||
@@ -39,6 +39,13 @@ commands:
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: '/bin/sql_exporter -config.file=/etc/sql_exporter_autoscaling.yml -web.listen-address=:9499'
|
||||
# Rsyslog by default creates a unix socket under /dev/log . That's where Postgres sends logs also.
|
||||
# We run syslog with postgres user so it can't create /dev/log. Instead we configure rsyslog to
|
||||
# use a different path for the socket. The symlink actually points to our custom path.
|
||||
- name: rsyslogd-socket-symlink
|
||||
user: root
|
||||
sysvInitAction: sysinit
|
||||
shell: "ln -s /var/db/postgres/rsyslogpipe /dev/log"
|
||||
- name: rsyslogd
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
@@ -77,6 +84,9 @@ files:
|
||||
# compute_ctl will rewrite this file with the actual configuration, if needed.
|
||||
- filename: compute_rsyslog.conf
|
||||
content: |
|
||||
# Syslock.Name specifies a non-default pipe location that is writeable for the postgres user.
|
||||
module(load="imuxsock" SysSock.Name="/var/db/postgres/rsyslogpipe") # provides support for local system logging
|
||||
|
||||
*.* /dev/null
|
||||
$IncludeConfig /etc/rsyslog.d/*.conf
|
||||
build: |
|
||||
|
||||
@@ -37,7 +37,10 @@ use crate::logger::startup_context_from_env;
|
||||
use crate::lsn_lease::launch_lsn_lease_bg_task_for_static;
|
||||
use crate::monitor::launch_monitor;
|
||||
use crate::pg_helpers::*;
|
||||
use crate::rsyslog::{configure_audit_rsyslog, launch_pgaudit_gc};
|
||||
use crate::rsyslog::{
|
||||
PostgresLogsRsyslogConfig, configure_audit_rsyslog, configure_postgres_logs_export,
|
||||
launch_pgaudit_gc,
|
||||
};
|
||||
use crate::spec::*;
|
||||
use crate::swap::resize_swap;
|
||||
use crate::sync_sk::{check_if_synced, ping_safekeeper};
|
||||
@@ -617,7 +620,7 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// Configure and start rsyslog if necessary
|
||||
// Configure and start rsyslog for HIPAA if necessary
|
||||
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
|
||||
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
|
||||
if remote_endpoint.is_empty() {
|
||||
@@ -632,6 +635,17 @@ impl ComputeNode {
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
}
|
||||
|
||||
// Configure and start rsyslog for Postgres logs export
|
||||
if self.has_feature(ComputeFeature::PostgresLogsExport) {
|
||||
if let Some(ref project_id) = pspec.spec.cluster.cluster_id {
|
||||
let host = PostgresLogsRsyslogConfig::default_host(project_id);
|
||||
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
|
||||
configure_postgres_logs_export(conf)?;
|
||||
} else {
|
||||
warn!("not configuring rsyslog for Postgres logs export: project ID is missing")
|
||||
}
|
||||
}
|
||||
|
||||
// Launch remaining service threads
|
||||
let _monitor_handle = launch_monitor(self);
|
||||
let _configurator_handle = launch_configurator(self);
|
||||
|
||||
@@ -7,7 +7,7 @@ use std::io::prelude::*;
|
||||
use std::path::Path;
|
||||
|
||||
use compute_api::responses::TlsConfig;
|
||||
use compute_api::spec::{ComputeAudit, ComputeMode, ComputeSpec, GenericOption};
|
||||
use compute_api::spec::{ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, GenericOption};
|
||||
|
||||
use crate::pg_helpers::{
|
||||
GenericOptionExt, GenericOptionsSearch, PgOptionsSerialize, escape_conf_value,
|
||||
@@ -216,6 +216,12 @@ pub fn write_postgres_conf(
|
||||
writeln!(file, "neon.disable_logical_replication_subscribers=false")?;
|
||||
}
|
||||
|
||||
// We need Postgres to send logs to rsyslog so that we can forward them
|
||||
// further to customers' log aggregation systems.
|
||||
if spec.features.contains(&ComputeFeature::PostgresLogsExport) {
|
||||
writeln!(file, "log_destination='stderr,syslog'")?;
|
||||
}
|
||||
|
||||
// This is essential to keep this line at the end of the file,
|
||||
// because it is intended to override any settings above.
|
||||
writeln!(file, "include_if_exists = 'compute_ctl_temp_override.conf'")?;
|
||||
|
||||
@@ -8,4 +8,4 @@ input(type="imfile" File="{log_directory}/*.log" Tag="{tag}" Severity="info" Fac
|
||||
global(workDirectory="/var/log/rsyslog")
|
||||
|
||||
# Forward logs to remote syslog server
|
||||
*.* @@{remote_endpoint}
|
||||
*.* @@{remote_endpoint}
|
||||
|
||||
@@ -0,0 +1,10 @@
|
||||
# Program name comes from postgres' syslog_facility configuration: https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-SYSLOG-IDENT
|
||||
# Default value is 'postgres'.
|
||||
if $programname == 'postgres' then {{
|
||||
# Forward Postgres logs to telemetry otel collector
|
||||
action(type="omfwd" target="{logs_export_target}" port="{logs_export_port}" protocol="tcp"
|
||||
template="RSYSLOG_SyslogProtocol23Format"
|
||||
action.resumeRetryCount="3"
|
||||
queue.type="linkedList" queue.size="1000")
|
||||
stop
|
||||
}}
|
||||
@@ -306,6 +306,36 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
/configure_telemetry:
|
||||
post:
|
||||
tags:
|
||||
- Configure
|
||||
summary: Configure rsyslog
|
||||
description: |
|
||||
This API endpoint configures rsyslog to forward Postgres logs
|
||||
to a specified otel collector.
|
||||
operationId: configureTelemetry
|
||||
requestBody:
|
||||
required: true
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
properties:
|
||||
logs_export_host:
|
||||
type: string
|
||||
description: |
|
||||
Hostname and the port of the otel collector. Leave empty to disable logs forwarding.
|
||||
Example: config-shy-breeze-123-collector-monitoring.neon-telemetry.svc.cluster.local:54526
|
||||
responses:
|
||||
204:
|
||||
description: "Telemetry configured successfully"
|
||||
500:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
JWT:
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::body::Body;
|
||||
use axum::extract::State;
|
||||
use axum::response::Response;
|
||||
use compute_api::requests::ConfigurationRequest;
|
||||
use compute_api::requests::{ConfigurationRequest, ConfigureTelemetryRequest};
|
||||
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
|
||||
use compute_api::spec::ComputeFeature;
|
||||
use http::StatusCode;
|
||||
use tokio::task;
|
||||
use tracing::info;
|
||||
@@ -11,6 +13,7 @@ use tracing::info;
|
||||
use crate::compute::{ComputeNode, ParsedSpec};
|
||||
use crate::http::JsonResponse;
|
||||
use crate::http::extract::Json;
|
||||
use crate::rsyslog::{PostgresLogsRsyslogConfig, configure_postgres_logs_export};
|
||||
|
||||
// Accept spec in JSON format and request compute configuration. If anything
|
||||
// goes wrong after we set the compute status to `ConfigurationPending` and
|
||||
@@ -92,3 +95,25 @@ pub(in crate::http) async fn configure(
|
||||
|
||||
JsonResponse::success(StatusCode::OK, body)
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn configure_telemetry(
|
||||
State(compute): State<Arc<ComputeNode>>,
|
||||
request: Json<ConfigureTelemetryRequest>,
|
||||
) -> Response {
|
||||
if !compute.has_feature(ComputeFeature::PostgresLogsExport) {
|
||||
return JsonResponse::error(
|
||||
StatusCode::PRECONDITION_FAILED,
|
||||
"Postgres logs export feature is not enabled".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let conf = PostgresLogsRsyslogConfig::new(request.logs_export_host.as_deref());
|
||||
if let Err(err) = configure_postgres_logs_export(conf) {
|
||||
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, err.to_string());
|
||||
}
|
||||
|
||||
Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(Body::from(""))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
@@ -87,6 +87,7 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
|
||||
let authenticated_router = Router::<Arc<ComputeNode>>::new()
|
||||
.route("/check_writability", post(check_writability::is_writable))
|
||||
.route("/configure", post(configure::configure))
|
||||
.route("/configure_telemetry", post(configure::configure_telemetry))
|
||||
.route("/database_schema", get(database_schema::get_schema_dump))
|
||||
.route("/dbs_and_roles", get(dbs_and_roles::get_catalog_objects))
|
||||
.route("/insights", get(insights::get_insights))
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
use std::fs;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::process::Command;
|
||||
use std::time::Duration;
|
||||
use std::{fs::OpenOptions, io::Write};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
|
||||
|
||||
fn get_rsyslog_pid() -> Option<String> {
|
||||
let output = Command::new("pgrep")
|
||||
.arg("rsyslogd")
|
||||
@@ -79,6 +82,95 @@ pub fn configure_audit_rsyslog(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Configuration for enabling Postgres logs forwarding from rsyslogd
|
||||
pub struct PostgresLogsRsyslogConfig<'a> {
|
||||
pub host: Option<&'a str>,
|
||||
}
|
||||
|
||||
impl<'a> PostgresLogsRsyslogConfig<'a> {
|
||||
pub fn new(host: Option<&'a str>) -> Self {
|
||||
Self { host }
|
||||
}
|
||||
|
||||
pub fn build(&self) -> Result<String> {
|
||||
match self.host {
|
||||
Some(host) => {
|
||||
if let Some((target, port)) = host.split_once(":") {
|
||||
Ok(format!(
|
||||
include_str!(
|
||||
"config_template/compute_rsyslog_postgres_export_template.conf"
|
||||
),
|
||||
logs_export_target = target,
|
||||
logs_export_port = port,
|
||||
))
|
||||
} else {
|
||||
Err(anyhow!("Invalid host format for Postgres logs export"))
|
||||
}
|
||||
}
|
||||
None => Ok("".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn current_config() -> Result<String> {
|
||||
let config_content = match std::fs::read_to_string(POSTGRES_LOGS_CONF_PATH) {
|
||||
Ok(c) => c,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
Ok(config_content)
|
||||
}
|
||||
|
||||
/// Returns the default host for otel collector that receives Postgres logs
|
||||
pub fn default_host(project_id: &str) -> String {
|
||||
format!(
|
||||
"config-{}-collector.neon-telemetry.svc.cluster.local:10514",
|
||||
project_id
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn configure_postgres_logs_export(conf: PostgresLogsRsyslogConfig) -> Result<()> {
|
||||
let new_config = conf.build()?;
|
||||
let current_config = PostgresLogsRsyslogConfig::current_config()?;
|
||||
|
||||
if new_config == current_config {
|
||||
info!("postgres logs rsyslog configuration is up-to-date");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// When new config is empty we can simply remove the configuration file.
|
||||
if new_config.is_empty() {
|
||||
info!("removing rsyslog config file: {}", POSTGRES_LOGS_CONF_PATH);
|
||||
match std::fs::remove_file(POSTGRES_LOGS_CONF_PATH) {
|
||||
Ok(_) => {}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
restart_rsyslog()?;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!(
|
||||
"configuring rsyslog for postgres logs export to: {:?}",
|
||||
conf.host
|
||||
);
|
||||
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(POSTGRES_LOGS_CONF_PATH)?;
|
||||
file.write_all(new_config.as_bytes())?;
|
||||
|
||||
info!(
|
||||
"rsyslog configuration file {} added successfully. Starting rsyslogd",
|
||||
POSTGRES_LOGS_CONF_PATH
|
||||
);
|
||||
|
||||
restart_rsyslog()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn pgaudit_gc_main_loop(log_directory: String) -> Result<()> {
|
||||
info!("running pgaudit GC main loop");
|
||||
@@ -136,3 +228,49 @@ pub fn launch_pgaudit_gc(log_directory: String) {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::rsyslog::PostgresLogsRsyslogConfig;
|
||||
|
||||
#[test]
|
||||
fn test_postgres_logs_config() {
|
||||
{
|
||||
// Verify empty config
|
||||
let conf = PostgresLogsRsyslogConfig::new(None);
|
||||
let res = conf.build();
|
||||
assert!(res.is_ok());
|
||||
let conf_str = res.unwrap();
|
||||
assert_eq!(&conf_str, "");
|
||||
}
|
||||
|
||||
{
|
||||
// Verify config
|
||||
let conf = PostgresLogsRsyslogConfig::new(Some("collector.cvc.local:514"));
|
||||
let res = conf.build();
|
||||
assert!(res.is_ok());
|
||||
let conf_str = res.unwrap();
|
||||
assert!(conf_str.contains("omfwd"));
|
||||
assert!(conf_str.contains(r#"target="collector.cvc.local""#));
|
||||
assert!(conf_str.contains(r#"port="514""#));
|
||||
}
|
||||
|
||||
{
|
||||
// Verify invalid config
|
||||
let conf = PostgresLogsRsyslogConfig::new(Some("invalid"));
|
||||
let res = conf.build();
|
||||
assert!(res.is_err());
|
||||
}
|
||||
|
||||
{
|
||||
// Verify config with default host
|
||||
let host = PostgresLogsRsyslogConfig::default_host("shy-breeze-123");
|
||||
let conf = PostgresLogsRsyslogConfig::new(Some(&host));
|
||||
let res = conf.build();
|
||||
assert!(res.is_ok());
|
||||
let conf_str = res.unwrap();
|
||||
assert!(conf_str.contains(r#"shy-breeze-123"#));
|
||||
assert!(conf_str.contains(r#"port="10514""#));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,3 +30,9 @@ pub struct SetRoleGrantsRequest {
|
||||
pub privileges: Vec<Privilege>,
|
||||
pub role: PgIdent,
|
||||
}
|
||||
|
||||
/// Request of the /configure_telemetry API
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct ConfigureTelemetryRequest {
|
||||
pub logs_export_host: Option<String>,
|
||||
}
|
||||
|
||||
@@ -182,6 +182,9 @@ pub enum ComputeFeature {
|
||||
/// Pre-install and initialize anon extension for every database in the cluster
|
||||
AnonExtension,
|
||||
|
||||
/// Allow to configure rsyslog for Postgres logs export
|
||||
PostgresLogsExport,
|
||||
|
||||
/// This is a special feature flag that is used to represent unknown feature flags.
|
||||
/// Basically all unknown to enum flags are represented as this one. See unit test
|
||||
/// `parse_unknown_features()` for more details.
|
||||
|
||||
Reference in New Issue
Block a user