Compare commits

...

4 Commits

16 changed files with 227 additions and 33 deletions

41
Cargo.lock generated
View File

@@ -3125,6 +3125,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -3461,6 +3471,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -6290,6 +6306,17 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "tracing-chrome"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "496b3cd5447f7ff527bbbf19b071ad542a000adf297d4127078b4dfdb931f41a"
dependencies = [
"serde_json",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
@@ -6310,6 +6337,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
"lazy_static",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
@@ -6352,6 +6390,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -6597,7 +6636,9 @@ dependencies = [
"tokio-tar",
"tokio-util",
"tracing",
"tracing-chrome",
"tracing-error",
"tracing-flame",
"tracing-subscriber",
"url",
"uuid",

View File

@@ -171,7 +171,7 @@ fn main() -> anyhow::Result<()> {
async fn async_main() -> anyhow::Result<()> {
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
logging::init(
let _guard = logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,

View File

@@ -86,7 +86,10 @@ where
.stdout(process_log_file)
.stderr(same_file_for_stderr)
.args(args);
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
fill_rust_env_vars(background_command),
));
filled_cmd.envs(envs);
let pid_file_to_check = match &initial_pid_file {
@@ -268,6 +271,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
cmd
}
/// Forward every `NEON_`-prefixed environment variable of the current
/// process into `cmd`'s environment. Returns `cmd` for call chaining.
fn fill_env_vars_prefixed_neon(cmd: &mut Command) -> &mut Command {
    // `Command::envs` consumes the (name, value) pairs directly; no need
    // to rebind `cmd` per iteration.
    cmd.envs(std::env::vars().filter(|(name, _)| name.starts_with("NEON_")))
}
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
/// 1. Claims a pidfile with a fcntl lock on it and
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)

View File

@@ -154,7 +154,7 @@ mod reliable_copy_test {
/// Run test simulations.
#[test]
fn sim_example_reliable_copy() {
utils::logging::init(
let _guard = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,

View File

@@ -205,7 +205,7 @@ pub(crate) async fn upload_remote_data(
pub(crate) fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
utils::logging::init(
let _ = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,

View File

@@ -60,6 +60,8 @@ const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
tracing-chrome = "0.7.1"
tracing-flame = "0.2.0"
serde_path_to_error.workspace = true

View File

@@ -0,0 +1,48 @@
use std::{fmt::Display, str::FromStr};
/// Read environment variable `varname` and parse it as a `V`.
///
/// Returns `default()` when the variable is not set.
///
/// # Panics
///
/// Panics when the variable is set but fails to parse, or is not valid
/// unicode — configuration errors should abort startup loudly rather than
/// be silently replaced by the default.
pub fn var<V, E, D>(varname: &str, default: D) -> V
where
    V: FromStr<Err = E>,
    E: Display,
    D: FnOnce() -> V,
{
    match std::env::var(varname) {
        // Panic via `unwrap_or_else` + `panic!` instead of
        // `map_err(format!).unwrap()`: `unwrap()` prints the error through
        // its Debug impl, which would wrap our message in quotes and
        // escape it. This way the panic message stays readable.
        Ok(s) => s
            .parse()
            .unwrap_or_else(|e| panic!("failed to parse env var {varname}: {e:#}")),
        Err(std::env::VarError::NotPresent) => default(),
        Err(std::env::VarError::NotUnicode(_)) => {
            panic!("env var {varname} is not unicode")
        }
    }
}
/// Boolean env-var value that accepts `0`/`1` in addition to `true`/`false`.
pub struct Bool(bool);

impl Bool {
    /// Const-constructible value, intended as the `default` closure for
    /// [`var`], e.g. `env_config::var("NAME", Bool::new_const::<false>)`.
    pub const fn new_const<const V: bool>() -> Self {
        Bool(V)
    }
}

impl FromStr for Bool {
    type Err = String;

    /// Accepts whatever `bool::from_str` accepts (`true`/`false`),
    /// plus the numeric forms `0` and `1`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if let Ok(b) = s.parse() {
            return Ok(Bool(b));
        }
        Ok(Bool(match s {
            "0" => false,
            "1" => true,
            _ => return Err(format!("not a bool, accepting 0|1|{}|{}", false, true)),
        }))
    }
}

// Implement `From` rather than a hand-written `Into` (clippy
// `from_over_into`): the std blanket impl derives `Into<bool> for Bool`
// from it for free, so existing `.into()` call sites keep working.
impl From<Bool> for bool {
    fn from(value: Bool) -> bool {
        value.0
    }
}

View File

@@ -27,6 +27,7 @@ pub mod auth;
pub mod id;
mod hex;
pub mod env_config;
pub use hex::Hex;
// http endpoint utils

View File

@@ -1,10 +1,16 @@
use std::str::FromStr;
use std::{
io::BufWriter,
str::FromStr,
sync::{Arc, Mutex},
};
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, EnumVariantNames};
use super::env_config;
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
@@ -98,11 +104,26 @@ pub enum Output {
Stderr,
}
/// Keep alive and drop it before the program terminates: dropping this guard
/// drops the inner chrome/flame layer guards, which flush their buffered
/// trace output. `#[must_use]` so callers of `init` cannot silently discard
/// it (an immediately-dropped guard would flush before anything is traced).
#[must_use]
pub struct FlushGuard(Arc<Mutex<FlushGuardInner>>);

/// Owns the optional flush guards of the tracing-chrome and tracing-flame
/// layers. The fields are never read (hence the `_` prefixes); they exist
/// only so their `Drop` impls run when the outer [`FlushGuard`] is dropped.
/// Either field is `None` when the corresponding layer was not enabled.
struct FlushGuardInner {
    _tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
    _tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
}

impl From<FlushGuardInner> for FlushGuard {
    // NOTE(review): the Arc<Mutex<..>> wrapper presumably exists so the guard
    // can be shared/stored (e.g. in a static) — confirm against callers.
    fn from(value: FlushGuardInner) -> Self {
        Self(Arc::new(Mutex::new(value)))
    }
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
output: Output,
) -> anyhow::Result<()> {
) -> anyhow::Result<FlushGuard> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -113,8 +134,28 @@ pub fn init(
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with({
// https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
#[derive(Default)]
struct LayerStack {
    // All layers folded so far, type-erased into a single boxed layer over
    // the bare `Registry`. `None` until the first `add_layer` call.
    layers:
        Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
}
impl LayerStack {
    // Fold `new_layer` into the stack: chain it onto the already-collected
    // layers with `Layer::and_then` (or start the stack with it if empty),
    // then re-box the combined result. This lets the caller build a
    // subscriber from a runtime-determined set of layers, which the usual
    // `registry().with(..).with(..)` chain (whose type grows with every
    // call) cannot express.
    fn add_layer<L>(&mut self, new_layer: L)
    where
        L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
    {
        let new = match self.layers.take() {
            Some(layers) => Some(layers.and_then(new_layer).boxed()),
            None => Some(new_layer.boxed()),
        };
        self.layers = new;
    }
}
let mut layers = LayerStack::default();
layers.add_layer({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(false)
@@ -131,17 +172,60 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(
layers
.add_layer(
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
);
let tracing_chrome_layer_flush_guard = if env_config::var(
"NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME",
env_config::Bool::new_const::<false>,
)
.into()
{
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.trace_style(tracing_chrome::TraceStyle::Threaded)
.build();
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
let tracing_flame_flush_guard = if env_config::var(
"NEON_UTILS_LOGGING_ENABLE_TRACING_FLAME",
env_config::Bool::new_const::<false>,
)
.into()
{
let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let layer = layer
.with_empty_samples(false)
.with_module_path(false)
.with_file_and_line(false)
.with_threads_collapsed(true);
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
.add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
TracingErrorLayerEnablement::Disabled => (),
}
Ok(())
let r = tracing_subscriber::registry();
r.with(layers.layers.expect("we add at least one layer"))
.init();
Ok(FlushGuardInner {
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
_tracing_flame_layer: tracing_flame_flush_guard,
}
.into())
}
/// Disable the default rust panic hook by using `set_hook`.

View File

@@ -22,12 +22,16 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
let file = OpenOptions::new()
.create(true)
.truncate(true)
.read(true)
.write(true)
.open(&tarball)
.await
.with_context(|| format!("tempfile creation {tarball}"))?;
let buffered_file = tokio::io::BufWriter::with_capacity(
128 * 1024, /* TODO use BUFFER_SIZE, same with other constant */
file,
);
let mut paths = Vec::new();
for entry in WalkDir::new(path) {
let entry = entry?;
@@ -42,7 +46,7 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
// Do a sort to get a more consistent listing
paths.sort_unstable();
let zstd = ZstdEncoder::with_quality_and_params(
file,
buffered_file,
Level::Default,
&[CParameter::enable_long_distance_matching(true)],
);
@@ -60,7 +64,9 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
}
let mut zstd = builder.into_inner().await?;
zstd.shutdown().await?;
let mut compressed = zstd.into_inner();
let mut compressed_buffered: tokio::io::BufWriter<File> = zstd.into_inner();
compressed_buffered.flush().await?;
let mut compressed = compressed_buffered.into_inner();
let compressed_len = compressed.metadata().await?.len();
compressed.seek(SeekFrom::Start(0)).await?;
Ok((compressed, compressed_len))

View File

@@ -30,7 +30,7 @@ enum Args {
}
fn main() {
logging::init(
let _guard = logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,

View File

@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(
let _guard = logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,

View File

@@ -72,10 +72,11 @@ pub async fn import_timeline_from_postgres_datadir(
let absolute_path = entry.path();
let relative_path = absolute_path.strip_prefix(pgdata_path)?;
let mut file = tokio::fs::File::open(absolute_path).await?;
let file = tokio::fs::File::open(absolute_path).await?;
let mut bufread = tokio::io::BufReader::with_capacity(128 * 1024, file);
let len = metadata.len() as usize;
if let Some(control_file) =
import_file(&mut modification, relative_path, &mut file, len, ctx).await?
import_file(&mut modification, relative_path, &mut bufread, len, ctx).await?
{
pg_control = Some(control_file);
}
@@ -288,15 +289,14 @@ async fn import_wal(
}
// Slurp the WAL file
let mut file = std::fs::File::open(&path)?;
let mut file = tokio::fs::File::open(&path).await?;
if offset > 0 {
use std::io::Seek;
file.seek(std::io::SeekFrom::Start(offset as u64))?;
use tokio::io::AsyncSeekExt;
file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
}
use std::io::Read;
let nread = file.read_to_end(&mut buf)?;
let nread = file.read_to_end(&mut buf).await?;
if nread != WAL_SEGMENT_SIZE - offset {
// Maybe allow this for .partial files?
error!("read only {} bytes from WAL file", nread);

View File

@@ -3186,10 +3186,10 @@ impl Tenant {
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
// Upload the created data dir to S3
if self.tenant_shard_id().is_zero() {
self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
.await?;
}
// if self.tenant_shard_id().is_zero() {
// self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
// .await?;
// }
}
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
@@ -3671,7 +3671,7 @@ pub(crate) mod harness {
pub deletion_queue: MockDeletionQueue,
}
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
pub(crate) fn setup_logging() {
LOG_HANDLE.get_or_init(|| {

View File

@@ -219,7 +219,7 @@ async fn main() -> anyhow::Result<()> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,

View File

@@ -632,7 +632,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,