pageserver: add tracing configuration knobs

This commit is contained in:
Vlad Lazar
2025-02-10 23:14:23 +01:00
parent e8a00a3464
commit ff9bb05f17
5 changed files with 105 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -4339,6 +4339,7 @@ dependencies = [
"strum",
"strum_macros",
"thiserror 1.0.69",
"tracing-utils",
"utils",
]

View File

@@ -34,6 +34,7 @@ postgres_backend.workspace = true
nix = {workspace = true, optional = true}
reqwest.workspace = true
rand.workspace = true
tracing-utils.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -127,6 +127,7 @@ pub struct ConfigToml {
pub load_previous_heatmap: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub generate_unarchival_heatmap: Option<bool>,
pub tracing: Option<Tracing>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -184,6 +185,58 @@ pub enum GetVectoredConcurrentIo {
SidecarTask,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct Ratio {
pub numerator: usize,
pub denominator: usize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct OtelExporterConfig {
pub endpoint: String,
pub protocol: OtelExporterProtocol,
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum OtelExporterProtocol {
Grpc,
HttpBinary,
HttpJson,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "kebab-case")]
pub struct Tracing {
pub sampling_ratio: Ratio,
pub export_config: OtelExporterConfig,
}
impl From<&OtelExporterConfig> for tracing_utils::ExportConfig {
fn from(val: &OtelExporterConfig) -> Self {
tracing_utils::ExportConfig {
endpoint: Some(val.endpoint.clone()),
protocol: val.protocol.into(),
timeout: val.timeout,
}
}
}
impl From<OtelExporterProtocol> for tracing_utils::Protocol {
fn from(val: OtelExporterProtocol) -> Self {
match val {
OtelExporterProtocol::Grpc => tracing_utils::Protocol::Grpc,
OtelExporterProtocol::HttpJson => tracing_utils::Protocol::HttpJson,
OtelExporterProtocol::HttpBinary => tracing_utils::Protocol::HttpBinary,
}
}
}
pub mod statvfs {
pub mod mock {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -529,6 +582,7 @@ impl Default for ConfigToml {
validate_wal_contiguity: None,
load_previous_heatmap: None,
generate_unarchival_heatmap: None,
tracing: None,
}
}
}

View File

@@ -201,6 +201,8 @@ pub struct PageServerConf {
/// When set, include visible layers in the next uploaded heatmaps of an unarchived timeline.
pub generate_unarchival_heatmap: bool,
pub tracing: Option<pageserver_api::config::Tracing>,
}
/// Token for authentication to safekeepers
@@ -367,6 +369,7 @@ impl PageServerConf {
validate_wal_contiguity,
load_previous_heatmap,
generate_unarchival_heatmap,
tracing,
} = config_toml;
let mut conf = PageServerConf {
@@ -412,6 +415,7 @@ impl PageServerConf {
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
tracing,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -476,6 +480,17 @@ impl PageServerConf {
);
}
if let Some(tracing_config) = conf.tracing.as_ref() {
let ratio = &tracing_config.sampling_ratio;
ensure!(
ratio.denominator != 0 && ratio.denominator >= ratio.numerator,
format!(
"Invalid sampling ratio: {}/{}",
ratio.numerator, ratio.denominator
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
.map_err(anyhow::Error::msg)
.with_context(|| {

View File

@@ -376,6 +376,28 @@ class PageserverWalReceiverProtocol(StrEnum):
raise ValueError(f"Unknown protocol type: {proto}")
@dataclass
class PageserverTracingConfig:
sampling_ratio: tuple[int, int]
endpoint: str
protocol: str
timeout: str
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
value = {
"sampling-ratio": {
"numerator": self.sampling_ratio[0],
"denominator": self.sampling_ratio[1],
},
"export-config": {
"endpoint": self.endpoint,
"protocol": self.protocol,
"timeout": self.timeout,
},
}
return ("tracing", value)
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -425,6 +447,7 @@ class NeonEnvBuilder:
pageserver_virtual_file_io_mode: str | None = None,
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -468,6 +491,8 @@ class NeonEnvBuilder:
pageserver_get_vectored_concurrent_io
)
self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
)
@@ -1113,6 +1138,7 @@ class NeonEnv:
self.pageserver_virtual_file_io_mode = config.pageserver_virtual_file_io_mode
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
self.pageserver_tracing_config = config.pageserver_tracing_config
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1216,6 +1242,14 @@ class NeonEnv:
if key not in ps_cfg:
ps_cfg[key] = value
if self.pageserver_tracing_config is not None:
key, value = self.pageserver_tracing_config.to_config_key_value()
if key not in ps_cfg:
ps_cfg[key] = value
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
self.pageservers.append(
NeonPageserver(self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"])