mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 19:20:36 +00:00
Compare commits
2 Commits
conrad/ref
...
fix_audit_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15b06a6a1e | ||
|
|
ff87b51300 |
@@ -1,6 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
use std::os::unix::fs::{PermissionsExt, symlink};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
@@ -153,11 +153,6 @@ pub struct ComputeState {
|
||||
pub startup_span: Option<tracing::span::Span>,
|
||||
|
||||
pub metrics: ComputeMetrics,
|
||||
|
||||
/// current audit log level
|
||||
/// to know if it is already configured, or we need to set up audit
|
||||
/// when compute receives a new spec
|
||||
pub audit_log_level: ComputeAudit,
|
||||
}
|
||||
|
||||
impl ComputeState {
|
||||
@@ -170,7 +165,6 @@ impl ComputeState {
|
||||
pspec: None,
|
||||
startup_span: None,
|
||||
metrics: ComputeMetrics::default(),
|
||||
audit_log_level: ComputeAudit::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -626,10 +620,16 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// If extended compute audit is enabled configure and start rsyslog
|
||||
if pspec.spec.audit_log_level == ComputeAudit::Hipaa {
|
||||
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
|
||||
configure_audit_rsyslog(&log_directory_path, pspec.spec.audit_log_level.as_str())?;
|
||||
// Configure and start rsyslog for HIPAA if necessary
|
||||
if let ComputeAudit::Hipaa = pspec.spec.audit_log_level {
|
||||
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string());
|
||||
if remote_endpoint.is_empty() {
|
||||
anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty");
|
||||
}
|
||||
|
||||
let log_directory_path = Path::new(&self.params.pgdata).join("log");
|
||||
let log_directory_path = log_directory_path.to_string_lossy().to_string();
|
||||
configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
@@ -684,11 +684,6 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// after all the configuration is done
|
||||
// preserve the information about the current audit log level
|
||||
// so that we don't relaunch rsyslog on every spec change
|
||||
self.set_audit_log_level(pspec.spec.audit_log_level);
|
||||
|
||||
// All done!
|
||||
let startup_end_time = Utc::now();
|
||||
let metrics = {
|
||||
@@ -843,19 +838,6 @@ impl ComputeNode {
|
||||
self.state.lock().unwrap().status
|
||||
}
|
||||
|
||||
pub fn set_audit_log_level(&self, audit_log_level: ComputeAudit) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.audit_log_level = audit_log_level;
|
||||
}
|
||||
|
||||
pub fn get_audit_log_level(&self) -> ComputeAudit {
|
||||
self.state.lock().unwrap().audit_log_level
|
||||
}
|
||||
|
||||
pub fn get_audit_log_dir(&self) -> PathBuf {
|
||||
Path::new(&self.params.pgdata).join("log")
|
||||
}
|
||||
|
||||
pub fn get_timeline_id(&self) -> Option<TimelineId> {
|
||||
self.state
|
||||
.lock()
|
||||
@@ -1515,6 +1497,27 @@ impl ComputeNode {
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
/// Apply config operations that are not covered by `skip_pg_catalog_updates`
|
||||
#[instrument(skip_all)]
|
||||
pub fn apply_config_non_skippable(&self, compute_state: &ComputeState) -> Result<()> {
|
||||
let conf = self.get_tokio_conn_conf(Some("compute_ctl:apply_config"));
|
||||
|
||||
let conf = Arc::new(conf);
|
||||
let spec = Arc::new(
|
||||
compute_state
|
||||
.pspec
|
||||
.as_ref()
|
||||
.expect("spec must be set")
|
||||
.spec
|
||||
.clone(),
|
||||
);
|
||||
|
||||
// Merge-apply spec & changes to PostgreSQL state.
|
||||
self.apply_spec_sql_non_skippable(spec.clone(), conf.clone())?;
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
}
|
||||
|
||||
// Wrapped this around `pg_ctl reload`, but right now we don't use
|
||||
// `pg_ctl` for start / stop.
|
||||
#[instrument(skip_all)]
|
||||
@@ -1566,29 +1569,6 @@ impl ComputeNode {
|
||||
});
|
||||
}
|
||||
|
||||
// If extended compute audit is enabled configure and start rsyslog
|
||||
// We check that the audit_log_level changed compared to the previous spec and skip this step if not.
|
||||
let audit_log_level = self.get_audit_log_level();
|
||||
|
||||
if spec.audit_log_level == ComputeAudit::Hipaa && audit_log_level != spec.audit_log_level {
|
||||
info!(
|
||||
"Configuring audit logging because audit_log_level changed from {:?} to {:?}",
|
||||
audit_log_level, spec.audit_log_level
|
||||
);
|
||||
|
||||
let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string();
|
||||
configure_audit_rsyslog(&log_directory_path, spec.audit_log_level.as_str())?;
|
||||
|
||||
// Launch a background task to clean up the audit logs
|
||||
// If rsyslog was already configured, we don't need to start this process again.
|
||||
match audit_log_level {
|
||||
ComputeAudit::Disabled | ComputeAudit::Log => {
|
||||
launch_pgaudit_gc(log_directory_path);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Write new config
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
config::write_postgres_conf(
|
||||
@@ -1598,14 +1578,7 @@ impl ComputeNode {
|
||||
&self.compute_ctl_config.tls,
|
||||
)?;
|
||||
|
||||
// Override the skip_catalog_updates flag
|
||||
// if we need to install new extensions
|
||||
//
|
||||
// Check that audit_log_level changed compared to the previous spec and skip this step if not.
|
||||
// All operations are idempotent, so this is just a performance optimization.
|
||||
let force_catalog_updates = audit_log_level != spec.audit_log_level;
|
||||
|
||||
if !spec.skip_pg_catalog_updates || force_catalog_updates {
|
||||
if !spec.skip_pg_catalog_updates {
|
||||
let max_concurrent_connections = spec.reconfigure_concurrency;
|
||||
// Temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are reconfiguring:
|
||||
@@ -1630,11 +1603,6 @@ impl ComputeNode {
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
// after all the configuration is done
|
||||
// preserve the information about the current audit log level
|
||||
// so that we don't relaunch rsyslog on every spec change
|
||||
self.set_audit_log_level(spec.audit_log_level);
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
|
||||
info!(
|
||||
@@ -1672,8 +1640,24 @@ impl ComputeNode {
|
||||
"updated postgresql.conf to set neon.disable_logical_replication_subscribers=false"
|
||||
);
|
||||
}
|
||||
self.pg_reload_conf()?;
|
||||
} else {
|
||||
// We need to run some operations even if skip_pg_catalog_updates is set
|
||||
let pgdata_path = Path::new(&self.params.pgdata);
|
||||
// temporarily reset max_cluster_size in config
|
||||
// to avoid the possibility of hitting the limit, while we are applying config:
|
||||
// creating new extensions, roles, etc...
|
||||
config::with_compute_ctl_tmp_override(pgdata_path, "neon.max_cluster_size=-1", || {
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
self.apply_config_non_skippable(compute_state)?;
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
|
||||
self.post_apply_config()?;
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -200,9 +200,8 @@ pub fn write_postgres_conf(
|
||||
)?;
|
||||
// This log level is very verbose
|
||||
// but this is necessary for HIPAA compliance.
|
||||
// Exclude 'misc' category, because it doesn't contain anything relevant.
|
||||
// Exclude 'misc' category, because it doesn't contain anythig relevant.
|
||||
writeln!(file, "pgaudit.log='all, -misc'")?;
|
||||
// Log parameters for all queries
|
||||
writeln!(file, "pgaudit.log_parameter=on")?;
|
||||
// Disable logging of catalog queries
|
||||
// The catalog doesn't contain sensitive data, so we don't need to audit it.
|
||||
|
||||
@@ -9,7 +9,6 @@ use anyhow::{Context, Result, anyhow};
|
||||
use tracing::{error, info, instrument, warn};
|
||||
|
||||
const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf";
|
||||
const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
|
||||
|
||||
fn get_rsyslog_pid() -> Option<String> {
|
||||
let output = Command::new("pgrep")
|
||||
@@ -49,43 +48,32 @@ fn restart_rsyslog() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> {
|
||||
let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?;
|
||||
if remote_endpoint.is_empty() {
|
||||
return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set"));
|
||||
}
|
||||
|
||||
let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) {
|
||||
Ok(c) => c,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => String::new(),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
pub fn configure_audit_rsyslog(
|
||||
log_directory: String,
|
||||
tag: &str,
|
||||
remote_endpoint: &str,
|
||||
) -> Result<()> {
|
||||
let config_content: String = format!(
|
||||
include_str!("config_template/compute_audit_rsyslog_template.conf"),
|
||||
log_directory = log_directory,
|
||||
tag = audit_log_level,
|
||||
tag = tag,
|
||||
remote_endpoint = remote_endpoint
|
||||
);
|
||||
|
||||
if old_config_content == config_content {
|
||||
info!("rsyslog configuration is up-to-date");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("rsyslog config_content: {}", config_content);
|
||||
|
||||
let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf";
|
||||
let mut file = OpenOptions::new()
|
||||
.create(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.open(AUDIT_LOGS_CONF_PATH)?;
|
||||
.open(rsyslog_conf_path)?;
|
||||
|
||||
file.write_all(config_content.as_bytes())?;
|
||||
|
||||
info!(
|
||||
"rsyslog configuration file {} added successfully. Starting rsyslogd",
|
||||
AUDIT_LOGS_CONF_PATH
|
||||
rsyslog_conf_path
|
||||
);
|
||||
|
||||
// start the service, using the configuration
|
||||
|
||||
@@ -308,6 +308,75 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Similar to apply_spec_sql, but for the simplified set of operations
|
||||
// that we perform even when `pg_skip_catalog_updates` is set.
|
||||
//
|
||||
// Keep the list of operations as small as possible,
|
||||
// as it will be run on every spec change and affect compute start time.
|
||||
pub fn apply_spec_sql_non_skippable(
|
||||
&self,
|
||||
spec: Arc<ComputeSpec>,
|
||||
conf: Arc<tokio_postgres::Config>,
|
||||
) -> Result<()> {
|
||||
info!("Applying non_skippable config",);
|
||||
debug!("Config: {:?}", spec);
|
||||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
rt.block_on(async {
|
||||
let client = Self::get_maintenance_client(&conf).await?;
|
||||
let spec = spec.clone();
|
||||
|
||||
let jwks_roles = Arc::new(
|
||||
spec.as_ref()
|
||||
.local_proxy_config
|
||||
.iter()
|
||||
.flat_map(|it| &it.jwks)
|
||||
.flatten()
|
||||
.flat_map(|setting| &setting.role_names)
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>(),
|
||||
);
|
||||
|
||||
// NOTE: Here we assume that operations below don't use ctx
|
||||
// TODO: refactor apply_operations() to accept ctx as option.
|
||||
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
|
||||
roles: HashMap::new(),
|
||||
dbs: HashMap::new(),
|
||||
}));
|
||||
|
||||
let mut phases = vec![];
|
||||
|
||||
match spec.audit_log_level {
|
||||
ComputeAudit::Hipaa => {
|
||||
phases.push(CreatePgauditExtension);
|
||||
phases.push(CreatePgauditlogtofileExtension);
|
||||
phases.push(DisablePostgresDBPgAudit);
|
||||
}
|
||||
ComputeAudit::Log => {
|
||||
phases.push(CreatePgauditExtension);
|
||||
phases.push(DisablePostgresDBPgAudit);
|
||||
}
|
||||
ComputeAudit::Disabled => {}
|
||||
}
|
||||
|
||||
for phase in phases {
|
||||
debug!("Applying phase {:?}", &phase);
|
||||
apply_operations(
|
||||
spec.clone(),
|
||||
ctx.clone(),
|
||||
jwks_roles.clone(),
|
||||
phase,
|
||||
|| async { Ok(&client) },
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok::<(), anyhow::Error>(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Apply SQL migrations of the RunInEachDatabase phase.
|
||||
///
|
||||
/// May opt to not connect to databases that don't have any scheduled
|
||||
|
||||
@@ -160,6 +160,12 @@ pub struct ComputeSpec {
|
||||
pub drop_subscriptions_before_start: bool,
|
||||
|
||||
/// Log level for audit logging:
|
||||
///
|
||||
/// Disabled - no audit logging. This is the default.
|
||||
/// log - log masked statements to the postgres log using pgaudit extension
|
||||
/// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension
|
||||
///
|
||||
/// Extensions should be present in shared_preload_libraries
|
||||
#[serde(default)]
|
||||
pub audit_log_level: ComputeAudit,
|
||||
}
|
||||
@@ -282,27 +288,16 @@ impl ComputeMode {
|
||||
}
|
||||
|
||||
/// Log level for audit logging
|
||||
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
/// Disabled, log, hipaa
|
||||
/// Default is Disabled
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
|
||||
pub enum ComputeAudit {
|
||||
#[default]
|
||||
/// no audit logging. This is the default.
|
||||
Disabled,
|
||||
/// write masked audit log statements to the postgres log using pgaudit extension
|
||||
Log,
|
||||
/// log unmasked statements to the file using pgaudit and pgauditlogtofile extensions
|
||||
Hipaa,
|
||||
}
|
||||
|
||||
impl ComputeAudit {
|
||||
pub fn as_str(&self) -> &str {
|
||||
match self {
|
||||
ComputeAudit::Disabled => "disabled",
|
||||
ComputeAudit::Log => "log",
|
||||
ComputeAudit::Hipaa => "hipaa",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
|
||||
pub struct Cluster {
|
||||
pub cluster_id: Option<String>,
|
||||
|
||||
Reference in New Issue
Block a user