diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d31472b0c1..70cb8ea103 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::os::unix::fs::{PermissionsExt, symlink}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; @@ -153,6 +153,11 @@ pub struct ComputeState { pub startup_span: Option, pub metrics: ComputeMetrics, + + /// current audit log level + /// to know if it is already configured, or we need to set up audit + /// when compute receives a new spec + pub audit_log_level: ComputeAudit, } impl ComputeState { @@ -165,6 +170,7 @@ impl ComputeState { pspec: None, startup_span: None, metrics: ComputeMetrics::default(), + audit_log_level: ComputeAudit::default(), } } @@ -620,16 +626,10 @@ impl ComputeNode { }); } - // Configure and start rsyslog for HIPAA if necessary - if let ComputeAudit::Hipaa = pspec.spec.audit_log_level { - let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string()); - if remote_endpoint.is_empty() { - anyhow::bail!("AUDIT_LOGGING_ENDPOINT is empty"); - } - - let log_directory_path = Path::new(&self.params.pgdata).join("log"); - let log_directory_path = log_directory_path.to_string_lossy().to_string(); - configure_audit_rsyslog(log_directory_path.clone(), "hipaa", &remote_endpoint)?; + // If extended compute audit is enabled configure and start rsyslog + if pspec.spec.audit_log_level == ComputeAudit::Hipaa { + let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string(); + configure_audit_rsyslog(&log_directory_path, pspec.spec.audit_log_level.as_str())?; // Launch a background task to clean up the audit logs launch_pgaudit_gc(log_directory_path); @@ -684,6 +684,11 @@ impl ComputeNode { }); } + // after all the configuration is done + // preserve the information about the current audit log level + // so that we don't relaunch rsyslog on every spec change + self.set_audit_log_level(pspec.spec.audit_log_level); + // All done! let startup_end_time = Utc::now(); let metrics = { @@ -838,6 +843,19 @@ impl ComputeNode { self.state.lock().unwrap().status } + pub fn set_audit_log_level(&self, audit_log_level: ComputeAudit) { + let mut state = self.state.lock().unwrap(); + state.audit_log_level = audit_log_level; + } + + pub fn get_audit_log_level(&self) -> ComputeAudit { + self.state.lock().unwrap().audit_log_level + } + + pub fn get_audit_log_dir(&self) -> PathBuf { + Path::new(&self.params.pgdata).join("log") + } + pub fn get_timeline_id(&self) -> Option { self.state .lock() @@ -1540,6 +1558,29 @@ impl ComputeNode { }); } + // If extended compute audit is enabled configure and start rsyslog + // We check that the audit_log_level changed compared to the previous spec and skip this step if not. + let audit_log_level = self.get_audit_log_level(); + + if spec.audit_log_level == ComputeAudit::Hipaa && audit_log_level != spec.audit_log_level { + info!( + "Configuring audit logging because audit_log_level changed from {:?} to {:?}", + audit_log_level, spec.audit_log_level + ); + + let log_directory_path = self.get_audit_log_dir().to_string_lossy().to_string(); + configure_audit_rsyslog(&log_directory_path, spec.audit_log_level.as_str())?; + + // Launch a background task to clean up the audit logs + // If rsyslog was already configured, we don't need to start this process again. + match audit_log_level { + ComputeAudit::Disabled | ComputeAudit::Log => { + launch_pgaudit_gc(log_directory_path); + } + _ => {} + } + } + // Write new config let pgdata_path = Path::new(&self.params.pgdata); config::write_postgres_conf( @@ -1549,7 +1590,14 @@ impl ComputeNode { &self.compute_ctl_config.tls, )?; - if !spec.skip_pg_catalog_updates { + // Override the skip_catalog_updates flag + // if we need to install new extensions + // + // Check that audit_log_level changed compared to the previous spec and skip this step if not. + // All operations are idempotent, so this is just a performance optimization. + let force_catalog_updates = audit_log_level != spec.audit_log_level; + + if !spec.skip_pg_catalog_updates || force_catalog_updates { let max_concurrent_connections = spec.reconfigure_concurrency; // Temporarily reset max_cluster_size in config // to avoid the possibility of hitting the limit, while we are reconfiguring: @@ -1574,6 +1622,11 @@ impl ComputeNode { self.pg_reload_conf()?; + // after all the configuration is done + // preserve the information about the current audit log level + // so that we don't relaunch rsyslog on every spec change + self.set_audit_log_level(spec.audit_log_level); + let unknown_op = "unknown".to_string(); let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op); info!( diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index eb9dd00da5..543b13d3cf 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -199,8 +199,9 @@ pub fn write_postgres_conf( )?; // This log level is very verbose // but this is necessary for HIPAA compliance. - // Exclude 'misc' category, because it doesn't contain anythig relevant. + // Exclude 'misc' category, because it doesn't contain anything relevant. writeln!(file, "pgaudit.log='all, -misc'")?; + // Log parameters for all queries writeln!(file, "pgaudit.log_parameter=on")?; // Disable logging of catalog queries // The catalog doesn't contain sensitive data, so we don't need to audit it. diff --git a/compute_tools/src/rsyslog.rs b/compute_tools/src/rsyslog.rs index 80594db3f1..8ad02d2a8a 100644 --- a/compute_tools/src/rsyslog.rs +++ b/compute_tools/src/rsyslog.rs @@ -9,6 +9,7 @@ use anyhow::{Context, Result, anyhow}; use tracing::{error, info, instrument, warn}; const POSTGRES_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/postgres_logs.conf"; +const AUDIT_LOGS_CONF_PATH: &str = "/etc/rsyslog.d/compute_audit_rsyslog.conf"; fn get_rsyslog_pid() -> Option { let output = Command::new("pgrep") @@ -48,32 +49,43 @@ fn restart_rsyslog() -> Result<()> { Ok(()) } -pub fn configure_audit_rsyslog( - log_directory: String, - tag: &str, - remote_endpoint: &str, -) -> Result<()> { +pub fn configure_audit_rsyslog(log_directory: &str, audit_log_level: &str) -> Result<()> { + let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT")?; + if remote_endpoint.is_empty() { + return Err(anyhow!("AUDIT_LOGGING_ENDPOINT is not set")); + } + + let old_config_content = match std::fs::read_to_string(AUDIT_LOGS_CONF_PATH) { + Ok(c) => c, + Err(err) if err.kind() == ErrorKind::NotFound => String::new(), + Err(err) => return Err(err.into()), + }; + let config_content: String = format!( include_str!("config_template/compute_audit_rsyslog_template.conf"), log_directory = log_directory, - tag = tag, + tag = audit_log_level, remote_endpoint = remote_endpoint ); + if old_config_content == config_content { + info!("rsyslog configuration is up-to-date"); + return Ok(()); + } + info!("rsyslog config_content: {}", config_content); - let rsyslog_conf_path = "/etc/rsyslog.d/compute_audit_rsyslog.conf"; let mut file = OpenOptions::new() .create(true) .write(true) .truncate(true) - .open(rsyslog_conf_path)?; + .open(AUDIT_LOGS_CONF_PATH)?; file.write_all(config_content.as_bytes())?; info!( "rsyslog configuration file {} added successfully. Starting rsyslogd", - rsyslog_conf_path + AUDIT_LOGS_CONF_PATH ); // start the service, using the configuration diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 11615b73a1..0635cb091e 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -160,12 +160,6 @@ pub struct ComputeSpec { pub drop_subscriptions_before_start: bool, /// Log level for audit logging: - /// - /// Disabled - no audit logging. This is the default. - /// log - log masked statements to the postgres log using pgaudit extension - /// hipaa - log unmasked statements to the file using pgaudit and pgauditlogtofile extension - /// - /// Extensions should be present in shared_preload_libraries #[serde(default)] pub audit_log_level: ComputeAudit, } @@ -276,16 +270,27 @@ pub enum ComputeMode { } /// Log level for audit logging -/// Disabled, log, hipaa -/// Default is Disabled -#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)] pub enum ComputeAudit { #[default] + /// no audit logging. This is the default. Disabled, + /// write masked audit log statements to the postgres log using pgaudit extension Log, + /// log unmasked statements to the file using pgaudit and pgauditlogtofile extensions Hipaa, } +impl ComputeAudit { + pub fn as_str(&self) -> &str { + match self { + ComputeAudit::Disabled => "disabled", + ComputeAudit::Log => "log", + ComputeAudit::Hipaa => "hipaa", + } + } +} + #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct Cluster { pub cluster_id: Option,