Join multiline postgres logs in compute_ctl (#5903)

Postgres can write multiline logs, and they are difficult to handle
after they are mixed with other logs. This PR combines multiline logs
from postgres into a single line, where previous line breaks are
replaced with unicode zero-width spaces. Then postgres logs are written
to stderr with `PG:` prefix.

It makes it easy to distinguish postgres logs from all other compute
logs with a simple grep, e.g. `|= "PG:"`
This commit is contained in:
Arthur Petukhovsky
2024-01-10 19:11:43 +04:00
committed by GitHub
parent 76372ce002
commit 71beabf82d
3 changed files with 108 additions and 9 deletions

View File

@@ -350,7 +350,7 @@ fn main() -> Result<()> {
// Wait for the child Postgres process forever. In this state Ctrl+C will // Wait for the child Postgres process forever. In this state Ctrl+C will
// propagate to Postgres and it will be shut down as well. // propagate to Postgres and it will be shut down as well.
if let Some(mut pg) = pg { if let Some((mut pg, logs_handle)) = pg {
// Startup is finished, exit the startup tracing span // Startup is finished, exit the startup tracing span
drop(startup_context_guard); drop(startup_context_guard);
@@ -358,6 +358,12 @@ fn main() -> Result<()> {
.wait() .wait()
.expect("failed to start waiting on Postgres process"); .expect("failed to start waiting on Postgres process");
PG_PID.store(0, Ordering::SeqCst); PG_PID.store(0, Ordering::SeqCst);
// Process has exited, so we can join the logs thread.
let _ = logs_handle
.join()
.map_err(|e| tracing::error!("log thread panicked: {:?}", e));
info!("Postgres exited with code {}, shutting down", ecode); info!("Postgres exited with code {}, shutting down", ecode);
exit_code = ecode.code() exit_code = ecode.code()
} }

View File

@@ -495,7 +495,7 @@ impl ComputeNode {
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> { pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now(); let start_time = Utc::now();
let sync_handle = maybe_cgexec(&self.pgbin) let mut sync_handle = maybe_cgexec(&self.pgbin)
.args(["--sync-safekeepers"]) .args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode .env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &storage_auth_token { .envs(if let Some(storage_auth_token) = &storage_auth_token {
@@ -504,18 +504,30 @@ impl ComputeNode {
vec![] vec![]
}) })
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn() .spawn()
.expect("postgres --sync-safekeepers failed to start"); .expect("postgres --sync-safekeepers failed to start");
SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst); SYNC_SAFEKEEPERS_PID.store(sync_handle.id(), Ordering::SeqCst);
// `postgres --sync-safekeepers` will print all log output to stderr and // `postgres --sync-safekeepers` will print all log output to stderr and
// final LSN to stdout. So we pipe only stdout, while stderr will be automatically // final LSN to stdout. So we leave stdout to collect LSN, while stderr logs
// redirected to the caller output. // will be collected in a child thread.
let stderr = sync_handle
.stderr
.take()
.expect("stderr should be captured");
let logs_handle = handle_postgres_logs(stderr);
let sync_output = sync_handle let sync_output = sync_handle
.wait_with_output() .wait_with_output()
.expect("postgres --sync-safekeepers failed"); .expect("postgres --sync-safekeepers failed");
SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst); SYNC_SAFEKEEPERS_PID.store(0, Ordering::SeqCst);
// Process has exited, so we can join the logs thread.
let _ = logs_handle
.join()
.map_err(|e| tracing::error!("log thread panicked: {:?}", e));
if !sync_output.status.success() { if !sync_output.status.success() {
anyhow::bail!( anyhow::bail!(
"postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}", "postgres --sync-safekeepers exited with non-zero status: {}. stdout: {}",
@@ -652,11 +664,12 @@ impl ComputeNode {
/// Start Postgres as a child process and manage DBs/roles. /// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit. /// After that this will hang waiting on the postmaster process to exit.
/// Returns a handle to the child process and a handle to the logs thread.
#[instrument(skip_all)] #[instrument(skip_all)]
pub fn start_postgres( pub fn start_postgres(
&self, &self,
storage_auth_token: Option<String>, storage_auth_token: Option<String>,
) -> Result<std::process::Child> { ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let pgdata_path = Path::new(&self.pgdata); let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process. // Run postgres as a child process.
@@ -667,13 +680,18 @@ impl ComputeNode {
} else { } else {
vec![] vec![]
}) })
.stderr(Stdio::piped())
.spawn() .spawn()
.expect("cannot start postgres process"); .expect("cannot start postgres process");
PG_PID.store(pg.id(), Ordering::SeqCst); PG_PID.store(pg.id(), Ordering::SeqCst);
// Start a thread to collect logs from stderr.
let stderr = pg.stderr.take().expect("stderr should be captured");
let logs_handle = handle_postgres_logs(stderr);
wait_for_postgres(&mut pg, pgdata_path)?; wait_for_postgres(&mut pg, pgdata_path)?;
Ok(pg) Ok((pg, logs_handle))
} }
/// Do initial configuration of the already started Postgres. /// Do initial configuration of the already started Postgres.
@@ -818,7 +836,10 @@ impl ComputeNode {
} }
#[instrument(skip_all)] #[instrument(skip_all)]
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> { pub fn start_compute(
&self,
extension_server_port: u16,
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone(); let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!( info!(
@@ -889,7 +910,7 @@ impl ComputeNode {
self.prepare_pgdata(&compute_state, extension_server_port)?; self.prepare_pgdata(&compute_state, extension_server_port)?;
let start_time = Utc::now(); let start_time = Utc::now();
let pg = self.start_postgres(pspec.storage_auth_token.clone())?; let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
let config_time = Utc::now(); let config_time = Utc::now();
if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates { if pspec.spec.mode == ComputeMode::Primary && !pspec.spec.skip_pg_catalog_updates {
@@ -939,7 +960,7 @@ impl ComputeNode {
}; };
info!(?metrics, "compute start finished"); info!(?metrics, "compute start finished");
Ok(pg) Ok(pg_process)
} }
// Look for core dumps and collect backtraces. // Look for core dumps and collect backtraces.

View File

@@ -6,12 +6,15 @@ use std::io::{BufRead, BufReader};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::Path; use std::path::Path;
use std::process::Child; use std::process::Child;
use std::thread::JoinHandle;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use ini::Ini; use ini::Ini;
use notify::{RecursiveMode, Watcher}; use notify::{RecursiveMode, Watcher};
use postgres::{Client, Transaction}; use postgres::{Client, Transaction};
use tokio::io::AsyncBufReadExt;
use tokio::time::timeout;
use tokio_postgres::NoTls; use tokio_postgres::NoTls;
use tracing::{debug, error, info, instrument}; use tracing::{debug, error, info, instrument};
@@ -426,3 +429,72 @@ pub async fn tune_pgbouncer(
Ok(()) Ok(())
} }
/// Spawn a thread that will read Postgres logs from `stderr`, join multiline logs
/// and send them to the logger. In the future we may also want to add context to
/// these logs.
pub fn handle_postgres_logs(stderr: std::process::ChildStderr) -> JoinHandle<()> {
std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime");
let res = runtime.block_on(async move {
let stderr = tokio::process::ChildStderr::from_std(stderr)?;
handle_postgres_logs_async(stderr).await
});
if let Err(e) = res {
tracing::error!("error while processing postgres logs: {}", e);
}
})
}
/// Read Postgres logs from `stderr` until EOF. Buffer is flushed on one of the following conditions:
/// - next line starts with timestamp
/// - EOF
/// - no new lines were written for the last second
async fn handle_postgres_logs_async(stderr: tokio::process::ChildStderr) -> Result<()> {
let mut lines = tokio::io::BufReader::new(stderr).lines();
let timeout_duration = Duration::from_secs(1);
let ts_regex =
regex::Regex::new(r"^\d+-\d{2}-\d{2} \d{2}:\d{2}:\d{2}").expect("regex is valid");
let mut buf = vec![];
loop {
let next_line = timeout(timeout_duration, lines.next_line()).await;
// we should flush lines from the buffer if we cannot continue reading multiline message
let should_flush_buf = match next_line {
// Flushing if new line starts with timestamp
Ok(Ok(Some(ref line))) => ts_regex.is_match(line),
// Flushing on EOF, timeout or error
_ => true,
};
if !buf.is_empty() && should_flush_buf {
// join multiline message into a single line, separated by unicode Zero Width Space.
// "PG:" suffix is used to distinguish postgres logs from other logs.
let combined = format!("PG:{}\n", buf.join("\u{200B}"));
buf.clear();
// sync write to stderr to avoid interleaving with other logs
use std::io::Write;
let res = std::io::stderr().lock().write_all(combined.as_bytes());
if let Err(e) = res {
tracing::error!("error while writing to stderr: {}", e);
}
}
// if not timeout, append line to the buffer
if next_line.is_ok() {
match next_line?? {
Some(line) => buf.push(line),
// EOF
None => break,
};
}
}
Ok(())
}