mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 13:10:38 +00:00
Compare commits
4 Commits
release-pr
...
problame/i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57e6b00c4d | ||
|
|
8bc8d9d918 | ||
|
|
bbe9c52357 | ||
|
|
0be19f4a6d |
41
Cargo.lock
generated
41
Cargo.lock
generated
@@ -3125,6 +3125,16 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.46.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||
dependencies = [
|
||||
"overload",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num"
|
||||
version = "0.4.1"
|
||||
@@ -3461,6 +3471,12 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
|
||||
|
||||
[[package]]
|
||||
name = "overload"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||
|
||||
[[package]]
|
||||
name = "p256"
|
||||
version = "0.11.1"
|
||||
@@ -6290,6 +6306,17 @@ dependencies = [
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-chrome"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "496b3cd5447f7ff527bbbf19b071ad542a000adf297d4127078b4dfdb931f41a"
|
||||
dependencies = [
|
||||
"serde_json",
|
||||
"tracing-core",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.31"
|
||||
@@ -6310,6 +6337,17 @@ dependencies = [
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-flame"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.1.3"
|
||||
@@ -6352,6 +6390,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex",
|
||||
"serde",
|
||||
@@ -6597,7 +6636,9 @@ dependencies = [
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"tracing-chrome",
|
||||
"tracing-error",
|
||||
"tracing-flame",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
"uuid",
|
||||
|
||||
@@ -171,7 +171,7 @@ fn main() -> anyhow::Result<()> {
|
||||
async fn async_main() -> anyhow::Result<()> {
|
||||
let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate()));
|
||||
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -86,7 +86,10 @@ where
|
||||
.stdout(process_log_file)
|
||||
.stderr(same_file_for_stderr)
|
||||
.args(args);
|
||||
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
|
||||
|
||||
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
|
||||
fill_rust_env_vars(background_command),
|
||||
));
|
||||
filled_cmd.envs(envs);
|
||||
|
||||
let pid_file_to_check = match &initial_pid_file {
|
||||
@@ -268,6 +271,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
|
||||
cmd
|
||||
}
|
||||
|
||||
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
|
||||
for (var, val) in std::env::vars() {
|
||||
if var.starts_with("NEON_") {
|
||||
cmd = cmd.env(var, val);
|
||||
}
|
||||
}
|
||||
cmd
|
||||
}
|
||||
|
||||
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
|
||||
/// 1. Claims a pidfile with a fcntl lock on it and
|
||||
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)
|
||||
|
||||
@@ -154,7 +154,7 @@ mod reliable_copy_test {
|
||||
/// Run test simulations.
|
||||
#[test]
|
||||
fn sim_example_reliable_copy() {
|
||||
utils::logging::init(
|
||||
let _guard = utils::logging::init(
|
||||
utils::logging::LogFormat::Test,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
|
||||
@@ -205,7 +205,7 @@ pub(crate) async fn upload_remote_data(
|
||||
|
||||
pub(crate) fn ensure_logging_ready() {
|
||||
LOGGING_DONE.get_or_init(|| {
|
||||
utils::logging::init(
|
||||
let _ = utils::logging::init(
|
||||
utils::logging::LogFormat::Test,
|
||||
utils::logging::TracingErrorLayerEnablement::Disabled,
|
||||
utils::logging::Output::Stdout,
|
||||
|
||||
@@ -60,6 +60,8 @@ const_format.workspace = true
|
||||
# to use tokio channels as streams, this is faster to compile than async_stream
|
||||
# why is it only here? no other crate should use it, streams are rarely needed.
|
||||
tokio-stream = { version = "0.1.14" }
|
||||
tracing-chrome = "0.7.1"
|
||||
tracing-flame = "0.2.0"
|
||||
|
||||
serde_path_to_error.workspace = true
|
||||
|
||||
|
||||
48
libs/utils/src/env_config.rs
Normal file
48
libs/utils/src/env_config.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use std::{fmt::Display, str::FromStr};
|
||||
|
||||
pub fn var<V, E, D>(varname: &str, default: D) -> V
|
||||
where
|
||||
V: FromStr<Err = E>,
|
||||
E: Display,
|
||||
D: FnOnce() -> V,
|
||||
{
|
||||
match std::env::var(varname) {
|
||||
Ok(s) => s
|
||||
.parse()
|
||||
.map_err(|e| format!("failed to parse env var {varname}: {e:#}"))
|
||||
.unwrap(),
|
||||
Err(std::env::VarError::NotPresent) => default(),
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {varname} is not unicode")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Bool(bool);
|
||||
|
||||
impl Bool {
|
||||
pub const fn new_const<const V: bool>() -> Self {
|
||||
Bool(V)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Bool {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if let Ok(b) = s.parse() {
|
||||
return Ok(Bool(b));
|
||||
}
|
||||
Ok(Bool(match s {
|
||||
"0" => false,
|
||||
"1" => true,
|
||||
_ => return Err(format!("not a bool, accepting 0|1|{}|{}", false, true)),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<bool> for Bool {
|
||||
fn into(self) -> bool {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
@@ -27,6 +27,7 @@ pub mod auth;
|
||||
pub mod id;
|
||||
|
||||
mod hex;
|
||||
pub mod env_config;
|
||||
pub use hex::Hex;
|
||||
|
||||
// http endpoint utils
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
use std::str::FromStr;
|
||||
use std::{
|
||||
io::BufWriter,
|
||||
str::FromStr,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
|
||||
use super::env_config;
|
||||
|
||||
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum LogFormat {
|
||||
@@ -98,11 +104,26 @@ pub enum Output {
|
||||
Stderr,
|
||||
}
|
||||
|
||||
/// Keep alive and drop it before the program terminates.
|
||||
#[must_use]
|
||||
pub struct FlushGuard(Arc<Mutex<FlushGuardInner>>);
|
||||
|
||||
struct FlushGuardInner {
|
||||
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
|
||||
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
|
||||
}
|
||||
|
||||
impl From<FlushGuardInner> for FlushGuard {
|
||||
fn from(value: FlushGuardInner) -> Self {
|
||||
Self(Arc::new(Mutex::new(value)))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(
|
||||
log_format: LogFormat,
|
||||
tracing_error_layer_enablement: TracingErrorLayerEnablement,
|
||||
output: Output,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<FlushGuard> {
|
||||
// We fall back to printing all spans at info-level or above if
|
||||
// the RUST_LOG environment variable is not set.
|
||||
let rust_log_env_filter = || {
|
||||
@@ -113,8 +134,28 @@ pub fn init(
|
||||
// NB: the order of the with() calls does not matter.
|
||||
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
|
||||
use tracing_subscriber::prelude::*;
|
||||
let r = tracing_subscriber::registry();
|
||||
let r = r.with({
|
||||
|
||||
// https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
|
||||
#[derive(Default)]
|
||||
struct LayerStack {
|
||||
layers:
|
||||
Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
|
||||
}
|
||||
impl LayerStack {
|
||||
fn add_layer<L>(&mut self, new_layer: L)
|
||||
where
|
||||
L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
|
||||
{
|
||||
let new = match self.layers.take() {
|
||||
Some(layers) => Some(layers.and_then(new_layer).boxed()),
|
||||
None => Some(new_layer.boxed()),
|
||||
};
|
||||
self.layers = new;
|
||||
}
|
||||
}
|
||||
let mut layers = LayerStack::default();
|
||||
|
||||
layers.add_layer({
|
||||
let log_layer = tracing_subscriber::fmt::layer()
|
||||
.with_target(false)
|
||||
.with_ansi(false)
|
||||
@@ -131,17 +172,60 @@ pub fn init(
|
||||
};
|
||||
log_layer.with_filter(rust_log_env_filter())
|
||||
});
|
||||
let r = r.with(
|
||||
|
||||
layers
|
||||
.add_layer(
|
||||
TracingEventCountLayer(&TRACING_EVENT_COUNT_METRIC).with_filter(rust_log_env_filter()),
|
||||
);
|
||||
|
||||
let tracing_chrome_layer_flush_guard = if env_config::var(
|
||||
"NEON_UTILS_LOGGING_ENABLE_TRACING_CHROME",
|
||||
env_config::Bool::new_const::<false>,
|
||||
)
|
||||
.into()
|
||||
{
|
||||
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
|
||||
.trace_style(tracing_chrome::TraceStyle::Threaded)
|
||||
.build();
|
||||
layers.add_layer(layer.with_filter(rust_log_env_filter()));
|
||||
Some(guard)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let tracing_flame_flush_guard = if env_config::var(
|
||||
"NEON_UTILS_LOGGING_ENABLE_TRACING_FLAME",
|
||||
env_config::Bool::new_const::<false>,
|
||||
)
|
||||
.into()
|
||||
{
|
||||
let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
|
||||
let layer = layer
|
||||
.with_empty_samples(false)
|
||||
.with_module_path(false)
|
||||
.with_file_and_line(false)
|
||||
.with_threads_collapsed(true);
|
||||
layers.add_layer(layer.with_filter(rust_log_env_filter()));
|
||||
Some(guard)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
match tracing_error_layer_enablement {
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
|
||||
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
|
||||
.init(),
|
||||
TracingErrorLayerEnablement::Disabled => r.init(),
|
||||
TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
|
||||
.add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
|
||||
TracingErrorLayerEnablement::Disabled => (),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
let r = tracing_subscriber::registry();
|
||||
r.with(layers.layers.expect("we add at least one layer"))
|
||||
.init();
|
||||
|
||||
Ok(FlushGuardInner {
|
||||
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
|
||||
_tracing_flame_layer: tracing_flame_flush_guard,
|
||||
}
|
||||
.into())
|
||||
}
|
||||
|
||||
/// Disable the default rust panic hook by using `set_hook`.
|
||||
|
||||
@@ -22,12 +22,16 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&tarball)
|
||||
.await
|
||||
.with_context(|| format!("tempfile creation {tarball}"))?;
|
||||
|
||||
let buffered_file = tokio::io::BufWriter::with_capacity(
|
||||
128 * 1024, /* TODO use BUFFER_SIZE, same with other constant */
|
||||
file,
|
||||
);
|
||||
|
||||
let mut paths = Vec::new();
|
||||
for entry in WalkDir::new(path) {
|
||||
let entry = entry?;
|
||||
@@ -42,7 +46,7 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
|
||||
// Do a sort to get a more consistent listing
|
||||
paths.sort_unstable();
|
||||
let zstd = ZstdEncoder::with_quality_and_params(
|
||||
file,
|
||||
buffered_file,
|
||||
Level::Default,
|
||||
&[CParameter::enable_long_distance_matching(true)],
|
||||
);
|
||||
@@ -60,7 +64,9 @@ pub async fn create_zst_tarball(path: &Utf8Path, tarball: &Utf8Path) -> Result<(
|
||||
}
|
||||
let mut zstd = builder.into_inner().await?;
|
||||
zstd.shutdown().await?;
|
||||
let mut compressed = zstd.into_inner();
|
||||
let mut compressed_buffered: tokio::io::BufWriter<File> = zstd.into_inner();
|
||||
compressed_buffered.flush().await?;
|
||||
let mut compressed = compressed_buffered.into_inner();
|
||||
let compressed_len = compressed.metadata().await?.len();
|
||||
compressed.seek(SeekFrom::Start(0)).await?;
|
||||
Ok((compressed, compressed_len))
|
||||
|
||||
@@ -30,7 +30,7 @@ enum Args {
|
||||
}
|
||||
|
||||
fn main() {
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
|
||||
@@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> {
|
||||
} else {
|
||||
TracingErrorLayerEnablement::Disabled
|
||||
};
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
conf.log_format,
|
||||
tracing_error_layer_enablement,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -72,10 +72,11 @@ pub async fn import_timeline_from_postgres_datadir(
|
||||
let absolute_path = entry.path();
|
||||
let relative_path = absolute_path.strip_prefix(pgdata_path)?;
|
||||
|
||||
let mut file = tokio::fs::File::open(absolute_path).await?;
|
||||
let file = tokio::fs::File::open(absolute_path).await?;
|
||||
let mut bufread = tokio::io::BufReader::with_capacity(128 * 1024, file);
|
||||
let len = metadata.len() as usize;
|
||||
if let Some(control_file) =
|
||||
import_file(&mut modification, relative_path, &mut file, len, ctx).await?
|
||||
import_file(&mut modification, relative_path, &mut bufread, len, ctx).await?
|
||||
{
|
||||
pg_control = Some(control_file);
|
||||
}
|
||||
@@ -288,15 +289,14 @@ async fn import_wal(
|
||||
}
|
||||
|
||||
// Slurp the WAL file
|
||||
let mut file = std::fs::File::open(&path)?;
|
||||
let mut file = tokio::fs::File::open(&path).await?;
|
||||
|
||||
if offset > 0 {
|
||||
use std::io::Seek;
|
||||
file.seek(std::io::SeekFrom::Start(offset as u64))?;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
file.seek(std::io::SeekFrom::Start(offset as u64)).await?;
|
||||
}
|
||||
|
||||
use std::io::Read;
|
||||
let nread = file.read_to_end(&mut buf)?;
|
||||
let nread = file.read_to_end(&mut buf).await?;
|
||||
if nread != WAL_SEGMENT_SIZE - offset {
|
||||
// Maybe allow this for .partial files?
|
||||
error!("read only {} bytes from WAL file", nread);
|
||||
|
||||
@@ -3186,10 +3186,10 @@ impl Tenant {
|
||||
run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
|
||||
|
||||
// Upload the created data dir to S3
|
||||
if self.tenant_shard_id().is_zero() {
|
||||
self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
|
||||
.await?;
|
||||
}
|
||||
// if self.tenant_shard_id().is_zero() {
|
||||
// self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
|
||||
// .await?;
|
||||
// }
|
||||
}
|
||||
let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
|
||||
|
||||
@@ -3671,7 +3671,7 @@ pub(crate) mod harness {
|
||||
pub deletion_queue: MockDeletionQueue,
|
||||
}
|
||||
|
||||
static LOG_HANDLE: OnceCell<()> = OnceCell::new();
|
||||
static LOG_HANDLE: OnceCell<logging::FlushGuard> = OnceCell::new();
|
||||
|
||||
pub(crate) fn setup_logging() {
|
||||
LOG_HANDLE.get_or_init(|| {
|
||||
|
||||
@@ -219,7 +219,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
@@ -632,7 +632,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// 1. init logging
|
||||
// 2. tracing panic hook
|
||||
// 3. sentry
|
||||
logging::init(
|
||||
let _guard = logging::init(
|
||||
LogFormat::from_config(&args.log_format)?,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stdout,
|
||||
|
||||
Reference in New Issue
Block a user