Compare commits

...

2 Commits

Author SHA1 Message Date
Kirill Bulatov
cafffd3a93 Code review fixes: test log toggling for multiple timelines 2023-02-07 13:27:02 +02:00
Kirill Bulatov
65fe6b5f90 Add a new HTTP management API to toggle pageserver logs 2023-02-07 13:27:00 +02:00
16 changed files with 1093 additions and 86 deletions

112
Cargo.lock generated
View File

@@ -65,9 +65,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anyhow"
version = "1.0.68"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
dependencies = [
"backtrace",
]
@@ -861,7 +861,7 @@ dependencies = [
"tokio-postgres",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-subscriber 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing-utils",
"url",
"workspace_hack",
@@ -1136,9 +1136,9 @@ dependencies = [
[[package]]
name = "darling"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0dd3cd20dc6b5a876612a6e5accfe7f3dd883db6d07acfbf14c128f61550dfa"
checksum = "c0808e1bd8671fb44a113a14e13497557533369847788fa2ae912b6ebfce9fa8"
dependencies = [
"darling_core",
"darling_macro",
@@ -1146,9 +1146,9 @@ dependencies = [
[[package]]
name = "darling_core"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a784d2ccaf7c98501746bf0be29b2022ba41fd62a2e622af997a03e9f972859f"
checksum = "001d80444f28e193f30c2f293455da62dcf9a6b29918a4253152ae2b1de592cb"
dependencies = [
"fnv",
"ident_case",
@@ -1160,9 +1160,9 @@ dependencies = [
[[package]]
name = "darling_macro"
version = "0.14.2"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7618812407e9402654622dd402b0a89dff9ba93badd6540781526117b92aab7e"
checksum = "b36230598a2d5de7ec1c6f51f72d8a99a9208daff41de2084d06e3fd3ea56685"
dependencies = [
"darling_core",
"quote",
@@ -1649,6 +1649,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "856b5cb0902c2b6d65d5fd97dfa30f9b70c7538e770b98eab5ed52d8db923e01"
[[package]]
name = "hex"
version = "0.4.3"
@@ -1742,9 +1748,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "0.14.23"
version = "0.14.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c"
checksum = "5e011372fa0b68db8350aa7a248930ecc7839bf46d8485577d69f117a75f164c"
dependencies = [
"bytes",
"futures-channel",
@@ -1886,12 +1892,12 @@ dependencies = [
[[package]]
name = "io-lifetimes"
version = "1.0.4"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e"
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
dependencies = [
"libc",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -1902,14 +1908,14 @@ checksum = "30e22bd8629359895450b59ea7a776c850561b96a3b1d31321c1949d9e6c9146"
[[package]]
name = "is-terminal"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189"
checksum = "22e18b0a45d56fe973d6db23972bf5bc46f988a4a2385deac9cc29572f09daef"
dependencies = [
"hermit-abi 0.2.6",
"hermit-abi 0.3.0",
"io-lifetimes",
"rustix",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -2117,9 +2123,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "miniz_oxide"
version = "0.6.4"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2e212582ede878b109755efd0773a4f0f4ec851584cf0aefbeb4d9ecc114822"
checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa"
dependencies = [
"adler",
]
@@ -2535,9 +2541,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "petgraph"
version = "0.6.2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [
"fixedbitset",
"indexmap",
@@ -2763,9 +2769,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.50"
version = "1.0.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ef7d57beacfaf2d8aee5937dab7b7f28de3cb8b1828479bb5de2a7106f2bae2"
checksum = "5d727cae5b39d21da60fa540906919ad737832fe0b1c165da3a34d6548c849d6"
dependencies = [
"unicode-ident",
]
@@ -2905,7 +2911,7 @@ dependencies = [
"tokio-postgres-rustls",
"tokio-rustls",
"tracing",
"tracing-subscriber",
"tracing-subscriber 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
"url",
"utils",
"uuid",
@@ -3189,16 +3195,16 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.36.7"
version = "0.36.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03"
checksum = "f43abb88211988493c1abb44a70efa56ff0ce98f233b7b276146f1f3f7ba9644"
dependencies = [
"bitflags",
"errno",
"io-lifetimes",
"libc",
"linux-raw-sys",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@@ -3485,9 +3491,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.91"
version = "1.0.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
checksum = "7434af0dc1cbd59268aa98b4c22c131c0584d2232f6fb166efb993e2832e896a"
dependencies = [
"itoa",
"ryu",
@@ -3900,9 +3906,9 @@ dependencies = [
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tls-listener"
@@ -4253,6 +4259,16 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "git+https://github.com/neondatabase/tracing.git?branch=kb/dynamic-filter-may-override-static#668be3b81a210d1797402628b51c4dd46953fbfb"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.18.0"
@@ -4263,15 +4279,14 @@ dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"tracing-log 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"tracing-subscriber 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
source = "git+https://github.com/neondatabase/tracing.git?branch=kb/dynamic-filter-may-override-static#668be3b81a210d1797402628b51c4dd46953fbfb"
dependencies = [
"serde",
"tracing-core",
@@ -4282,6 +4297,23 @@ name = "tracing-subscriber"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6176eae26dd70d0c919749377897b54a9276bd7061339665dd68777926b5a70"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.16"
source = "git+https://github.com/neondatabase/tracing.git?branch=kb/dynamic-filter-may-override-static#668be3b81a210d1797402628b51c4dd46953fbfb"
dependencies = [
"matchers",
"nu-ansi-term",
@@ -4294,7 +4326,7 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-log 0.1.3 (git+https://github.com/neondatabase/tracing.git?branch=kb/dynamic-filter-may-override-static)",
"tracing-serde",
]
@@ -4310,7 +4342,7 @@ dependencies = [
"tokio",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-subscriber 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
"workspace_hack",
]
@@ -4470,7 +4502,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tracing",
"tracing-subscriber",
"tracing-subscriber 0.3.16 (git+https://github.com/neondatabase/tracing.git?branch=kb/dynamic-filter-may-override-static)",
"url",
"workspace_hack",
]
@@ -4782,7 +4814,6 @@ dependencies = [
"futures-executor",
"futures-util",
"hashbrown 0.12.3",
"indexmap",
"itertools",
"libc",
"log",
@@ -4809,7 +4840,6 @@ dependencies = [
"tower",
"tracing",
"tracing-core",
"tracing-subscriber",
"url",
]

View File

@@ -154,6 +154,12 @@ tonic-build = "0.8"
[patch.crates-io]
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="43e6db254a97fdecbce33d8bc0890accfd74495e" }
# Make tracing fork to use common base crates.
# See fork version usages for more info why this fork is needed.
[patch.'https://github.com/neondatabase/tracing.git']
tracing = "0.1.37"
tracing-core = "0.1.30"
################# Binary contents sections
[profile.release]

View File

@@ -10,6 +10,7 @@ use serde_with::{serde_as, DisplayFromStr};
use utils::{
history_buffer::HistoryBufferWithDropCounter,
id::{NodeId, TenantId, TimelineId},
logging::{Directive, Level},
lsn::Lsn,
};
@@ -17,6 +18,52 @@ use crate::reltag::RelTag;
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
/// A way to change pageserver's log level in runtime.
#[serde_as]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ChangeLogLevelRequest {
/// Arbitrary [`Directive`] string, in format `target[span{field=value}]=level`.
/// E.g. `error`, `pageserver=debug`, `[{tenant=98d670ab7bee6f0051494306a1ab888f}]=warn`
///
/// Note that you cannot have `,` in a [`Directive`], so `error,pageserver=debug` is not a valid directive.
Custom {
#[serde_as(as = "DisplayFromStr")]
directive: Directive,
/// `true` value applies the log level, `false` value removes it (if applied before)
enabled: bool,
},
/// A few pageserver-specific log filters, able to expand into [`Directive`].
Predefined {
/// `Some` value applies the log level, `None` value removes it (if applied before)
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
log_level: Option<Level>,
/// A scope of the log level to apply to.
scope: Scope,
},
}
/// A scope in the pageserver, having the same log level.
#[serde_as]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum Scope {
/// Try to filter all logs with a certain tenant_id value in their span fields.
Tenant {
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
},
/// Try to filter all logs with certain timeline_id in their span fields.
Timeline {
#[serde_as(as = "DisplayFromStr")]
tenant_id: TenantId,
#[serde_as(as = "DisplayFromStr")]
timeline_id: TimelineId,
},
}
/// A state of a tenant in pageserver's memory.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TenantState {

View File

@@ -19,8 +19,6 @@ serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
nix.workspace = true
signal-hook.workspace = true
rand.workspace = true
@@ -33,10 +31,35 @@ serde_with.workspace = true
once_cell.workspace = true
strum.workspace = true
strum_macros.workspace = true
metrics.workspace = true
pq_proto.workspace = true
tracing.workspace = true
# On every log event like `test_event` below
# ```rust
# let _entered = info_span!("timeline_detail", tenant = %tenant_id, timeline = %timeline_id).entered();
# info!("test_event");
# ```
# there are static values: `info` log level, `timeline_detail` span name, etc.
# and dynamic values: field names (`tenant`, `timeline`) and their values.
#
# Tracing allows the latter to change during runtime, and generally cannot statically understand what's behind the `%timeline_id` format call.
# Due to that, env filters like `error,[{tenant=98d670ab7bee6f0051494306a1ab888f}]=debug` are split into two categories.
# Static, `error` (another example could be `timeline_detail=error`) one that filters based on static event values,
# and dynamic, `[{tenant=98d670ab7bee6f0051494306a1ab888f}]=debug`, that would require tracing to resolve the filter on every span
# that might fit to the filter: some span with `tenant` field name and a dynamic value to resolve.
#
# Current tracing optimizes the span resolution, and filters out all `[{tenant=98d670ab7bee6f0051494306a1ab888f}]=debug` events because
# there's a static filter with stricter log level: the global `error` one.
# Patched version forces tracing to consider such dynamic filters as potential exceptions and check more potentially matching events
# against it.
# There's a patch submitted upstream that does this, but it does not cover the performance implications in detail, hence it has not been reviewed yet.
# Use a fork with the patch merged to enable the dynamic log reloading feature.
#
# TODO upstream dynamic filter change, see https://github.com/tokio-rs/tracing/issues/1388 for more details.
tracing-subscriber = { version = "0.3", git = "https://github.com/neondatabase/tracing.git", branch = "kb/dynamic-filter-may-override-static", features = ["env-filter", "json"] }
workspace_hack.workspace = true
url.workspace = true

View File

@@ -1,13 +1,28 @@
use std::str::FromStr;
use std::{
str::FromStr,
sync::{Arc, Mutex},
};
use anyhow::Context;
use strum_macros::{EnumString, EnumVariantNames};
use tracing_subscriber::{
fmt::{format, Layer, TestWriter},
layer::Layered,
reload, Registry,
};
pub use tracing::Level;
pub use tracing_subscriber::{filter::Directive, EnvFilter};
pub const DEFAULT_LOG_LEVEL: Level = Level::INFO;
#[derive(EnumString, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,
Json,
/// Makes tracing output logs in a similar way to the std test crate,
/// see [`TestWriter`] for the details.
Test,
}
@@ -23,25 +38,100 @@ impl LogFormat {
}
}
pub fn init(log_format: LogFormat) -> anyhow::Result<()> {
let default_filter_str = "info";
type PlainSubscriber = Layered<Layer<Registry>, Registry>;
type StdoutWriter = fn() -> std::io::Stdout;
type JsonSubscriber = Layered<
Layer<Registry, format::JsonFields, format::Format<format::Json>, StdoutWriter>,
Registry,
>;
type TestSubscriber =
Layered<Layer<Registry, format::DefaultFields, format::Format, TestWriter>, Registry>;
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter_str));
/// A helper enum to ease log reloads.
/// Tracing [0.4](https://github.com/tokio-rs/tracing/milestone/11)
/// includes https://github.com/tokio-rs/tracing/pull/1035
/// that makes this enum and related types obsolete.
#[derive(Clone)]
pub enum LogReloadHandle {
Plain(reload::Handle<EnvFilter, PlainSubscriber>),
Test(reload::Handle<EnvFilter, TestSubscriber>),
Json(reload::Handle<EnvFilter, JsonSubscriber>),
/// Allows to update and read its [`EnvFilter`] state via the [`reload::Handle`]-like API,
/// but does not actually affect any logging.
/// Could be used in any unit tests that wrap [`LogReloadHandle`] around.
Noop(Arc<Mutex<EnvFilter>>),
}
let base_logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout);
match log_format {
LogFormat::Json => base_logger.json().init(),
LogFormat::Plain => base_logger.init(),
LogFormat::Test => base_logger.with_test_writer().init(),
impl LogReloadHandle {
pub fn noop() -> Self {
Self::Noop(Arc::new(Mutex::new(initial_env_filter())))
}
Ok(())
pub fn reload(&self, new_value: impl Into<EnvFilter>) -> Result<(), reload::Error> {
match self {
Self::Plain(p) => p.reload(new_value),
Self::Json(j) => j.reload(new_value),
Self::Test(t) => t.reload(new_value),
Self::Noop(state) => {
*state.lock().unwrap() = new_value.into();
Ok(())
}
}
}
pub fn modify(&self, f: impl FnOnce(&mut EnvFilter)) -> Result<(), reload::Error> {
match self {
Self::Plain(p) => p.modify(f),
Self::Json(j) => j.modify(f),
Self::Test(t) => t.modify(f),
Self::Noop(state) => {
f(&mut state.lock().unwrap());
Ok(())
}
}
}
pub fn with_current<T>(&self, f: impl FnOnce(&EnvFilter) -> T) -> Result<T, reload::Error> {
match self {
Self::Plain(p) => p.with_current(f),
Self::Json(j) => j.with_current(f),
Self::Test(t) => t.with_current(f),
Self::Noop(state) => Ok(f(&state.lock().unwrap())),
}
}
}
/// We fall back to printing all spans at [`DEFAULT_LOG_LEVEL`] or above if `RUST_LOG` environment variable is not set.
pub fn initial_env_filter() -> EnvFilter {
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(DEFAULT_LOG_LEVEL.to_string()))
}
pub fn init(log_format: LogFormat) -> LogReloadHandle {
let base_logger = tracing_subscriber::fmt()
.with_env_filter(initial_env_filter())
.with_target(false)
.with_ansi(atty::is(atty::Stream::Stdout))
.with_writer(std::io::stdout as StdoutWriter);
match log_format {
LogFormat::Json => {
let json = base_logger.json().with_filter_reloading();
let handle = json.reload_handle();
json.init();
LogReloadHandle::Json(handle)
}
LogFormat::Plain => {
let plain = base_logger.with_filter_reloading();
let handle = plain.reload_handle();
plain.init();
LogReloadHandle::Plain(handle)
}
LogFormat::Test => {
let test = base_logger.with_test_writer().with_filter_reloading();
let handle = test.reload_handle();
test.init();
LogReloadHandle::Test(handle)
}
}
}

View File

@@ -211,7 +211,7 @@ fn start_pageserver(
conf: &'static PageServerConf,
) -> anyhow::Result<()> {
// Initialize logging
logging::init(conf.log_format)?;
let reload_handle = logging::init(conf.log_format);
// Print version and launch timestamp to the log,
// and expose them as prometheus metrics.
@@ -321,9 +321,10 @@ fn start_pageserver(
{
let _rt_guard = MGMT_REQUEST_RUNTIME.enter();
let router = http::make_router(conf, launch_ts, auth.clone(), remote_storage)?
.build()
.map_err(|err| anyhow!(err))?;
let router =
http::make_router(conf, launch_ts, auth.clone(), remote_storage, reload_handle)?
.build()
.map_err(|err| anyhow!(err))?;
let service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?
.serve(service)

View File

@@ -0,0 +1,496 @@
//! A simple log reloading mechanism, that uses [`LogReloadHandle`] received from `tracing` to toggle
//! different log levels for various pageserver logical parts.
//!
//! TODO Currently, not persistent and gets reset on pageserver restart, and allows to have
//! log filters for tenants and timelines, that do not exist on the pageserver.
use std::{
collections::{hash_map, BTreeSet, HashMap},
str::FromStr,
};
use anyhow::Context;
use pageserver_api::models::{ChangeLogLevelRequest, Scope};
use utils::{
id::{TenantId, TimelineId},
logging::{
initial_env_filter, Directive, EnvFilter, Level, LogReloadHandle, DEFAULT_LOG_LEVEL,
},
};
/// A way to update view, (re)load and reset custom pageserver log filters, applied dynamically.
///
/// TODO it would be nicer to wrap [`EnvFilter`] directly and modify/reload it, but
/// it has no `Clone` method to get the filter copy, neither there is any "view"
/// data struct for such filter in `tracing`.
///
/// Similarly, the lower `update_filter` method could visit `EnvFilter` directly and adjust it, but there's no way
/// to publicly amend these, only replace and even inside, `Directive` has similar issues:
/// https://github.com/tokio-rs/tracing/blob/dd676608528847addf9187dd7e104955b563e550/tracing-subscriber/src/filter/env/directive.rs#L13
/// So the module uses strings and public builder API to construct new filters to replace the old ones.
///
/// Also, the filter applying logic might be improved:
/// https://github.com/tokio-rs/tracing/issues/2320
/// so we keep the wrapper for a while.
pub struct LogFilterManager {
log_reload_handle: LogReloadHandle,
directives: CustomFilterDirectives,
}
impl LogFilterManager {
pub fn new(log_reload_handle: LogReloadHandle) -> Self {
Self {
log_reload_handle,
directives: CustomFilterDirectives {
general: BTreeSet::new(),
tenant_timeline: HashMap::new(),
},
}
}
pub fn current_log_filter(&self) -> EnvFilter {
self.log_reload_handle
.with_current(|current_filter| current_filter.to_string())
.expect("Failed to get handle's current filter")
.parse()
.expect("Filter failed to parse its own string representation")
}
/// Applies the filter update, adding a new one or removing an existing one from the filter directives.
pub fn update_filter(&mut self, filter_update: ChangeLogLevelRequest) -> anyhow::Result<bool> {
if self.directives.update(filter_update) {
let new_filter = self.directives.create_filter();
self.log_reload_handle
.reload(new_filter)
.context("Failed to reload log filters with new value: {new_filter}")?;
Ok(true)
} else {
Ok(false)
}
}
/// Removes all custom filter directives, falling back to the initial state.
pub fn reset_log_filter(&mut self) -> anyhow::Result<()> {
self.directives.clear();
let new_filter = self.directives.create_filter();
self.log_reload_handle
.reload(new_filter)
.context("Failed to reload log filters with new value: {new_filter}")
}
}
/// A dynamic set of log filtering directives to use for global log filtering.
/// Global [`Directive`] pattern: `target[span{field1=value1,field2=value2}]=level`
/// is generated based on every directive added.
#[derive(Debug, Default)]
struct CustomFilterDirectives {
/// E.g. `info`, `hyper=debug`, even `[{tenant=98d670ab7bee6f0051494306a1ab888f}]=error`,
/// despite the possibility to add it in a better typed way.
general: BTreeSet<Directive>,
/// `[{tenant=98d670ab7bee6f0051494306a1ab888f}]=error` or `[{timeline=aa043653cb6f6189e17bab2fd0f30e1e}]=error`
/// Multiple fields in the filter are not supported yet, see https://github.com/tokio-rs/tracing/issues/1584
tenant_timeline: HashMap<TenantId, TenantFilterDirective>,
}
/// A tenant and related timeline log filters.
#[derive(Debug, Default)]
struct TenantFilterDirective {
/// Tenant-wide log filter can be toggled independently of the timeline ones.
/// If both enabled, the final log filter could look like
/// `[{tenant=98d670ab7bee6f0051494306a1ab888f}]=error,[{timeline=aa043653cb6f6189e17bab2fd0f30e1e}]=debug,[{timeline=...`
level: Option<Level>,
timeline_directives: HashMap<TimelineId, Option<Level>>,
}
fn tenant_directive(tenant_id: TenantId, level: Level) -> Directive {
Directive::from_str(&format!("[{{tenant={tenant_id}}}]={level}"))
.expect("Tenant log filter directive should be correct")
}
fn timeline_directive(timeline_id: TimelineId, level: Level) -> Directive {
Directive::from_str(&format!("[{{timeline={timeline_id}}}]={level}"))
.expect("Timeline log filter directive should be correct")
}
impl CustomFilterDirectives {
fn create_filter(&self) -> EnvFilter {
let initial_env_filter = initial_env_filter();
if self.is_empty() {
return initial_env_filter;
}
let global_level = initial_env_filter
.max_level_hint()
.and_then(|level_filter| level_filter.into_level())
.unwrap_or(DEFAULT_LOG_LEVEL);
let mut env_filter = EnvFilter::builder()
.with_regex(false)
.with_default_directive(Directive::from(global_level))
.parse("")
.expect("Default env filter should be parsed");
for log_override in self.general.clone() {
env_filter = env_filter.add_directive(log_override)
}
for (&tenant_id, tenant_entry) in &self.tenant_timeline {
if let Some(tenant_log_level) = tenant_entry.level {
env_filter =
env_filter.add_directive(tenant_directive(tenant_id, tenant_log_level));
}
for (&timeline_id, &timeline_level) in &tenant_entry.timeline_directives {
if let Some(timeline_level) = timeline_level {
env_filter =
env_filter.add_directive(timeline_directive(timeline_id, timeline_level));
}
}
}
env_filter
}
fn is_empty(&self) -> bool {
self.general.is_empty() && self.tenant_timeline.is_empty()
}
fn clear(&mut self) {
self.general.clear();
self.tenant_timeline.clear();
}
fn update(&mut self, update: ChangeLogLevelRequest) -> bool {
match update {
ChangeLogLevelRequest::Custom { directive, enabled } => {
if enabled {
self.general.insert(directive)
} else {
self.general.remove(&directive)
}
}
ChangeLogLevelRequest::Predefined { log_level, scope } => {
let current_level = match scope {
Scope::Tenant { tenant_id } => {
&mut self.tenant_timeline.entry(tenant_id).or_default().level
}
Scope::Timeline {
tenant_id,
timeline_id,
} => self
.tenant_timeline
.entry(tenant_id)
.or_default()
.timeline_directives
.entry(timeline_id)
.or_default(),
};
let updated = *current_level != log_level;
*current_level = log_level;
self.clean_empty_entry(scope);
updated
}
}
}
fn clean_empty_entry(&mut self, update_scope: Scope) {
match update_scope {
Scope::Tenant { tenant_id } => {
if let hash_map::Entry::Occupied(mut tenant_o) =
self.tenant_timeline.entry(tenant_id)
{
let tenant_entry = tenant_o.get_mut();
if tenant_entry.level.is_none() && tenant_entry.timeline_directives.is_empty() {
tenant_o.remove();
}
}
}
Scope::Timeline {
tenant_id,
timeline_id,
} => {
if let hash_map::Entry::Occupied(mut tenant_o) =
self.tenant_timeline.entry(tenant_id)
{
let tenant_entry = tenant_o.get_mut();
if let hash_map::Entry::Occupied(timeline_o) =
tenant_entry.timeline_directives.entry(timeline_id)
{
if timeline_o.get().is_none() {
timeline_o.remove();
}
}
if tenant_entry.level.is_none() && tenant_entry.timeline_directives.is_empty() {
tenant_o.remove();
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn log_filter_additions_and_noop_general() {
let mut filters = test_log_filters();
let expected_filter = EnvFilter::try_new(DEFAULT_LOG_LEVEL.to_string())
.unwrap()
.to_string();
assert_eq!(
filters.current_log_filter().to_string(),
expected_filter,
"Freshly created LogFilters should not override the default level"
);
filters
.update_filter(ChangeLogLevelRequest::Custom {
directive: "pageserver=debug".parse().unwrap(),
enabled: false,
})
.unwrap();
for scope in [
Scope::Tenant {
tenant_id: TenantId::generate(),
},
Scope::Timeline {
tenant_id: TenantId::generate(),
timeline_id: TimelineId::generate(),
},
] {
filters
.update_filter(ChangeLogLevelRequest::Predefined {
log_level: None,
scope,
})
.unwrap();
}
assert_eq!(
filters.current_log_filter().to_string(),
expected_filter,
"LogFilters should not get udpated when new disabled custom filters or scoped ones without the log level are added"
);
}
#[test]
fn log_filter_custom_directive_additions_and_noop_readditions() {
let mut filters = test_log_filters();
let mut expected_filter = EnvFilter::try_new(DEFAULT_LOG_LEVEL.to_string()).unwrap();
let new_directive: Directive = "pageserver=debug".parse().unwrap();
filters
.update_filter(ChangeLogLevelRequest::Custom {
directive: new_directive.clone(),
enabled: true,
})
.unwrap();
expected_filter = expected_filter.add_directive(new_directive.clone());
let updated_directives_string = expected_filter.to_string();
assert_eq!(
updated_directives_string,
filters.current_log_filter().to_string(),
"Adding a new enabled custom directive should add it to the filter list"
);
filters
.update_filter(ChangeLogLevelRequest::Custom {
directive: new_directive,
enabled: true,
})
.unwrap();
assert_eq!(
updated_directives_string,
filters.current_log_filter().to_string(),
"Re-adding the same enabled filter is a noop"
);
}
#[test]
fn log_filter_scoped_additions_and_noop_readditions() {
let mut filters = test_log_filters();
let mut expected_filter = EnvFilter::try_new(DEFAULT_LOG_LEVEL.to_string()).unwrap();
let level = Level::DEBUG;
let tenant_id = TenantId::generate();
filters
.update_filter(ChangeLogLevelRequest::Predefined {
log_level: Some(Level::DEBUG),
scope: Scope::Tenant { tenant_id },
})
.unwrap();
expected_filter = expected_filter.add_directive(tenant_directive(tenant_id, level));
let updated_directives_string = expected_filter.to_string();
assert_eq!(
updated_directives_string,
filters.current_log_filter().to_string(),
"Adding a new enabled custom directive should add it to the filter list"
);
filters
.update_filter(ChangeLogLevelRequest::Predefined {
log_level: Some(Level::DEBUG),
scope: Scope::Tenant { tenant_id },
})
.unwrap();
assert_eq!(
updated_directives_string,
filters.current_log_filter().to_string(),
"Re-adding the same enabled filter is a noop"
);
}
#[test]
fn log_filters_toggling() {
let mut filters = test_log_filters();
let directive_to_toggle: Directive = "hyper=warn".parse().unwrap();
let other_directive: Directive = "pageserver=debug".parse().unwrap();
let tenant_to_toggle = TenantId::generate();
let other_tenant = TenantId::generate();
let tenant_with_no_log_level = TenantId::generate();
let timeline_to_toggle = TimelineId::generate();
let other_timeline = TimelineId::generate();
for request in [
ChangeLogLevelRequest::Custom {
directive: directive_to_toggle.clone(),
enabled: true,
},
ChangeLogLevelRequest::Custom {
directive: other_directive,
enabled: true,
},
//
ChangeLogLevelRequest::Predefined {
log_level: Some(Level::DEBUG),
scope: Scope::Tenant {
tenant_id: tenant_to_toggle,
},
},
ChangeLogLevelRequest::Predefined {
log_level: Some(Level::INFO),
scope: Scope::Tenant {
tenant_id: other_tenant,
},
},
//
ChangeLogLevelRequest::Predefined {
log_level: Some(Level::WARN), // timeline_to_toggle's log level
scope: Scope::Timeline {
tenant_id: tenant_with_no_log_level,
timeline_id: timeline_to_toggle,
},
},
ChangeLogLevelRequest::Predefined {
log_level: Some(Level::WARN), // other_timeline's log level
scope: Scope::Timeline {
tenant_id: tenant_with_no_log_level,
timeline_id: other_timeline,
},
},
] {
let update_happened = filters.update_filter(request.clone()).unwrap_or_else(|e| {
panic!("Standard log level filter {request:?} update caused an error: {e}")
});
assert!(
update_happened,
"Expected to enable the filter with request {request:?}"
);
}
let filled_filters = filters.directives.create_filter().to_string();
assert!(
!filled_filters.contains(&tenant_with_no_log_level.to_string()),
"'{filled_filters}' string should not contain filter for tenant {tenant_with_no_log_level} since it had no logs explicitly enabled"
);
assert!(
filled_filters.contains(&tenant_to_toggle.to_string()),
"'{filled_filters}' string should contain filter for tenant {tenant_to_toggle}"
);
assert!(
filled_filters.contains(&other_tenant.to_string()),
"'{filled_filters}' string should contain filter for tenant {other_tenant}"
);
assert!(
filled_filters.contains(&timeline_to_toggle.to_string()),
"'{filled_filters}' string should contain filter for timeline {timeline_to_toggle}"
);
assert!(
filled_filters.contains(&other_timeline.to_string()),
"'{filled_filters}' string should contain filter for timeline {other_timeline}"
);
for request in [
ChangeLogLevelRequest::Custom {
directive: directive_to_toggle,
enabled: false,
},
ChangeLogLevelRequest::Predefined {
log_level: None,
scope: Scope::Tenant {
tenant_id: tenant_to_toggle,
},
},
ChangeLogLevelRequest::Predefined {
log_level: None,
scope: Scope::Timeline {
tenant_id: tenant_with_no_log_level,
timeline_id: timeline_to_toggle,
},
},
] {
let update_happened = filters.update_filter(request.clone()).unwrap_or_else(|e| {
panic!("Standard log level filter {request:?} update caused an error: {e}")
});
assert!(
update_happened,
"Expected to disable the filter with request {request:?}"
);
}
let toggled_filters = filters.directives.create_filter().to_string();
assert!(
!toggled_filters.contains(&tenant_with_no_log_level.to_string()),
"'{filled_filters}' string should not contain filter for tenant {tenant_with_no_log_level} since it had no logs explicitly enabled"
);
assert!(
!toggled_filters.contains(&tenant_to_toggle.to_string()),
"'{filled_filters}' string should not have tenant {tenant_to_toggle} that got its log level toggled back"
);
assert!(
toggled_filters.contains(&other_tenant.to_string()),
"'{filled_filters}' string should still contain tenant {other_tenant} that was not affected by toggling"
);
assert!(
!toggled_filters.contains(&timeline_to_toggle.to_string()),
"'{filled_filters}' string should not contain timeline {timeline_to_toggle} that got its log level toggled back"
);
assert!(
toggled_filters.contains(&other_timeline.to_string()),
"'{filled_filters}' string should still contain timeline {other_timeline} that was not affected by toggling"
);
let default_log_filter = EnvFilter::try_new(DEFAULT_LOG_LEVEL.to_string())
.unwrap()
.to_string();
assert_ne!(default_log_filter, filters.current_log_filter().to_string());
filters.reset_log_filter().unwrap();
assert_eq!(
default_log_filter,
filters.current_log_filter().to_string(),
"Resetting the filters should result in the default resulting filter"
);
}
/// Builds a `LogFilterManager` backed by a no-op reload handle, so tests can
/// exercise the filter bookkeeping without installing a global subscriber.
fn test_log_filters() -> LogFilterManager {
    LogFilterManager::new(LogReloadHandle::noop())
}
}

View File

@@ -1,4 +1,5 @@
mod log_reload;
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;
pub use routes::make_router;

View File

@@ -1,16 +1,18 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use anyhow::{anyhow, Context, Result};
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use pageserver_api::models::{ChangeLogLevelRequest, DownloadRemoteLayersTaskSpawnRequest};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use utils::logging::LogReloadHandle;
use super::log_reload::LogFilterManager;
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineInfo,
@@ -45,6 +47,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
log_filter_manager: Mutex<LogFilterManager>,
}
impl State {
@@ -52,6 +55,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
log_reload_handle: LogReloadHandle,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
.iter()
@@ -62,6 +66,7 @@ impl State {
auth,
allowlist_routes,
remote_storage,
log_filter_manager: Mutex::new(LogFilterManager::new(log_reload_handle)),
})
}
}
@@ -808,6 +813,47 @@ async fn update_tenant_config_handler(
json_response(StatusCode::OK, ())
}
/// GET /v1/log_filter — report the currently active log filter as a string.
async fn get_log_filter_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    // Keep the mutex guard's scope tight: render the filter to a string and
    // drop the lock before building the response.
    let current_filter = {
        let state = get_state(&request);
        let manager = state.log_filter_manager.lock().unwrap();
        manager.current_log_filter().to_string()
    };
    json_response(StatusCode::OK, current_filter)
}
/// PUT /v1/log_filter — apply a log level override from the request body.
///
/// Responds 201 when the filter set actually changed, 200 when the request
/// was a no-op (the directive was already in the requested state).
async fn set_log_filter_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    // Deserialize first: `json_request` needs mutable access to the request.
    let change_request: ChangeLogLevelRequest = json_request(&mut request).await?;
    let state = get_state(&request);
    let changed = state
        .log_filter_manager
        .lock()
        .unwrap()
        .update_filter(change_request)
        .context("log filter update")
        .map_err(ApiError::InternalServerError)?;
    let status = if changed {
        StatusCode::CREATED
    } else {
        StatusCode::OK
    };
    json_response(status, ())
}
/// POST /v1/reset_log_filter — drop all dynamic overrides and return to the
/// filter the pageserver started with.
async fn reset_log_filter_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    get_state(&request)
        .log_filter_manager
        .lock()
        .unwrap()
        .reset_log_filter()
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, ())
}
#[cfg(feature = "testing")]
async fn failpoints_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
@@ -958,6 +1004,7 @@ pub fn make_router(
launch_ts: &'static LaunchTimestamp,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
log_reload_handle: LogReloadHandle,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
@@ -1001,13 +1048,17 @@ pub fn make_router(
Ok(router
.data(Arc::new(
State::new(conf, auth, remote_storage).context("Failed to initialize router state")?,
State::new(conf, auth, remote_storage, log_reload_handle)
.context("Failed to initialize router state")?,
))
.get("/v1/status", status_handler)
.put(
"/v1/failpoints",
testing_api!("manage failpoints", failpoints_handler),
)
.get("/v1/log_filter", get_log_filter_handler)
.put("/v1/log_filter", set_log_filter_handler)
.post("/v1/reset_log_filter", reset_log_filter_handler)
.get("/v1/tenant", tenant_list_handler)
.post("/v1/tenant", tenant_create_handler)
.get("/v1/tenant/:tenant_id", tenant_status)

View File

@@ -2789,7 +2789,7 @@ pub mod harness {
};
LOG_HANDLE.get_or_init(|| {
logging::init(logging::LogFormat::Test).expect("Failed to init test logging")
let _ = logging::init(logging::LogFormat::Test);
});
let repo_dir = PageServerConf::test_repo_dir(test_name);

View File

@@ -126,7 +126,7 @@ fn main() -> anyhow::Result<()> {
return Ok(());
}
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::init(LogFormat::from_config(&args.log_format)?);
info!("version: {GIT_VERSION}");
let args_workdir = &args.datadir;

View File

@@ -429,7 +429,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
logging::init(LogFormat::from_config(&args.log_format)?)?;
logging::init(LogFormat::from_config(&args.log_format)?);
info!("version: {GIT_VERSION}");
::metrics::set_build_info_metric(GIT_VERSION);

View File

@@ -595,13 +595,14 @@ class NeonEnvBuilder:
# fsync is disabled by default to make the tests go faster
safekeepers_enable_fsync: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
rust_log_env_var: Optional[str] = None,
default_branch_name: str = DEFAULT_BRANCH_NAME,
preserve_database_files: bool = False,
initial_tenant: Optional[TenantId] = None,
initial_timeline: Optional[TimelineId] = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.rust_log_env_var = rust_log_env_var
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.remote_storage_users = remote_storage_users
@@ -622,6 +623,7 @@ class NeonEnvBuilder:
self.pg_version = pg_version
self.preserve_database_files = preserve_database_files
self.initial_tenant = initial_tenant or TenantId.generate()
self.initial_timeline = initial_timeline or TimelineId.generate()
def init_configs(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -640,10 +642,10 @@ class NeonEnvBuilder:
# Prepare the default branch to start the postgres on later.
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
log.info(
f"Services started, creating initial tenant {env.initial_tenant} and its initial timeline"
f"Services started, creating initial timeline {env.initial_tenant}/{env.initial_timeline}"
)
initial_tenant, initial_timeline = env.neon_cli.create_tenant(tenant_id=env.initial_tenant)
log.info(f"Initial timeline {initial_tenant}/{initial_timeline} created successfully")
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, timeline_id=env.initial_timeline)
log.info("Initial timeline created successfully")
return env
@@ -888,7 +890,7 @@ class NeonEnv:
def __init__(self, config: NeonEnvBuilder):
self.repo_dir = config.repo_dir
self.rust_log_override = config.rust_log_override
self.rust_log_env_var = config.rust_log_env_var
self.port_distributor = config.port_distributor
self.s3_mock_server = config.mock_s3_server
self.neon_cli = NeonCli(env=self)
@@ -904,6 +906,7 @@ class NeonEnv:
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
# Create a config file corresponding to the options
toml = textwrap.dedent(
@@ -1118,6 +1121,80 @@ class PageserverApiException(Exception):
pass
class PageserverLogScope(abc.ABC):
    """Base class for log filter overrides sent to the pageserver HTTP API.

    Concrete subclasses decide which of the declared attributes apply and
    how the override serializes into the /v1/log_filter request body.
    """

    # Level name (e.g. "debug") or None when the override is disabled.
    log_level: Optional[str]
    # Target tenant/timeline ids; unused by scope kinds that do not need them.
    tenant_id: Optional[TenantId]
    timeline_id: Optional[TimelineId]

    @abc.abstractmethod
    def to_json(self) -> dict[str, Any]:
        """Serialize this scope into the JSON payload of the log filter API."""
        pass

    def disable(self):
        # Clearing the level asks the server to drop this override.
        self.log_level = None
class CustomLogScope(PageserverLogScope):
    """A raw tracing directive (e.g. "pageserver=debug") toggled on or off."""

    directive: str
    enabled: bool

    def __init__(self, directive: str, enabled: bool):
        self.directive = directive
        self.enabled = enabled

    def __str__(self) -> str:
        return f"CustomLogScope{{directive: {self.directive}, enabled: {self.enabled}}}"

    def to_json(self) -> dict[str, Any]:
        # The server receives the directive verbatim plus the on/off flag.
        return dict(directive=self.directive, enabled=self.enabled)

    def disable(self):
        # Custom scopes toggle via the flag rather than by clearing a level.
        self.enabled = False
class TenantLogScope(PageserverLogScope):
    """Overrides the log level for everything belonging to one tenant."""

    tenant_id: TenantId

    def __init__(self, log_level: Optional[str], tenant_id: TenantId):
        self.log_level = log_level
        self.tenant_id = tenant_id

    def __str__(self) -> str:
        return f"TenantLogScope{{log_level: {self.log_level}, tenant_id: {self.tenant_id}}}"

    def to_json(self) -> dict[str, Any]:
        # `x or None` collapses falsy levels to None, matching the server's
        # "no level" representation (same as `x if x else None`).
        return {
            "log_level": self.log_level or None,
            "scope": {"kind": "Tenant", "tenant_id": str(self.tenant_id)},
        }
class TimelineLogScope(PageserverLogScope):
    """Overrides the log level for a single timeline of a tenant."""

    tenant_id: TenantId
    timeline_id: TimelineId

    def __init__(self, log_level: Optional[str], tenant_id: TenantId, timeline_id: TimelineId):
        self.tenant_id = tenant_id
        self.log_level = log_level
        self.timeline_id = timeline_id

    def __str__(self) -> str:
        return f"TimelineLogScope{{log_level: {self.log_level}, tenant_id: {self.tenant_id}, timeline_id: {self.timeline_id}}}"

    def to_json(self) -> dict[str, Any]:
        # `x or None` collapses falsy levels to None (same as `x if x else None`).
        return {
            "log_level": self.log_level or None,
            "scope": {
                "kind": "Timeline",
                "tenant_id": str(self.tenant_id),
                "timeline_id": str(self.timeline_id),
            },
        }
class PageserverHttpClient(requests.Session):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
super().__init__()
@@ -1161,6 +1238,27 @@ class PageserverHttpClient(requests.Session):
assert res_json is None
return res_json
def current_log_filter(self) -> str:
    """Fetch the pageserver's currently active log filter string."""
    response = self.get(f"http://localhost:{self.port}/v1/log_filter")
    self.verbose_error(response)
    filter_string = response.json()
    # The endpoint returns a bare JSON string, not an object.
    assert isinstance(filter_string, str)
    return filter_string
def change_log_filter(self, scope: PageserverLogScope):
    """Apply one log filter override via PUT /v1/log_filter.

    Asserts the server reports 201, i.e. the override actually changed
    the active filter set.
    """
    url = f"http://localhost:{self.port}/v1/log_filter"
    res = self.put(url, json=scope.to_json())
    self.verbose_error(res)
    assert (
        res.status_code == 201
    ), f"Expected 201 status code, but got {res.status_code} for scope {scope}"
def reset_log_filter(self):
    """Drop all dynamic log overrides, returning to the startup filter."""
    response = self.post(f"http://localhost:{self.port}/v1/reset_log_filter")
    self.verbose_error(response)
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
@@ -1635,8 +1733,8 @@ class AbstractNeonCli(abc.ABC):
env_vars = os.environ.copy()
env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir)
env_vars["POSTGRES_DISTRIB_DIR"] = str(self.env.pg_distrib_dir)
if self.env.rust_log_override is not None:
env_vars["RUST_LOG"] = self.env.rust_log_override
if self.env.rust_log_env_var is not None:
env_vars["RUST_LOG"] = self.env.rust_log_env_var
for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
env_vars[extra_env_key] = extra_env_value
@@ -2158,6 +2256,17 @@ class NeonPageserver(PgProtocol):
return None
def log_lines(self) -> List[str]:
    """Return every line currently in this pageserver's log file.

    Returns:
        All lines of `<repo_dir>/pageserver.log`, newline-terminated
        (the final line may lack a trailing newline).
    """
    # Use a context manager so the file descriptor is always released —
    # the previous implementation opened the file and never closed it.
    with open(os.path.join(self.env.repo_dir, "pageserver.log"), "r") as logfile:
        return logfile.readlines()
def append_pageserver_param_overrides(
params_to_update: List[str],

View File

@@ -326,7 +326,7 @@ def check_neon_works(
# TODO: replace with NeonEnvBuilder / NeonEnv
config: Any = type("NeonEnvStub", (object,), {})
config.rust_log_override = None
config.rust_log_env_var = None
config.repo_dir = repo_dir
config.pg_version = pg_version
config.initial_tenant = snapshot_config["default_tenant_id"]

View File

@@ -1,12 +1,17 @@
import subprocess
from pathlib import Path
from typing import Optional
from typing import List, Optional
import pytest
from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
CustomLogScope,
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
PageserverLogScope,
TenantLogScope,
TimelineLogScope,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
@@ -185,3 +190,154 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde
with env.pageserver.http_client(auth_token=pageserver_token) as client:
check_client(client, env.initial_tenant)
def pageserver_predefined_scopes() -> List[PageserverLogScope]:
    """Scopes exercising the server's predefined tenant/timeline filters.

    The generated ids are placeholders; tests overwrite them with real ones.
    """
    tenant_scope = TenantLogScope("debug", TenantId.generate())
    timeline_scope = TimelineLogScope("debug", TenantId.generate(), TimelineId.generate())
    return [tenant_scope, timeline_scope]
def pageserver_custom_scopes() -> List[PageserverLogScope]:
    """Scopes built from raw tracing directives for two different crates."""
    return [
        CustomLogScope("pageserver=debug", True),
        CustomLogScope("hyper=debug", True),
    ]
@pytest.mark.parametrize(
    "pageserver_log_scope", pageserver_predefined_scopes() + pageserver_custom_scopes()
)
def test_pageserver_logs_toggle_overrides_env_var(
    neon_env_builder: NeonEnvBuilder, pageserver_log_scope: PageserverLogScope
):
    """A dynamic log override must win over the RUST_LOG env var, and
    toggling it off must restore the original filter and log volume."""
    # Disable global logging, including the pageserver one, using an env var
    neon_env_builder.rust_log_env_var = "error"
    env = neon_env_builder.init_start()
    pageserver = env.pageserver
    pageserver_http = pageserver.http_client()
    initial_log_filter = pageserver_http.current_log_filter()
    # Ensure that pageserver does not produce logs with such setting
    new_timeline_with_no_logs = TimelineId.generate()
    pageserver_http.timeline_create(
        tenant_id=env.initial_tenant, new_timeline_id=new_timeline_with_no_logs
    )
    pageserver_http.tenant_status(env.initial_tenant)
    assert (
        len(pageserver.log_lines()) == 0
    ), "No err log lines expected, others should be turned off"
    # Make a dynamic override via HTTP api and check that it makes pageserver to produce some logs
    pageserver_log_scope.tenant_id = env.initial_tenant
    pageserver_log_scope.timeline_id = env.initial_timeline
    pageserver_log_scope.log_level = "debug"
    pageserver_http.change_log_filter(pageserver_log_scope)
    log_filter_after_update = pageserver_http.current_log_filter()
    assert (
        initial_log_filter != log_filter_after_update
    ), "Turning a filter on should change the log filter"
    # Query pageserver to produce some logs with the override
    pageserver_http.tenant_status(env.initial_tenant)
    pageserver_http.timeline_compact(env.initial_tenant, env.initial_timeline)
    # Disable the override back
    pageserver_log_scope.disable()
    pageserver_http.change_log_filter(pageserver_log_scope)
    log_filter_after_toggle = pageserver_http.current_log_filter()
    assert (
        initial_log_filter == log_filter_after_toggle
    ), "Turning a filter on and off should restore the initial state"
    updated_log_lines = pageserver.log_lines()
    log_lines_count_after_override = len(updated_log_lines)
    assert (
        log_lines_count_after_override > 0
    ), f"Pageserver should produce logs due to the new override, filter: '{pageserver_http.current_log_filter()}'"
    # With the override removed, further requests must not add log lines, and
    # the untouched timeline must never have appeared in the logs.
    pageserver_http.tenant_status(env.initial_tenant)
    pageserver_http.timeline_compact(env.initial_tenant, env.initial_timeline)
    latest_log_lines = pageserver.log_lines()
    assert (
        len(latest_log_lines) == log_lines_count_after_override
    ), "After override is disabled, no more logs should be produced again"
    # Fixed typo in the message below: "unised" -> "unused".
    assert all(
        str(new_timeline_with_no_logs) not in line for line in latest_log_lines
    ), f"Should not contain unused timeline {new_timeline_with_no_logs} in logs"
def test_pageserver_logs_restart(neon_env_builder: NeonEnvBuilder):
    """Dynamic log filter overrides must not survive a pageserver restart."""
    # Disable global logging, including the pageserver one, using an env var
    neon_env_builder.rust_log_env_var = "error"
    env = neon_env_builder.init_start()
    pageserver = env.pageserver
    pageserver_http = pageserver.http_client()
    initial_log_filter = pageserver_http.current_log_filter()
    current_log_filter = initial_log_filter
    # Enable every predefined scope (tenant + timeline) at debug level;
    # each toggle should be reflected in the reported filter.
    for pageserver_log_scope in pageserver_predefined_scopes():
        pageserver_log_scope.tenant_id = env.initial_tenant
        pageserver_log_scope.timeline_id = env.initial_timeline
        pageserver_log_scope.log_level = "debug"
        pageserver_http.change_log_filter(pageserver_log_scope)
        new_filter = pageserver_http.current_log_filter()
        assert new_filter != current_log_filter, "New filter should appear after the toggle"
        current_log_filter = new_filter
    # Make a dynamic override via HTTP api and check that it makes pageserver to produce some logs
    pageserver_http.tenant_status(env.initial_tenant)
    pageserver_http.timeline_compact(env.initial_tenant, env.initial_timeline)
    env.pageserver.stop()
    updated_log_lines = pageserver.log_lines()
    log_lines_count_after_override = len(updated_log_lines)
    assert (
        log_lines_count_after_override > 0
    ), f"Pageserver should produce logs due to the new override, filter: '{current_log_filter}'"
    # After a restart the overrides are gone: the filter is back to default
    # and the previously-noisy endpoints log nothing new.
    env.pageserver.start()
    pageserver_http.tenant_status(env.initial_tenant)
    pageserver_http.timeline_compact(env.initial_tenant, env.initial_timeline)
    assert (
        initial_log_filter == pageserver_http.current_log_filter()
    ), "Pageserver log filter should get back to default after a restart"
    assert (
        len(pageserver.log_lines()) == log_lines_count_after_override
    ), "After pageserver restart, no more log overrides should produce debug logs"
def test_pageserver_logs_reset(neon_env_builder: NeonEnvBuilder):
    """`/v1/reset_log_filter` must drop all custom overrides at once."""
    # Disable global logging, including the pageserver one, using an env var
    neon_env_builder.rust_log_env_var = "error"
    env = neon_env_builder.init_start()
    pageserver = env.pageserver
    pageserver_http = pageserver.http_client()
    initial_log_filter = pageserver_http.current_log_filter()
    current_log_filter = initial_log_filter
    # Install every custom directive; each one should change the filter.
    for pageserver_log_scope in pageserver_custom_scopes():
        pageserver_log_scope.tenant_id = env.initial_tenant
        pageserver_log_scope.timeline_id = env.initial_timeline
        pageserver_log_scope.log_level = "debug"
        pageserver_http.change_log_filter(pageserver_log_scope)
        new_filter = pageserver_http.current_log_filter()
        assert new_filter != current_log_filter, "New filter should appear after the toggle"
        current_log_filter = new_filter
    # Produce some logs while the overrides are active, then reset them all.
    pageserver_http.tenant_status(env.initial_tenant)
    pageserver_http.timeline_compact(env.initial_tenant, env.initial_timeline)
    pageserver_http.reset_log_filter()
    updated_log_lines = pageserver.log_lines()
    log_lines_count_after_override = len(updated_log_lines)
    assert (
        log_lines_count_after_override > 0
    ), f"Pageserver should produce logs due to the new override, filter: '{current_log_filter}'"
    pageserver_http.tenant_status(env.initial_tenant)
    pageserver_http.timeline_compact(env.initial_tenant, env.initial_timeline)
    assert (
        initial_log_filter == pageserver_http.current_log_filter()
    ), "Pageserver log filter should get back to default after a reset"
    # Message fixed: this test resets the filter, it does not restart the
    # pageserver (the original message was copy-pasted from the restart test).
    assert (
        len(pageserver.log_lines()) == log_lines_count_after_override
    ), "After log filter reset, no more log overrides should produce debug logs"

View File

@@ -25,7 +25,6 @@ futures-channel = { version = "0.3", features = ["sink"] }
futures-executor = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -51,7 +50,6 @@ tonic = { version = "0.8", features = ["tls-roots"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] }
[build-dependencies]
@@ -59,7 +57,6 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
either = { version = "1" }
hashbrown = { version = "0.12", features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits"] }
log = { version = "0.4", default-features = false, features = ["std"] }