From 71beabf82d2f53a4ec572c4efa33ac9341cfc97f Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Wed, 10 Jan 2024 19:11:43 +0400 Subject: [PATCH] Join multiline postgres logs in compute_ctl (#5903) Postgres can write multiline logs, and they are difficult to handle after they are mixed with other logs. This PR combines multiline logs from postgres into a single line, where previous line breaks are replaced with unicode zero-width spaces. Then postgres logs are written to stderr with `PG:` prefix. It makes it easy to distinguish postgres logs from all other compute logs with a simple grep, e.g. `|= "PG:"` --- compute_tools/src/bin/compute_ctl.rs | 8 +++- compute_tools/src/compute.rs | 37 ++++++++++---- compute_tools/src/pg_helpers.rs | 72 ++++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 9 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index eb1d746f04..2eaad5c3c0 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -350,7 +350,7 @@ fn main() -> Result<()> { // Wait for the child Postgres process forever. In this state Ctrl+C will // propagate to Postgres and it will be shut down as well. - if let Some(mut pg) = pg { + if let Some((mut pg, logs_handle)) = pg { // Startup is finished, exit the startup tracing span drop(startup_context_guard); @@ -358,6 +358,12 @@ fn main() -> Result<()> { .wait() .expect("failed to start waiting on Postgres process"); PG_PID.store(0, Ordering::SeqCst); + + // Process has exited, so we can join the logs thread. + let _ = logs_handle + .join() + .map_err(|e| tracing::error!("log thread panicked: {:?}", e)); + info!("Postgres exited with code {}, shutting down", ecode); exit_code = ecode.code() } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 13701b7378..77130db2d6 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -495,7 +495,7 @@ impl ComputeNode { pub fn sync_safekeepers(&self, storage_auth_token: Option) -> Result { let start_time = Utc::now(); - let sync_handle = maybe_cgexec(&self.pgbin) + let mut sync_handle = maybe_cgexec(&self.pgbin) .args(["--sync-safekeepers"]) .env("PGDATA", &self.pgdata) // we cannot use -D in this mode .envs(if let Some(storage_auth_token) = &storage_auth_token { @@ -504,18 +504,30 @@ impl ComputeNode { vec![] }) .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .spawn() .expect("postgres --sync-safekeepers failed to start"); SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst); // `postgres --sync-safekeepers` will print all log output to stderr and - // final LSN to stdout. So we pipe only stdout, while stderr will be automatically - // redirected to the caller output. + // final LSN to stdout. So we leave stdout to collect LSN, while stderr logs + // will be collected in a child thread. + let stderr = sync_handle + .stderr + .take() + .expect("stderr should be captured"); + let logs_handle = handle_postgres_logs(stderr); + let sync_output = sync_handle .wait_with_output() .expect("postgres --sync-safekeepers failed"); SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst); + // Process has exited, so we can join the logs thread. + let _ = logs_handle + .join() + .map_err(|e| tracing::error!("log thread panicked: {:?}", e)); + if !sync_output.status.success() { anyhow::bail!( "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}", @@ -652,11 +664,12 @@ impl ComputeNode { /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. + /// Returns a handle to the child process and a handle to the logs thread. #[instrument(skip_all)] pub fn start_postgres( &self, storage_auth_token: Option, - ) -> Result { + ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> { let pgdata_path = Path::new(&self.pgdata); // Run postgres as a child process. @@ -667,13 +680,18 @@ impl ComputeNode { } else { vec![] }) + .stderr(Stdio::piped()) .spawn() .expect("cannot start postgres process"); PG_PID.store(pg.id(), Ordering::SeqCst); + // Start a thread to collect logs from stderr. + let stderr = pg.stderr.take().expect("stderr should be captured"); + let logs_handle = handle_postgres_logs(stderr); + wait_for_postgres(&mut pg, pgdata_path)?; - Ok(pg) + Ok((pg, logs_handle)) } /// Do initial configuration of the already started Postgres. @@ -818,7 +836,10 @@ impl ComputeNode { } #[instrument(skip_all)] - pub fn start_compute(&self, extension_server_port: u16) -> Result { + pub fn start_compute( + &self, + extension_server_port: u16, + ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> { let compute_state = self.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); info!( @@ -889,7 +910,7 @@ impl ComputeNode { self.prepare_pgdata(&compute_state, extension_server_port)?; let start_time = Utc::now(); - let pg = self.start_postgres(pspec.storage_auth_token.clone())?; + let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?; let config_time = Utc::now(); if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates { @@ -939,7 +960,7 @@ impl ComputeNode { }; info!(?metrics, "compute start finished"); - Ok(pg) + Ok(pg_process) } // Look for core dumps and collect backtraces. diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 0b0e137c03..bde1ba0a88 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -6,12 +6,15 @@ use std::io::{BufRead, BufReader}; use std::os::unix::fs::PermissionsExt; use std::path::Path; use std::process::Child; +use std::thread::JoinHandle; use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use ini::Ini; use notify::{RecursiveMode, Watcher}; use postgres::{Client, Transaction}; +use tokio::io::AsyncBufReadExt; +use tokio::time::timeout; use tokio_postgres::NoTls; use tracing::{debug, error, info, instrument}; @@ -426,3 +429,72 @@ pub async fn tune_pgbouncer( Ok(()) } + +/// Spawn a thread that will read Postgres logs from `stderr`, join multiline logs +/// and send them to the logger. In the future we may also want to add context to +/// these logs. +pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<()> { + std::thread::spawn(move || { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build tokio runtime"); + + let res = runtime.block_on(async move { + let stderr = tokio::process::ChildStderr::from_std(stderr)?; + handle_postgres_logs_async(stderr).await + }); + if let Err(e) = res { + tracing::error!("error while processing postgres logs: {}", e); + } + }) +} + +/// Read Postgres logs from `stderr` until EOF. Buffer is flushed on one of the following conditions: +/// - next line starts with timestamp +/// - EOF +/// - no new lines were written for the last second +async fn handle_postgres_logs_async(stderr: tokio::process::ChildStderr) -> Result<()> { + let mut lines = tokio::io::BufReader::new(stderr).lines(); + let timeout_duration = Duration::from_secs(1); + let ts_regex = + regex::Regex::new(r"^\d+-\d{2}-\d{2} \d{2}:\d{2}:\d{2}").expect("regex is valid"); + + let mut buf = vec![]; + loop { + let next_line = timeout(timeout_duration, lines.next_line()).await; + + // we should flush lines from the buffer if we cannot continue reading multiline message + let should_flush_buf = match next_line { + // Flushing if new line starts with timestamp + Ok(Ok(Some(ref line))) => ts_regex.is_match(line), + // Flushing on EOF, timeout or error + _ => true, + }; + + if !buf.is_empty() && should_flush_buf { + // join multiline message into a single line, separated by unicode Zero Width Space. + // "PG:" suffix is used to distinguish postgres logs from other logs. + let combined = format!("PG:{}\n", buf.join("\u{200B}")); + buf.clear(); + + // sync write to stderr to avoid interleaving with other logs + use std::io::Write; + let res = std::io::stderr().lock().write_all(combined.as_bytes()); + if let Err(e) = res { + tracing::error!("error while writing to stderr: {}", e); + } + } + + // if not timeout, append line to the buffer + if next_line.is_ok() { + match next_line?? { + Some(line) => buf.push(line), + // EOF + None => break, + }; + } + } + + Ok(()) +}