Compare commits

...

5 Commits

Author SHA1 Message Date
Christian Schwarz
453e7b6b99 debug spans 2023-12-13 15:15:51 +00:00
Christian Schwarz
468a556ebb more spans 2023-12-13 15:15:51 +00:00
Christian Schwarz
113e737fb5 spans 2023-12-13 15:15:51 +00:00
Christian Schwarz
d0c2d56fbd extract env var stuff into other utility module 2023-12-13 15:04:28 +00:00
Christian Schwarz
bcd4fb7db2 utils::logging: implement tracing_chrome & tracing_flame support 2023-12-13 14:47:31 +00:00
20 changed files with 309 additions and 66 deletions

42
Cargo.lock generated
View File

@@ -2780,6 +2780,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -3040,6 +3050,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -3139,6 +3155,7 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
@@ -5481,6 +5498,17 @@ dependencies = [
"syn 2.0.28",
]
[[package]]
name = "tracing-chrome"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "496b3cd5447f7ff527bbbf19b071ad542a000adf297d4127078b4dfdb931f41a"
dependencies = [
"serde_json",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
@@ -5501,6 +5529,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
"lazy_static",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -5553,6 +5592,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -5773,7 +5813,9 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-chrome",
"tracing-error",
"tracing-flame",
"tracing-subscriber",
"url",
"uuid",

View File

@@ -86,7 +86,10 @@ where
.stdout(process_log_file)
.stderr(same_file_for_stderr)
.args(args);
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
fill_rust_env_vars(background_command),
));
filled_cmd.envs(envs);
let pid_file_to_check = match initial_pid_file {
@@ -253,6 +256,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
cmd
}
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
for (var, val) in std::env::vars() {
if var.starts_with("NEON_") {
cmd = cmd.env(var, val);
}
}
cmd
}
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
/// 1. Claims a pidfile with a fcntl lock on it and
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)

View File

@@ -299,7 +299,7 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
let _guard = logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,

View File

@@ -274,7 +274,7 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
utils::logging::init(
let _ = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,

View File

@@ -200,7 +200,7 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
utils::logging::init(
let _ = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,

View File

@@ -49,6 +49,8 @@ const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
tracing-chrome = "0.7.1"
tracing-flame = "0.2.0"
[dev-dependencies]
byteorder.workspace = true

View File

@@ -0,0 +1,48 @@
use std::{fmt::Display, str::FromStr};
pub fn var<V, E, D>(varname: &str, default: D) -> V
where
V: FromStr<Err = E>,
E: Display,
D: FnOnce() -> V,
{
match std::env::var(varname) {
Ok(s) => s
.parse()
.map_err(|e| format!("failed to parse env var {varname}: {e:#}"))
.unwrap(),
Err(std::env::VarError::NotPresent) => default(),
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {varname} is not unicode")
}
}
}
pub struct Bool(bool);
impl Bool {
pub const fn new_const<const V: bool>() -> Self {
Bool(V)
}
}
impl FromStr for Bool {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(b) = s.parse() {
return Ok(b);
}
Ok(Bool(match s {
"0" => false,
"1" => true,
_ => return Err(format!("not a bool, accepting 0|1|{}|{}", false, true)),
}))
}
}
impl Into<bool> for Bool {
fn into(self) -> bool {
self.0
}
}

View File

@@ -27,6 +27,7 @@ pub mod auth;
pub mod id;
mod hex;
pub mod env_config;
pub use hex::Hex;
// http endpoint utils

View File

@@ -1,9 +1,15 @@
use std::str::FromStr;
use std::{
io::BufWriter,
str::FromStr,
sync::{Arc, Mutex},
};
use anyhow::Context;
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
use super::env_config;
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
@@ -73,11 +79,26 @@ pub enum Output {
Stderr,
}
/// Keep alive and drop it before the program terminates.
#[must_use]
pub struct FlushGuard(Arc<Mutex<FlushGuardInner>>);
struct FlushGuardInner {
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
}
impl From<FlushGuardInner> for FlushGuard {
fn from(value: FlushGuardInner) -> Self {
Self(Arc::new(Mutex::new(value)))
}
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
output: Output,
) -> anyhow::Result<()> {
) -> anyhow::Result<FlushGuard> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -88,8 +109,28 @@ pub fn init(
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with({
// https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
#[derive(Default)]
struct LayerStack {
layers:
Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
}
impl LayerStack {
fn add_layer<L>(&mut self, new_layer: L)
where
L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
{
let new = match self.layers.take() {
Some(layers) => Some(layers.and_then(new_layer).boxed()),
None => Some(new_layer.boxed()),
};
self.layers = new;
}
}
let mut layers = LayerStack::default();
layers.add_layer({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(false)
@@ -106,15 +147,58 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
layers
.add_layer(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
let tracing_chrome_layer_flush_guard = if env_config::var(
"NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME",
env_config::Bool::new_const::<false>,
)
.into()
{
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.trace_style(tracing_chrome::TraceStyle::Async)
.build();
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
let tracing_flame_flush_guard = if env_config::var(
"NEON_UTILS_LOGGING_ENABLE_TRACING_FLAME",
env_config::Bool::new_const::<false>,
)
.into()
{
let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let layer = layer
.with_empty_samples(false)
.with_module_path(false)
.with_file_and_line(false)
.with_threads_collapsed(true);
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
.add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
TracingErrorLayerEnablement::Disabled => (),
}
Ok(())
let r = tracing_subscriber::registry();
r.with(layers.layers.expect("we add at least one layer"))
.init();
Ok(FlushGuardInner {
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
_tracing_flame_layer: tracing_flame_flush_guard,
}
.into())
}
/// Disable the default rust panic hook by using `set_hook`.

View File

@@ -62,6 +62,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true

View File

@@ -166,71 +166,111 @@ where
}
}
// Gather non-relational files from object storage pages.
debug!("Gather non-relational files from object storage pages");
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
async {
debug!("list slru segments");
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
async {
debug!("add slru segment");
self.add_slru_segment(kind, segno).await?;
anyhow::Ok(())
}
.instrument(debug_span!("slru segment", ?segno))
.await?;
}
anyhow::Ok(())
}
.instrument(debug_span!("non-rel file", ?kind))
.await?;
}
let mut min_restart_lsn: Lsn = Lsn::MAX;
// Create tablespace directories
debug!("Create tablespace directories");
for ((spcnode, dbnode), has_relmap_file) in
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
{
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
async {
debug!("iter");
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
continue;
}
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
debug!("list rels");
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
async {
debug!("iter");
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
return Ok(());
}
if self.full_backup {
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
continue;
if self.full_backup {
if rel.forknum == MAIN_FORKNUM
&& rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
return Ok(());
}
self.add_rel(rel, rel).await?;
}
anyhow::Ok(())
}
self.add_rel(rel, rel).await?;
.instrument(debug_span!("process rel", ?rel))
.await?;
}
}
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
debug!("list aux files");
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
async {
debug!("iter");
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
anyhow::Ok(())
}
.instrument(debug_span!("process aux file", ?path))
.await?;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
debug!("done");
anyhow::Ok(())
}
.instrument(debug_span!(
"process tablespace directory",
?spcnode,
?dbnode
))
.await?;
}
if min_restart_lsn != Lsn::MAX {
info!(
@@ -244,19 +284,25 @@ where
.await
.context("could not add restart.lsn file to basebackup tarball")?;
}
debug!("list twophase files");
for xid in self
.timeline
.list_twophase_files(self.lsn, self.ctx)
.await?
{
self.add_twophase_file(xid).await?;
async {
self.add_twophase_file(xid).await?;
anyhow::Ok(())
}
.instrument(debug_span!("process twophase file", ?xid))
.await?;
}
fail_point!("basebackup-before-control-file", |_| {
bail!("failpoint basebackup-before-control-file")
});
// Generate pg_control and bootstrap WAL segment.
debug!("Generate pg_control and bootstrap WAL segment.");
self.add_pgcontrol_file().await?;
self.ar.finish().await?;
debug!("all tarred up!");

View File

@@ -103,7 +103,7 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(
let _guard = logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,

View File

@@ -3716,7 +3716,7 @@ pub(crate) mod harness {
pub deletion_queue: MockDeletionQueue,
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
pub(crate) fn setup_logging() {
LOG_HANDLE.get_or_init(|| {

View File

@@ -20,12 +20,14 @@ use std::io::{Error, ErrorKind};
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
#[tracing::instrument(skip_all, fields(%offset), level = tracing::Level::DEBUG)]
pub async fn read_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
tracing::debug!("reading blob");
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}

View File

@@ -141,6 +141,7 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
#[tracing::instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn read_blk(
&self,
blknum: u32,

View File

@@ -181,6 +181,7 @@ impl LayerMap {
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());

View File

@@ -234,6 +234,7 @@ impl Layer {
/// # Cancellation-Safety
///
/// This method is cancellation-safe.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,

View File

@@ -498,6 +498,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
#[instrument(skip_all, fields(%key, %lsn), level = tracing::Level::DEBUG)]
pub async fn get(
&self,
key: Key,
@@ -2198,6 +2199,7 @@ impl Timeline {
/// # Cancel-Safety
///
/// This method is cancellation-safe.
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn get_reconstruct_data(
&self,
key: Key,

View File

@@ -200,7 +200,7 @@ async fn main() -> anyhow::Result<()> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,

View File

@@ -431,7 +431,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,