Compare commits

...

12 Commits

Author SHA1 Message Date
Dmitrii Kovalkov
4532f2331f Disable extra jobs 2025-05-28 07:11:45 +02:00
Dmitrii Kovalkov
ab79bac7f7 Custom ingest bench workflow 2025-05-27 13:53:41 +02:00
Erik Grinaker
95a5f749c8 pageserver: use an Option for GcCutoffs::time (#11984)
## Problem

It is not currently possible to disambiguate a timeline with an
uninitialized PITR cutoff from one that was created within the PITR
window -- both of these have `GcCutoffs::time == Lsn(0)`. For billing
metrics, we need to disambiguate these to avoid accidentally billing the
entire history when a tenant is initially loaded.

Touches https://github.com/neondatabase/cloud/issues/28155.

## Summary of changes

Make `GcCutoffs::time` an `Option<Lsn>`, and only set it to `Some` when
initialized. A `pitr_interval` of 0 will yield `Some(last_record_lsn)`.

This PR takes a conservative approach, and mostly retains the old
behavior of consumers by using `unwrap_or_default()` to yield 0 when
uninitialized, to avoid accidentally introducing bugs -- except in cases
where there is high confidence that the change is beneficial (e.g. for
the `pageserver_pitr_history_size` Prometheus metric and to return early
during GC).
2025-05-21 15:42:11 +00:00
Konstantin Merenkov
5db20af8a7 Keep the conn info cache on max_client_conn from pgbouncer (#11986)
## Problem
Hitting max_client_conn from pgbouncer would lead to invalidation of the
conn info cache.
Customers would hit the limit on wake_compute.

## Summary of changes
`should_retry_wake_compute` detects this specific error from pgbouncer
as non-retriable,
meaning we won't try to wake up the compute again.
2025-05-21 15:27:30 +00:00
Arpad Müller
136cf1979b Add metric for number of offloaded timelines (#11976)
We want to keep track of the number of offloaded timelines. It's a
per-tenant shard metric because each shard makes offloading decisions on
its own.
2025-05-21 11:28:22 +00:00
Vlad Lazar
08bb72e516 pageserver: allow in-mem reads to be planned during writes (#11937)
## Problem

Get page tracing revealed situations where planning an in-memory layer
is taking around 150ms. Upon investigation, the culprit is the inner
in-mem layer file lock. A batch being written holds the write lock and a
read being planned wants the read lock. See [this
trace](https://neonprod.grafana.net/explore?schemaVersion=1&panes=%7B%22j61%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%22412ec4522fe1750798aca54aec2680ac%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702606349%22,%22from%22:%221746681006349%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2291e9f1879c9bccc0%22%7D%7D%7D,%226d0%22:%7B%22datasource%22:%22JMfY_5TVz%22,%22queries%22:%5B%7B%22refId%22:%22traceId%22,%22queryType%22:%22traceql%22,%22query%22:%2220a4757706b16af0e1fbab83f9d2e925%22,%22datasource%22:%7B%22type%22:%22tempo%22,%22uid%22:%22JMfY_5TVz%22%7D,%22limit%22:20,%22tableType%22:%22traces%22,%22metricsQueryType%22:%22range%22%7D%5D,%22range%22:%7B%22to%22:%221746702614807%22,%22from%22:%221746681014807%22%7D,%22panelsState%22:%7B%22trace%22:%7B%22spanId%22:%2260e7825512bc2a6b%22%7D%7D%7D%7D)
for example.

## Summary of changes

Lift the index into its own RwLock such that we can at least plan during
write IO.

I tried to be smarter in
https://github.com/neondatabase/neon/pull/11866: arc swap + structurally
shared datastructure
and that killed ingest perf for small keys.

## Benchmarking

* No statistically significant difference for rust inget benchmarks when
compared to main.
2025-05-21 11:08:49 +00:00
Alexander Sarantcev
6f4f3691a5 pageserver: Add tracing endpoint correctness check in config validation (#11970)
## Problem

When using an incorrect endpoint string - `"localhost:4317"`, it's a
runtime error, but it can be a config error
- Closes: https://github.com/neondatabase/neon/issues/11394

## Summary of changes

Add config parse time check via `request::Url::parse` validation.

---------

Co-authored-by: Aleksandr Sarantsev <ephemeralsad@gmail.com>
2025-05-21 09:03:26 +00:00
dependabot[bot]
a2b756843e chore(deps): bump setuptools from 70.0.0 to 78.1.1 in the pip group across 1 directory (#11977)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-05-20 23:00:49 +00:00
Conrad Ludgate
f3c9d0adf4 proxy(logging): significant changes to json logging internals for performance. (#11974)
#11962 

Please review each commit separately.

Each commit is rather small in goal. The overall goal of this PR is to
keep the behaviour identical, but shave away small inefficiencies here
and there.
2025-05-20 17:57:59 +00:00
Konstantin Knizhnik
2e3dc9a8c2 Add rel_size_replica_cache (#11889)
## Problem

See 
Discussion:
https://neondb.slack.com/archives/C033RQ5SPDH/p1746645666075799
Issue: https://github.com/neondatabase/cloud/issues/28609

Relation size cache is not correctly updated at PS in case of replicas.

## Summary of changes

1. Have two caches for relation size in timeline:
`rel_size_primary_cache` and `rel_size_replica_cache`.
2. `rel_size_primary_cache` is actually what we have now. The only
difference is that it is not updated in `get_rel_size`, only by WAL
ingestion
3. `rel_size_replica_cache` has limited size (LruCache) and it's key is
`(Lsn,RelTag)` . It is updated in `get_rel_size`. Only strict LSN
matches are accepted as cache hit.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-20 15:38:27 +00:00
Konstantin Merenkov
568779fa8a proxy/scram: avoid memory copy to improve performance (#11980)
Touches #11941

## Problem
Performance of our PBKDF2 was worse than reference.

## Summary of changes
Avoided memory copy when HMACing in a tight loop.
2025-05-20 15:23:54 +00:00
Alexey Kondratov
e94acbc816 fix(compute_ctl): Dollar escaping and tests (#11969)
## Problem

In the escaping path we were checking that `${tag}$` or `${outer_tag}$`
are present in the string, but that's not enough, as original string
surrounded by `$` can also form a 'tag', like `$x$xx$x$`, which is fine
on it's own, but cannot be used in the string escaped with `$xx$`.

## Summary of changes

Remove `$` from the checks, just check if `{tag}` or `{outer_tag}` are
present. Add more test cases and change the catalog test to stress the
`drop_subscriptions_before_start: true` path as well.

Fixes https://github.com/neondatabase/cloud/issues/29198
2025-05-20 09:03:36 +00:00
33 changed files with 1208 additions and 563 deletions

View File

@@ -32,31 +32,31 @@ jobs:
fail-fast: false # allow other variants to continue even if one fails
matrix:
include:
- target_project: new_empty_project_stripe_size_2048
stripe_size: 2048 # 16 MiB
postgres_version: 16
disable_sharding: false
- target_project: new_empty_project_stripe_size_32768
stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
# while here it is sharded from the beginning with a shard size of 256 MiB
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: false
postgres_version: 16
# - target_project: new_empty_project_stripe_size_2048
# stripe_size: 2048 # 16 MiB
# postgres_version: 16
# disable_sharding: false
# - target_project: new_empty_project_stripe_size_32768
# stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
# # while here it is sharded from the beginning with a shard size of 256 MiB
# disable_sharding: false
# postgres_version: 16
# - target_project: new_empty_project
# stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
# disable_sharding: false
# postgres_version: 16
- target_project: new_empty_project
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: false
postgres_version: 17
- target_project: large_existing_project
stripe_size: null # cannot re-shared or choose different stripe size for existing, already sharded project
disable_sharding: false
postgres_version: 16
- target_project: new_empty_project_unsharded
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
disable_sharding: true
postgres_version: 16
# - target_project: large_existing_project
# stripe_size: null # cannot re-shared or choose different stripe size for existing, already sharded project
# disable_sharding: false
# postgres_version: 16
# - target_project: new_empty_project_unsharded
# stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
# disable_sharding: true
# postgres_version: 16
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
permissions:
contents: write
@@ -115,6 +115,9 @@ jobs:
stripe_size: ${{ matrix.stripe_size }}
disable_sharding: ${{ matrix.disable_sharding }}
- name: Sleep for 10 minutes
run: sleep 600
- name: Initialize Neon project
if: ${{ startsWith(matrix.target_project, 'new_empty_project') }}
env:

41
Cargo.lock generated
View File

@@ -3898,6 +3898,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num"
version = "0.4.1"
@@ -4182,6 +4192,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "p256"
version = "0.11.1"
@@ -4286,6 +4302,7 @@ dependencies = [
"enumset",
"fail",
"futures",
"hashlink",
"hex",
"hex-literal",
"http-utils",
@@ -5238,6 +5255,7 @@ dependencies = [
"tracing-log",
"tracing-opentelemetry",
"tracing-subscriber",
"tracing-test",
"tracing-utils",
"try-lock",
"typed-json",
@@ -7688,6 +7706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -7701,6 +7720,27 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn 2.0.100",
]
[[package]]
name = "tracing-utils"
version = "0.1.0"
@@ -8553,6 +8593,7 @@ dependencies = [
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
"url",
"uuid",
"zeroize",

View File

@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
// Find the first suitable tag that is not present in the string.
// Postgres' max role/DB name length is 63 bytes, so even in the
// worst case it won't take long.
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
// worst case it won't take long. Outer tag is always `tag + "x"`,
// so if `tag` is not present in the string, `outer_tag` is not
// present in the string either.
while self.contains(&tag.to_string()) {
tag += "x";
outer_tag = tag.clone() + "x";
}

View File

@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
("name$$$", ("$x$name$$$$x$", "xx")),
("name$$$$", ("$x$name$$$$$x$", "xx")),
("name$x$", ("$xx$name$x$$xx$", "xxx")),
("x", ("$xx$x$xx$", "xxx")),
("xx", ("$xxx$xx$xxx$", "xxxx")),
("$x", ("$xx$$x$xx$", "xxx")),
("x$", ("$xx$x$$xx$", "xxx")),
("$x$", ("$xx$$x$$xx$", "xxx")),
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
];
for (input, expected) in test_cases {

View File

@@ -546,6 +546,11 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("Falied to parse 'sampling_ratio'")?,
relsize_snapshot_cache_capacity: settings
.remove("relsize snapshot cache capacity")
.map(|x| x.parse::<usize>())
.transpose()
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")

View File

@@ -491,6 +491,8 @@ pub struct TenantConfigToml {
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
/// that will get perf sampling for the tenant.
pub sampling_ratio: Option<Ratio>,
/// Capacity of relsize snapshot cache (used by replicas).
pub relsize_snapshot_cache_capacity: usize,
}
pub mod defaults {
@@ -730,6 +732,7 @@ pub mod tenant_conf_defaults {
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000;
}
impl Default for TenantConfigToml {
@@ -787,6 +790,7 @@ impl Default for TenantConfigToml {
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
sampling_ratio: None,
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
}
}
}

View File

@@ -630,6 +630,8 @@ pub struct TenantConfigPatch {
pub gc_compaction_ratio_percent: FieldPatch<u64>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub sampling_ratio: FieldPatch<Option<Ratio>>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
}
/// Like [`crate::config::TenantConfigToml`], but preserves the information
@@ -759,6 +761,9 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub sampling_ratio: Option<Option<Ratio>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub relsize_snapshot_cache_capacity: Option<usize>,
}
impl TenantConfig {
@@ -804,6 +809,7 @@ impl TenantConfig {
mut gc_compaction_initial_threshold_kb,
mut gc_compaction_ratio_percent,
mut sampling_ratio,
mut relsize_snapshot_cache_capacity,
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
@@ -905,6 +911,9 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
patch.sampling_ratio.apply(&mut sampling_ratio);
patch
.relsize_snapshot_cache_capacity
.apply(&mut relsize_snapshot_cache_capacity);
Ok(Self {
checkpoint_distance,
@@ -944,6 +953,7 @@ impl TenantConfig {
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
sampling_ratio,
relsize_snapshot_cache_capacity,
})
}
@@ -1052,6 +1062,9 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.unwrap_or(global_conf.gc_compaction_ratio_percent),
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
relsize_snapshot_cache_capacity: self
.relsize_snapshot_cache_capacity
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
}
}
}

View File

@@ -86,6 +86,27 @@ pub struct DbError {
}
impl DbError {
pub fn new_test_error(code: SqlState, message: String) -> Self {
DbError {
severity: "ERROR".to_string(),
parsed_severity: Some(Severity::Error),
code,
message,
detail: None,
hint: None,
position: None,
where_: None,
schema: None,
table: None,
column: None,
datatype: None,
constraint: None,
file: None,
line: None,
routine: None,
}
}
pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result<DbError> {
let mut severity = None;
let mut parsed_severity = None;

View File

@@ -30,6 +30,7 @@ crc32c.workspace = true
either.workspace = true
fail.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true
humantime.workspace = true
humantime-serde.workspace = true

View File

@@ -343,7 +343,7 @@ where
// Gather non-relational files from object storage pages.
let slru_partitions = self
.timeline
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
.get_slru_keyspace(Version::at(self.lsn), self.ctx)
.await?
.partition(
self.timeline.get_shard_identity(),
@@ -378,7 +378,7 @@ where
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
@@ -517,7 +517,7 @@ where
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
.get_rel_size(src, Version::at(self.lsn), self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -577,7 +577,7 @@ where
let relmap_img = if has_relmap_file {
let img = self
.timeline
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?;
if img.len()
@@ -631,7 +631,7 @@ where
if !has_relmap_file
&& self
.timeline
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
.await?
.is_empty()
{

View File

@@ -544,6 +544,23 @@ impl PageServerConf {
ratio.numerator, ratio.denominator
)
);
let url = Url::parse(&tracing_config.export_config.endpoint)
.map_err(anyhow::Error::msg)
.with_context(|| {
format!(
"tracing endpoint URL is invalid : {}",
tracing_config.export_config.endpoint
)
})?;
ensure!(
url.scheme() == "http" || url.scheme() == "https",
format!(
"tracing endpoint URL must start with http:// or https://: {}",
tracing_config.export_config.endpoint
)
);
}
IndexEntry::validate_checkpoint_distance(conf.default_tenant_conf.checkpoint_distance)
@@ -660,4 +677,25 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
#[test]
fn test_config_tracing_endpoint_is_invalid() {
let input = r#"
control_plane_api = "http://localhost:6666"
[tracing]
sampling_ratio = { numerator = 1, denominator = 0 }
[tracing.export_config]
endpoint = "localhost:4317"
protocol = "http-binary"
timeout = "1ms"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("config has valid fields");
let workdir = Utf8PathBuf::from("/nonexistent");
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect_err("parse_and_validate should fail for endpoint without scheme");
}
}

View File

@@ -449,7 +449,7 @@ async fn build_timeline_info_common(
// Internally we distinguish between the planned GC cutoff (PITR point) and the "applied" GC cutoff (where we
// actually trimmed data to), which can pass each other when PITR is changed.
let min_readable_lsn = std::cmp::max(
timeline.get_gc_cutoff_lsn(),
timeline.get_gc_cutoff_lsn().unwrap_or_default(),
*timeline.get_applied_gc_cutoff_lsn(),
);

View File

@@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_cache_entries",
"Number of entries in the relation size cache",
"pageserver_relsize_latest_cache_entries",
"Number of entries in the latest relation size cache",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
.expect("failed to define a metric")
pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_latest_cache_hits",
"Latest relation size cache hits",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_cache_misses",
"Relation size cache misses",
"pageserver_relsize_latest_cache_misses",
"Relation size latest cache misses",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_relsize_snapshot_cache_entries",
"Number of entries in the pitr relation size cache",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_hits",
"Pitr relation size cache hits",
)
.expect("failed to define a metric")
});
pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_relsize_snapshot_cache_misses",
"Relation size snapshot cache misses",
)
.expect("failed to define a metric")
});
@@ -1039,6 +1066,15 @@ pub(crate) static TENANT_SYNTHETIC_SIZE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|
.expect("Failed to register pageserver_tenant_synthetic_cached_size_bytes metric")
});
pub(crate) static TENANT_OFFLOADED_TIMELINES: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_offloaded_timelines",
"Number of offloaded timelines of a tenant",
&["tenant_id", "shard_id"]
)
.expect("Failed to register pageserver_tenant_offloaded_timelines metric")
});
pub(crate) static EVICTION_ITERATION_DURATION: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_eviction_iteration_duration_seconds_global",
@@ -3524,11 +3560,14 @@ impl TimelineMetrics {
}
pub(crate) fn remove_tenant_metrics(tenant_shard_id: &TenantShardId) {
let tid = tenant_shard_id.tenant_id.to_string();
let shard_id = tenant_shard_id.shard_slug().to_string();
// Only shard zero deals in synthetic sizes
if tenant_shard_id.is_shard_zero() {
let tid = tenant_shard_id.tenant_id.to_string();
let _ = TENANT_SYNTHETIC_SIZE_METRIC.remove_label_values(&[&tid]);
}
let _ = TENANT_OFFLOADED_TIMELINES.remove_label_values(&[&tid, &shard_id]);
tenant_throttling::remove_tenant_metrics(tenant_shard_id);

View File

@@ -62,7 +62,7 @@ use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::Version;
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
@@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError {
struct BatchedGetPageRequest {
req: PagestreamGetPageRequest,
timer: SmgrOpTimer,
effective_request_lsn: Lsn,
lsn_range: LsnRange,
ctx: RequestContext,
}
@@ -764,12 +764,12 @@ impl BatchedFeMessage {
match batching_strategy {
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
if let Some(last_in_batch) = accum_pages.last() {
if last_in_batch.effective_request_lsn
!= this_pages[0].effective_request_lsn
if last_in_batch.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
{
trace!(
accum_lsn = %last_in_batch.effective_request_lsn,
this_lsn = %this_pages[0].effective_request_lsn,
accum_lsn = %last_in_batch.lsn_range.effective_lsn,
this_lsn = %this_pages[0].lsn_range.effective_lsn,
"stopping batching because LSN changed"
);
@@ -784,15 +784,15 @@ impl BatchedFeMessage {
let same_page_different_lsn = accum_pages.iter().any(|batched| {
batched.req.rel == this_pages[0].req.rel
&& batched.req.blkno == this_pages[0].req.blkno
&& batched.effective_request_lsn
!= this_pages[0].effective_request_lsn
&& batched.lsn_range.effective_lsn
!= this_pages[0].lsn_range.effective_lsn
});
if same_page_different_lsn {
trace!(
rel=%this_pages[0].req.rel,
blkno=%this_pages[0].req.blkno,
lsn=%this_pages[0].effective_request_lsn,
lsn=%this_pages[0].lsn_range.effective_lsn,
"stopping batching because same page was requested at different LSNs"
);
@@ -1158,7 +1158,7 @@ impl PageServerHandler {
.await?;
// We're holding the Handle
let effective_request_lsn = match Self::effective_request_lsn(
let effective_lsn = match Self::effective_request_lsn(
&shard,
shard.get_last_record_lsn(),
req.hdr.request_lsn,
@@ -1177,7 +1177,10 @@ impl PageServerHandler {
pages: smallvec::smallvec![BatchedGetPageRequest {
req,
timer,
effective_request_lsn,
lsn_range: LsnRange {
effective_lsn,
request_lsn: req.hdr.request_lsn
},
ctx,
}],
// The executor grabs the batch when it becomes idle.
@@ -2127,7 +2130,14 @@ impl PageServerHandler {
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
.get_rel_exists(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -2154,7 +2164,14 @@ impl PageServerHandler {
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
.get_rel_size(
req.rel,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -2181,7 +2198,15 @@ impl PageServerHandler {
.await?;
let total_blocks = timeline
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
.get_db_size(
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: req.hdr.request_lsn,
}),
ctx,
)
.await?;
let db_size = total_blocks as i64 * BLCKSZ as i64;
@@ -2214,7 +2239,7 @@ impl PageServerHandler {
// Ignore error (trace buffer may be full or tracer may have disconnected).
_ = page_trace.try_send(PageTraceEvent {
key,
effective_lsn: batch.effective_request_lsn,
effective_lsn: batch.lsn_range.effective_lsn,
time,
});
}
@@ -2229,7 +2254,7 @@ impl PageServerHandler {
perf_instrument = true;
}
req.effective_request_lsn
req.lsn_range.effective_lsn
})
.max()
.expect("batch is never empty");
@@ -2283,7 +2308,7 @@ impl PageServerHandler {
(
&p.req.rel,
&p.req.blkno,
p.effective_request_lsn,
p.lsn_range,
p.ctx.attached_child(),
)
}),

View File

@@ -43,7 +43,9 @@ use crate::aux_file;
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::{
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
RELSIZE_SNAPSHOT_CACHE_MISSES,
};
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
@@ -90,6 +92,28 @@ pub enum LsnForTimestamp {
NoData(Lsn),
}
/// Each request to page server contains LSN range: `not_modified_since..request_lsn`.
/// See comments libs/pageserver_api/src/models.rs.
/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`.
/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LsnRange {
pub effective_lsn: Lsn,
pub request_lsn: Lsn,
}
impl LsnRange {
pub fn at(lsn: Lsn) -> LsnRange {
LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
}
}
pub fn is_latest(&self) -> bool {
self.request_lsn == Lsn::MAX
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum CalculateLogicalSizeError {
#[error("cancelled")]
@@ -202,13 +226,13 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> Result<Bytes, PageReconstructError> {
match version {
Version::Lsn(effective_lsn) => {
Version::LsnRange(lsns) => {
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(
pages.iter().map(|(tag, blknum)| {
(tag, blknum, effective_lsn, ctx.attached_child())
}),
pages
.iter()
.map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
io_concurrency.clone(),
ctx,
)
@@ -246,7 +270,7 @@ impl Timeline {
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
io_concurrency: IoConcurrency,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
@@ -265,7 +289,7 @@ impl Timeline {
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
HashMap::with_capacity(pages.len());
for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
@@ -274,7 +298,7 @@ impl Timeline {
slots_filled += 1;
continue;
}
let lsn = lsns.effective_lsn;
let nblocks = {
let ctx = RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
@@ -289,7 +313,7 @@ impl Timeline {
.attached_child();
match self
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
.get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
.await
{
@@ -470,7 +494,7 @@ impl Timeline {
));
}
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(nblocks);
}
@@ -488,7 +512,7 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
self.update_cached_rel_size(tag, version, nblocks);
Ok(nblocks)
}
@@ -510,7 +534,7 @@ impl Timeline {
}
// first try to lookup relation in cache
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
return Ok(true);
}
// then check if the database was already initialized.
@@ -632,7 +656,7 @@ impl Timeline {
) -> Result<Bytes, PageReconstructError> {
assert!(self.tenant_shard_id.is_shard_zero());
let n_blocks = self
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
.get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
.await?;
let keyspace = KeySpace::single(
@@ -867,11 +891,11 @@ impl Timeline {
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
) -> Result<T, PageReconstructError> {
for segno in self
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
.list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
.await?
{
let nblocks = self
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
.get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
.await?;
let keyspace = KeySpace::single(
@@ -1137,7 +1161,7 @@ impl Timeline {
let mut total_size: u64 = 0;
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
for rel in self
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
.await?
{
if self.cancel.is_cancelled() {
@@ -1212,7 +1236,7 @@ impl Timeline {
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
.await?
.into_iter()
.collect();
@@ -1329,59 +1353,75 @@ impl Timeline {
Ok((dense_keyspace, sparse_keyspace))
}
/// Get cached size of relation if it not updated after specified LSN
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
let rel_size_cache = self.rel_size_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_CACHE_HITS.inc();
return Some(*nblocks);
/// Get cached size of relation. There are two caches: one for primary updates, it captures the latest state of
/// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size
/// at the particular LSN (snapshot).
pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
let lsn = version.get_lsn();
{
let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
if lsn >= *cached_lsn {
RELSIZE_LATEST_CACHE_HITS.inc();
return Some(*nblocks);
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES_OLD.inc();
}
RELSIZE_CACHE_MISSES.inc();
{
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
RELSIZE_SNAPSHOT_CACHE_HITS.inc();
return Some(*nblock);
}
}
if version.is_latest() {
RELSIZE_LATEST_CACHE_MISSES.inc();
} else {
RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
}
None
}
/// Update cached relation size if there is no more recent update
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if lsn < rel_size_cache.complete_as_of {
// Do not cache old values. It's safe to cache the size on read, as long as
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
// never evict values from the cache, so if the relation size changed after
// 'lsn', the new value is already in the cache.
return;
}
match rel_size_cache.map.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
let lsn = version.get_lsn();
if version.is_latest() {
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
match rel_size_cache.entry(tag) {
hash_map::Entry::Occupied(mut entry) => {
let cached_lsn = entry.get_mut();
if lsn >= cached_lsn.0 {
*cached_lsn = (lsn, nblocks);
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert((lsn, nblocks));
RELSIZE_CACHE_ENTRIES.inc();
} else {
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if rel_size_cache.capacity() != 0 {
rel_size_cache.insert((lsn, tag), nblocks);
RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
}
}
}
/// Store cached relation size
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_CACHE_ENTRIES.inc();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
RELSIZE_LATEST_CACHE_ENTRIES.inc();
}
}
/// Remove cached relation size
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
if rel_size_cache.map.remove(tag).is_some() {
RELSIZE_CACHE_ENTRIES.dec();
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
if rel_size_cache.remove(tag).is_some() {
RELSIZE_LATEST_CACHE_ENTRIES.dec();
}
}
}
@@ -1585,7 +1625,10 @@ impl DatadirModification<'_> {
// check the cache too. This is because eagerly checking the cache results in
// less work overall and 10% better performance. It's more work on cache miss
// but cache miss is rare.
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
if let Some(nblocks) = self
.tline
.get_cached_rel_size(&rel, Version::Modified(self))
{
Ok(nblocks)
} else if !self
.tline
@@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats {
/// timeline to not miss the latest updates.
#[derive(Clone, Copy)]
pub enum Version<'a> {
Lsn(Lsn),
LsnRange(LsnRange),
Modified(&'a DatadirModification<'a>),
}
@@ -2679,7 +2722,7 @@ impl Version<'_> {
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
match self {
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
Version::Modified(modification) => modification.get(key, ctx).await,
}
}
@@ -2701,12 +2744,26 @@ impl Version<'_> {
}
}
fn get_lsn(&self) -> Lsn {
pub fn is_latest(&self) -> bool {
match self {
Version::Lsn(lsn) => *lsn,
Version::LsnRange(lsns) => lsns.is_latest(),
Version::Modified(_) => true,
}
}
pub fn get_lsn(&self) -> Lsn {
match self {
Version::LsnRange(lsns) => lsns.effective_lsn,
Version::Modified(modification) => modification.lsn,
}
}
pub fn at(lsn: Lsn) -> Self {
Version::LsnRange(LsnRange {
effective_lsn: lsn,
request_lsn: lsn,
})
}
}
//--- Metadata structs stored in key-value pairs in the repository.

View File

@@ -86,8 +86,8 @@ use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError};
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_STATE_METRIC,
TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_OFFLOADED_TIMELINES,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
@@ -3348,6 +3348,13 @@ impl TenantShard {
activated_timelines += 1;
}
let tid = self.tenant_shard_id.tenant_id.to_string();
let shard_id = self.tenant_shard_id.shard_slug().to_string();
let offloaded_timeline_count = timelines_offloaded_accessor.len();
TENANT_OFFLOADED_TIMELINES
.with_label_values(&[&tid, &shard_id])
.set(offloaded_timeline_count as u64);
self.state.send_modify(move |current_state| {
assert!(
matches!(current_state, TenantState::Activating(_)),
@@ -4587,7 +4594,7 @@ impl TenantShard {
target.cutoffs = GcCutoffs {
space: space_cutoff,
time: Lsn::INVALID,
time: None,
};
}
}
@@ -4671,7 +4678,7 @@ impl TenantShard {
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
if let Some(ancestor_gc_cutoffs) = gc_cutoffs.get(&ancestor_id) {
target.within_ancestor_pitr =
timeline.get_ancestor_lsn() >= ancestor_gc_cutoffs.time;
Some(timeline.get_ancestor_lsn()) >= ancestor_gc_cutoffs.time;
}
}
@@ -4684,13 +4691,15 @@ impl TenantShard {
} else {
0
});
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(target.cutoffs.time)
.unwrap_or(Lsn(0))
.0,
);
if let Some(time_cutoff) = target.cutoffs.time {
timeline.metrics.pitr_history_size.set(
timeline
.get_last_record_lsn()
.checked_sub(time_cutoff)
.unwrap_or_default()
.0,
);
}
// Apply the cutoffs we found to the Timeline's GcInfo. Why might we _not_ have cutoffs for a timeline?
// - this timeline was created while we were finding cutoffs
@@ -4699,8 +4708,8 @@ impl TenantShard {
let original_cutoffs = target.cutoffs.clone();
// GC cutoffs should never go back
target.cutoffs = GcCutoffs {
space: Lsn(cutoffs.space.0.max(original_cutoffs.space.0)),
time: Lsn(cutoffs.time.0.max(original_cutoffs.time.0)),
space: cutoffs.space.max(original_cutoffs.space),
time: cutoffs.time.max(original_cutoffs.time),
}
}
}
@@ -5560,6 +5569,14 @@ impl TenantShard {
}
}
// Update metrics
let tid = self.tenant_shard_id.to_string();
let shard_id = self.tenant_shard_id.shard_slug().to_string();
let set_key = &[tid.as_str(), shard_id.as_str()][..];
TENANT_OFFLOADED_TIMELINES
.with_label_values(set_key)
.set(manifest.offloaded_timelines.len() as u64);
// Upload the manifest. Remote storage does no retries internally, so retry here.
match backoff::retry(
|| async {
@@ -8937,7 +8954,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x30);
guard.cutoffs.time = Some(Lsn(0x30));
guard.cutoffs.space = Lsn(0x30);
}
@@ -9045,7 +9062,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.time = Some(Lsn(0x40));
guard.cutoffs.space = Lsn(0x40);
}
tline
@@ -9463,7 +9480,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -9547,7 +9564,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x40);
guard.cutoffs.time = Some(Lsn(0x40));
guard.cutoffs.space = Lsn(0x40);
}
tline
@@ -10018,7 +10035,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -10081,7 +10098,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -10159,7 +10176,7 @@ mod tests {
.await;
// Update GC info
let mut guard = tline.gc_info.write().unwrap();
guard.cutoffs.time = Lsn(0x38);
guard.cutoffs.time = Some(Lsn(0x38));
guard.cutoffs.space = Lsn(0x38);
}
tline
@@ -10267,7 +10284,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -10330,7 +10347,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -10516,7 +10533,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x18), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x10),
time: Some(Lsn(0x10)),
space: Lsn(0x10),
},
leases: Default::default(),
@@ -10536,7 +10553,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x40), branch_tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x50),
time: Some(Lsn(0x50)),
space: Lsn(0x50),
},
leases: Default::default(),
@@ -11257,7 +11274,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No)],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11646,7 +11663,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11709,7 +11726,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -11898,7 +11915,7 @@ mod tests {
(Lsn(0x20), tline.timeline_id, MaybeOffloaded::No),
],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),
@@ -11961,7 +11978,7 @@ mod tests {
let verify_result = || async {
let gc_horizon = {
let gc_info = tline.gc_info.read().unwrap();
gc_info.cutoffs.time
gc_info.cutoffs.time.unwrap_or_default()
};
for idx in 0..10 {
assert_eq!(
@@ -12224,7 +12241,7 @@ mod tests {
*guard = GcInfo {
retain_lsns: vec![],
cutoffs: GcCutoffs {
time: Lsn(0x30),
time: Some(Lsn(0x30)),
space: Lsn(0x30),
},
leases: Default::default(),

View File

@@ -235,7 +235,7 @@ pub(super) async fn gather_inputs(
// than our internal space cutoff. This means that if someone drops a database and waits for their
// PITR interval, they will see synthetic size decrease, even if we are still storing data inside
// the space cutoff.
let mut next_pitr_cutoff = gc_info.cutoffs.time;
let mut next_pitr_cutoff = gc_info.cutoffs.time.unwrap_or_default(); // TODO: handle None
// If the caller provided a shorter retention period, use that instead of the GC cutoff.
let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {

View File

@@ -63,7 +63,28 @@ pub struct InMemoryLayer {
opened_at: Instant,
/// The above fields never change, except for `end_lsn`, which is only set once.
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
///
/// We use a separate lock for the index to reduce the critical section
/// during which reads cannot be planned.
///
/// If you need access to both the index and the underlying file at the same time,
/// respect the following locking order to avoid deadlocks:
/// 1. [`InMemoryLayer::inner`]
/// 2. [`InMemoryLayer::index`]
///
/// Note that the file backing [`InMemoryLayer::inner`] is append-only,
/// so it is not necessary to hold simultaneous locks on index.
/// This avoids holding index locks across IO, and is crucial for avoiding read tail latency.
/// In particular:
/// 1. It is safe to read and release [`InMemoryLayer::index`] before locking and reading from [`InMemoryLayer::inner`].
/// 2. It is safe to write and release [`InMemoryLayer::inner`] before locking and updating [`InMemoryLayer::index`].
index: RwLock<BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>>,
/// The above fields never change, except for `end_lsn`, which is only set once,
/// and `index` (see rationale there).
/// All other changing parts are in `inner`, and protected by a mutex.
inner: RwLock<InMemoryLayerInner>,
@@ -81,11 +102,6 @@ impl std::fmt::Debug for InMemoryLayer {
}
pub struct InMemoryLayerInner {
/// All versions of all pages in the layer are kept here. Indexed
/// by block number and LSN. The [`IndexEntry`] is an offset into the
/// ephemeral file where the page version is stored.
index: BTreeMap<CompactKey, VecMap<Lsn, IndexEntry>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
@@ -105,7 +121,7 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
trailing_ones
};
/// See [`InMemoryLayerInner::index`].
/// See [`InMemoryLayer::index`].
///
/// For memory efficiency, the data is packed into a u64.
///
@@ -425,7 +441,7 @@ impl InMemoryLayer {
.page_content_kind(PageContentKind::InMemoryLayer)
.attached_child();
let inner = self.inner.read().await;
let index = self.index.read().await;
struct ValueRead {
entry_lsn: Lsn,
@@ -435,10 +451,7 @@ impl InMemoryLayer {
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
.index
.range(range.start.to_compact()..range.end.to_compact())
{
for (key, vec_map) in index.range(range.start.to_compact()..range.end.to_compact()) {
let key = Key::from_compact(*key);
let slice = vec_map.slice_range(lsn_range.clone());
@@ -466,7 +479,7 @@ impl InMemoryLayer {
}
}
}
drop(inner); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
drop(index); // release the lock before we spawn the IO; if it's serial-mode IO we will deadlock on the read().await below
let read_from = Arc::clone(self);
let read_ctx = ctx.attached_child();
reconstruct_state
@@ -573,8 +586,8 @@ impl InMemoryLayer {
start_lsn,
end_lsn: OnceLock::new(),
opened_at: Instant::now(),
index: RwLock::new(BTreeMap::new()),
inner: RwLock::new(InMemoryLayerInner {
index: BTreeMap::new(),
file,
resource_units: GlobalResourceUnits::new(),
}),
@@ -592,31 +605,39 @@ impl InMemoryLayer {
serialized_batch: SerializedValueBatch,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
let (base_offset, metadata) = {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.len();
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
let SerializedValueBatch {
raw,
metadata,
max_lsn: _,
len: _,
} = serialized_batch;
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
// also IndexEntry and higher levels in
//the code don't allow the file to grow that large
.unwrap();
assert_eq!(new_size, expected_new_len);
inner.resource_units.maybe_publish_size(new_size);
(base_offset, metadata)
};
// Update the index with the new entries
let mut index = self.index.write().await;
for meta in metadata {
let SerializedValueMeta {
key,
@@ -639,7 +660,7 @@ impl InMemoryLayer {
will_init,
})?;
let vec_map = inner.index.entry(key).or_default();
let vec_map = index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// This should not break anything, but is unexpected: ingestion code aims to filter out
@@ -658,8 +679,6 @@ impl InMemoryLayer {
);
}
inner.resource_units.maybe_publish_size(new_size);
Ok(())
}
@@ -680,6 +699,18 @@ impl InMemoryLayer {
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
///
/// A note on locking:
/// The current API of [`InMemoryLayer`] does not ensure that there's no ongoing
/// writes while freezing the layer. This is enforced at a higher level via
/// [`crate::tenant::Timeline::write_lock`]. Freeze might be called via two code paths:
/// 1. Via the active [`crate::tenant::timeline::TimelineWriter`]. This holds the
/// Timeline::write_lock for its lifetime. The rolling is handled in
/// [`crate::tenant::timeline::TimelineWriter::put_batch`]. It's a &mut self function
/// so can't be called from different threads.
/// 2. In the background via [`crate::tenant::Timeline::maybe_freeze_ephemeral_layer`].
/// This only proceeds if try_lock on Timeline::write_lock succeeds (i.e. there's no active writer),
/// hence there can be no concurrent writes
pub async fn freeze(&self, end_lsn: Lsn) {
assert!(
self.start_lsn < end_lsn,
@@ -700,8 +731,8 @@ impl InMemoryLayer {
#[cfg(debug_assertions)]
{
let inner = self.inner.write().await;
for vec_map in inner.index.values() {
let index = self.index.read().await;
for vec_map in index.values() {
for (lsn, _) in vec_map.as_slice() {
assert!(*lsn < end_lsn);
}
@@ -724,14 +755,11 @@ impl InMemoryLayer {
) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
// though: another thread might have grabbed a reference to this layer
// in `get_layer_for_write' just before the checkpointer called
// `freeze`, and then `write_to_disk` on it. When the thread gets the
// lock, it will see that it's not writeable anymore and retry, but it
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
// write lock on it, so we shouldn't block anyone. See the comment on
// [`InMemoryLayer::freeze`] to understand how locking between the append path
// and layer flushing works.
let inner = self.inner.read().await;
let index = self.index.read().await;
use l0_flush::Inner;
let _concurrency_permit = match l0_flush_global_state {
@@ -743,13 +771,9 @@ impl InMemoryLayer {
let key_count = if let Some(key_range) = key_range {
let key_range = key_range.start.to_compact()..key_range.end.to_compact();
inner
.index
.iter()
.filter(|(k, _)| key_range.contains(k))
.count()
index.iter().filter(|(k, _)| key_range.contains(k)).count()
} else {
inner.index.len()
index.len()
};
if key_count == 0 {
return Ok(None);
@@ -772,7 +796,7 @@ impl InMemoryLayer {
let file_contents = inner.file.load_to_io_buf(ctx).await?;
let file_contents = file_contents.freeze();
for (key, vec_map) in inner.index.iter() {
for (key, vec_map) in index.iter() {
// Write all page versions
for (lsn, entry) in vec_map
.as_slice()

View File

@@ -14,6 +14,7 @@ pub mod span;
pub mod uninit;
mod walreceiver;
use hashlink::LruCache;
use std::array;
use std::cmp::{max, min};
use std::collections::btree_map::Entry;
@@ -197,16 +198,6 @@ pub struct TimelineResources {
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
}
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
pub(crate) struct RelSizeCache {
pub(crate) complete_as_of: Lsn,
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
}
pub struct Timeline {
pub(crate) conf: &'static PageServerConf,
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
@@ -365,7 +356,8 @@ pub struct Timeline {
pub walreceiver: Mutex<Option<WalReceiver>>,
/// Relation size cache
pub(crate) rel_size_cache: RwLock<RelSizeCache>,
pub(crate) rel_size_latest_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
pub(crate) rel_size_snapshot_cache: Mutex<LruCache<(Lsn, RelTag), BlockNumber>>,
download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
@@ -537,29 +529,24 @@ impl GcInfo {
/// The `GcInfo` component describing which Lsns need to be retained. Functionally, this
/// is a single number (the oldest LSN which we must retain), but it internally distinguishes
/// between time-based and space-based retention for observability and consumption metrics purposes.
#[derive(Debug, Clone)]
#[derive(Clone, Debug, Default)]
pub(crate) struct GcCutoffs {
/// Calculated from the [`pageserver_api::models::TenantConfig::gc_horizon`], this LSN indicates how much
/// history we must keep to retain a specified number of bytes of WAL.
pub(crate) space: Lsn,
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates how much
/// history we must keep to enable reading back at least the PITR interval duration.
pub(crate) time: Lsn,
}
impl Default for GcCutoffs {
fn default() -> Self {
Self {
space: Lsn::INVALID,
time: Lsn::INVALID,
}
}
/// Calculated from [`pageserver_api::models::TenantConfig::pitr_interval`], this LSN indicates
/// how much history we must keep to enable reading back at least the PITR interval duration.
///
/// None indicates that the PITR cutoff has not been computed. A PITR interval of 0 will yield
/// Some(last_record_lsn).
pub(crate) time: Option<Lsn>,
}
impl GcCutoffs {
fn select_min(&self) -> Lsn {
std::cmp::min(self.space, self.time)
// NB: if we haven't computed the PITR cutoff yet, we can't GC anything.
self.space.min(self.time.unwrap_or_default())
}
}
@@ -1096,11 +1083,14 @@ impl Timeline {
/// Get the bytes written since the PITR cutoff on this branch, and
/// whether this branch's ancestor_lsn is within its parent's PITR.
pub(crate) fn get_pitr_history_stats(&self) -> (u64, bool) {
// TODO: for backwards compatibility, we return the full history back to 0 when the PITR
// cutoff has not yet been initialized. This should return None instead, but this is exposed
// in external HTTP APIs and callers may not handle a null value.
let gc_info = self.gc_info.read().unwrap();
let history = self
.get_last_record_lsn()
.checked_sub(gc_info.cutoffs.time)
.unwrap_or(Lsn(0))
.checked_sub(gc_info.cutoffs.time.unwrap_or_default())
.unwrap_or_default()
.0;
(history, gc_info.within_ancestor_pitr)
}
@@ -1110,9 +1100,10 @@ impl Timeline {
self.applied_gc_cutoff_lsn.read()
}
/// Read timeline's planned GC cutoff: this is the logical end of history that users
/// are allowed to read (based on configured PITR), even if physically we have more history.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Lsn {
/// Read timeline's planned GC cutoff: this is the logical end of history that users are allowed
/// to read (based on configured PITR), even if physically we have more history. Returns None
/// if the PITR cutoff has not yet been initialized.
pub(crate) fn get_gc_cutoff_lsn(&self) -> Option<Lsn> {
self.gc_info.read().unwrap().cutoffs.time
}
@@ -2820,6 +2811,13 @@ impl Timeline {
self.remote_client.update_config(&new_conf.location);
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity {
if new_capacity != rel_size_cache.capacity() {
rel_size_cache.set_capacity(new_capacity);
}
}
self.metrics
.evictions_with_low_residence_duration
.write()
@@ -2878,6 +2876,14 @@ impl Timeline {
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
}
let relsize_snapshot_cache_capacity = {
let loaded_tenant_conf = tenant_conf.load();
loaded_tenant_conf
.tenant_conf
.relsize_snapshot_cache_capacity
.unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity)
};
Arc::new_cyclic(|myself| {
let metrics = Arc::new(TimelineMetrics::new(
&tenant_shard_id,
@@ -2969,10 +2975,8 @@ impl Timeline {
last_image_layer_creation_check_instant: Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(RelSizeCache {
complete_as_of: disk_consistent_lsn,
map: HashMap::new(),
}),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
download_all_remote_layers_task_info: RwLock::new(None),
@@ -6230,14 +6234,12 @@ impl Timeline {
pausable_failpoint!("Timeline::find_gc_cutoffs-pausable");
if cfg!(test) {
if cfg!(test) && pitr == Duration::ZERO {
// Unit tests which specify zero PITR interval expect to avoid doing any I/O for timestamp lookup
if pitr == Duration::ZERO {
return Ok(GcCutoffs {
time: self.get_last_record_lsn(),
space: space_cutoff,
});
}
return Ok(GcCutoffs {
time: Some(self.get_last_record_lsn()),
space: space_cutoff,
});
}
// Calculate a time-based limit on how much to retain:
@@ -6251,14 +6253,14 @@ impl Timeline {
// PITR is not set. Retain the size-based limit, or the default time retention,
// whichever requires less data.
GcCutoffs {
time: self.get_last_record_lsn(),
time: Some(self.get_last_record_lsn()),
space: std::cmp::max(time_cutoff, space_cutoff),
}
}
(Duration::ZERO, None) => {
// PITR is not set, and time lookup failed
GcCutoffs {
time: self.get_last_record_lsn(),
time: Some(self.get_last_record_lsn()),
space: space_cutoff,
}
}
@@ -6266,7 +6268,7 @@ impl Timeline {
// PITR interval is set & we didn't look up a timestamp successfully. Conservatively assume PITR
// cannot advance beyond what was already GC'd, and respect space-based retention
GcCutoffs {
time: *self.get_applied_gc_cutoff_lsn(),
time: Some(*self.get_applied_gc_cutoff_lsn()),
space: space_cutoff,
}
}
@@ -6274,7 +6276,7 @@ impl Timeline {
// PITR interval is set and we looked up timestamp successfully. Ignore
// size based retention and make time cutoff authoritative
GcCutoffs {
time: time_cutoff,
time: Some(time_cutoff),
space: time_cutoff,
}
}
@@ -6327,7 +6329,7 @@ impl Timeline {
)
};
let mut new_gc_cutoff = Lsn::min(space_cutoff, time_cutoff);
let mut new_gc_cutoff = space_cutoff.min(time_cutoff.unwrap_or_default());
let standby_horizon = self.standby_horizon.load();
// Hold GC for the standby, but as a safety guard do it only within some
// reasonable lag.
@@ -6376,7 +6378,7 @@ impl Timeline {
async fn gc_timeline(
&self,
space_cutoff: Lsn,
time_cutoff: Lsn,
time_cutoff: Option<Lsn>, // None if uninitialized
retain_lsns: Vec<Lsn>,
max_lsn_with_valid_lease: Option<Lsn>,
new_gc_cutoff: Lsn,
@@ -6395,6 +6397,12 @@ impl Timeline {
return Ok(result);
}
let Some(time_cutoff) = time_cutoff else {
// The GC cutoff should have been computed by now, but let's be defensive.
info!("Nothing to GC: time_cutoff not yet computed");
return Ok(result);
};
// We need to ensure that no one tries to read page versions or create
// branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
// for details. This will block until the old value is no longer in use.

View File

@@ -1526,7 +1526,7 @@ impl Timeline {
info!(
"starting shard ancestor compaction, rewriting {} layers and dropping {} layers, \
checked {layers_checked}/{layers_total} layers \
(latest_gc_cutoff={} pitr_cutoff={})",
(latest_gc_cutoff={} pitr_cutoff={:?})",
layers_to_rewrite.len(),
drop_layers.len(),
*latest_gc_cutoff,

View File

@@ -1684,31 +1684,31 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1719,7 +1719,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x20)),
Version::at(Lsn(0x20)),
&ctx,
io_concurrency.clone()
)
@@ -1733,7 +1733,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x30)),
Version::at(Lsn(0x30)),
&ctx,
io_concurrency.clone()
)
@@ -1747,7 +1747,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1760,7 +1760,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x40)),
Version::at(Lsn(0x40)),
&ctx,
io_concurrency.clone()
)
@@ -1774,7 +1774,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1787,7 +1787,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1800,7 +1800,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1820,7 +1820,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
2
);
@@ -1829,7 +1829,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1842,7 +1842,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -1854,7 +1854,7 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
3
);
@@ -1863,7 +1863,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
2,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -1880,7 +1880,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
.await?,
0
);
@@ -1893,7 +1893,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
.await?,
2
);
@@ -1902,7 +1902,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
0,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1915,7 +1915,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1,
Version::Lsn(Lsn(0x70)),
Version::at(Lsn(0x70)),
&ctx,
io_concurrency.clone()
)
@@ -1932,7 +1932,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
1501
);
@@ -1942,7 +1942,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blk,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1956,7 +1956,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
1500,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -1990,13 +1990,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
1
);
@@ -2011,7 +2011,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
.await?,
false
);
@@ -2029,13 +2029,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
.await?,
1
);
@@ -2077,26 +2077,26 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await?,
false
);
assert!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
.await
.is_err()
);
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
.await?,
relsize
);
@@ -2110,7 +2110,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(lsn),
Version::at(lsn),
&ctx,
io_concurrency.clone()
)
@@ -2131,7 +2131,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
.await?,
1
);
@@ -2144,7 +2144,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x60)),
Version::at(Lsn(0x60)),
&ctx,
io_concurrency.clone()
)
@@ -2157,7 +2157,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
.await?,
relsize
);
@@ -2169,7 +2169,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x50)),
Version::at(Lsn(0x50)),
&ctx,
io_concurrency.clone()
)
@@ -2193,13 +2193,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
.await?,
relsize
);
@@ -2212,7 +2212,7 @@ mod tests {
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x80)),
Version::at(Lsn(0x80)),
&ctx,
io_concurrency.clone()
)
@@ -2250,7 +2250,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2264,7 +2264,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE
);
@@ -2279,7 +2279,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2297,7 +2297,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
.await?,
size as BlockNumber
);

17
poetry.lock generated
View File

@@ -3170,19 +3170,24 @@ pbr = "*"
[[package]]
name = "setuptools"
version = "70.0.0"
version = "78.1.1"
description = "Easily download, build, install, upgrade, and uninstall Python packages"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
{file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
{file = "setuptools-78.1.1-py3-none-any.whl", hash = "sha256:c3a9c4211ff4c309edb8b8c4f1cbfa7ae324c4ba9f91ff254e3d305b9fd54561"},
{file = "setuptools-78.1.1.tar.gz", hash = "sha256:fcc17fd9cd898242f6b4adfaca46137a9edef687f43e6f78469692a5e70d851d"},
]
[package.extras]
docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov ; platform_python_implementation != \"PyPy\"", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"]
[[package]]
name = "six"

View File

@@ -127,3 +127,4 @@ rstest.workspace = true
walkdir.workspace = true
rand_distr = "0.4"
tokio-postgres.workspace = true
tracing-test = "0.2"

View File

@@ -1,13 +1,11 @@
use std::cell::{Cell, RefCell};
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::BuildHasher;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::{array, env, fmt, io};
use std::{env, io};
use chrono::{DateTime, Utc};
use indexmap::IndexSet;
use opentelemetry::trace::TraceContextExt;
use scopeguard::defer;
use serde::ser::{SerializeMap, Serializer};
use tracing::subscriber::Interest;
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
@@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::{LookupSpan, SpanRef};
use try_lock::TryLock;
/// Initialize logging and OpenTelemetry tracing and exporter.
///
@@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
StderrWriter {
stderr: std::io::stderr(),
},
["request_id", "session_id", "conn_id"],
&["request_id", "session_id", "conn_id"],
))
} else {
None
@@ -183,50 +180,65 @@ impl Clock for RealClock {
/// Name of the field used by tracing crate to store the event message.
const MESSAGE_FIELD: &str = "message";
/// Tracing used to enforce that spans/events have no more than 32 fields.
/// It seems this is no longer the case, but it's still documented in some places.
/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
/// rely on it for some (minor) performance gains.
const MAX_TRACING_FIELDS: usize = 32;
thread_local! {
/// Protects against deadlocks and double panics during log writing.
/// The current panic handler will use tracing to log panic information.
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
/// Thread-local instance with per-thread buffer for log writing.
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
/// Cached OS thread ID.
static THREAD_ID: u64 = gettid::gettid();
}
/// Map for values fixed at callsite registration.
// We use papaya here because registration rarely happens post-startup.
// papaya is good for read-heavy workloads.
//
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
type CallsiteMap<T> =
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
/// Implements tracing layer to handle events specific to logging.
struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
clock: C,
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
writer: W,
// We use a const generic and arrays to bypass one heap allocation.
extract_fields: IndexSet<&'static str>,
_marker: std::marker::PhantomData<[&'static str; F]>,
/// tracks which fields of each **event** are duplicates
skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
span_info: CallsiteMap<CallsiteSpanInfo>,
/// Fields we want to keep track of in a separate json object.
extract_fields: &'static [&'static str],
}
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
JsonLoggingLayer {
clock,
skipped_field_indices: papaya::HashMap::default(),
callsite_ids: papaya::HashMap::default(),
skipped_field_indices: CallsiteMap::default(),
span_info: CallsiteMap::default(),
writer,
extract_fields: IndexSet::from_iter(extract_fields),
_marker: std::marker::PhantomData,
extract_fields,
}
}
#[inline]
fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
*self
.callsite_ids
fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
self.span_info
.pin()
.get_or_insert_with(cs, CallsiteId::next)
.get_or_insert_with(metadata.callsite(), || {
CallsiteSpanInfo::new(metadata, self.extract_fields)
})
.clone()
}
}
impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
for JsonLoggingLayer<C, W, F>
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
@@ -237,35 +249,25 @@ where
// early, before OTel machinery, and add as event extension.
let now = self.clock.now();
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
if entered.get() {
let mut formatter = EventFormatter::new();
formatter.format::<S, F>(
now,
event,
&ctx,
&self.skipped_field_indices,
&self.callsite_ids,
&self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
} else {
entered.set(true);
defer!(entered.set(false););
let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
let mut borrow = f.try_borrow_mut();
let formatter = match borrow.as_deref_mut() {
Ok(formatter) => formatter,
// If the thread local formatter is borrowed,
// then we likely hit an edge case were we panicked during formatting.
// We allow the logging to proceed with an uncached formatter.
Err(_) => &mut EventFormatter::new(),
};
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
formatter.reset();
formatter.format::<S, F>(
now,
event,
&ctx,
&self.skipped_field_indices,
&self.callsite_ids,
&self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
})
}
formatter.reset();
formatter.format(
now,
event,
&ctx,
&self.skipped_field_indices,
self.extract_fields,
)?;
self.writer.make_writer().write_all(formatter.buffer())
});
// In case logging fails we generate a simpler JSON object.
@@ -287,50 +289,48 @@ where
/// Registers a SpanFields instance as span extension.
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let fields = SpanFields::default();
fields.record_fields(attrs);
// This could deadlock when there's a panic somewhere in the tracing
// event handling and a read or write guard is still held. This includes
// the OTel subscriber.
let mut exts = span.extensions_mut();
let mut fields = SpanFields::new(self.span_info(span.metadata()));
attrs.record(&mut fields);
exts.insert(fields);
// This is a new span: the extensions should not be locked
// unless some layer spawned a thread to process this span.
// I don't think any layers do that.
span.extensions_mut().insert(fields);
}
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
let span = ctx.span(id).expect("span must exist");
let ext = span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
data.record_fields(values);
// assumption: `on_record` is rarely called.
// assumption: a span being updated by one thread,
// and formatted by another thread is even rarer.
let mut ext = span.extensions_mut();
if let Some(fields) = ext.get_mut::<SpanFields>() {
values.record(fields);
}
}
/// Called (lazily) whenever a new log call is executed. We quickly check
/// for duplicate field names and record duplicates as skippable. Last one
/// wins.
/// Called (lazily) roughly once per event/span instance. We quickly check
/// for duplicate field names and record duplicates as skippable. Last field wins.
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
debug_assert!(
metadata.fields().len() <= MAX_TRACING_FIELDS,
"callsite {metadata:?} has too many fields."
);
if !metadata.is_event() {
self.callsite_id(metadata.callsite());
// register the span info.
self.span_info(metadata);
// Must not be never because we wouldn't get trace and span data.
return Interest::always();
}
let mut field_indices = SkippedFieldIndices::default();
let mut seen_fields = HashMap::<&'static str, usize>::new();
let mut seen_fields = HashMap::new();
for field in metadata.fields() {
use std::collections::hash_map::Entry;
match seen_fields.entry(field.name()) {
Entry::Vacant(entry) => {
// field not seen yet
entry.insert(field.index());
}
Entry::Occupied(mut entry) => {
// replace currently stored index
let old_index = entry.insert(field.index());
// ... and append it to list of skippable indices
field_indices.push(old_index);
}
if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
field_indices.set(old_index);
}
}
@@ -344,110 +344,113 @@ where
}
}
#[derive(Copy, Clone, Debug, Default)]
#[repr(transparent)]
struct CallsiteId(u32);
/// Any span info that is fixed to a particular callsite. Not variable between span instances.
#[derive(Clone)]
struct CallsiteSpanInfo {
/// index of each field to extract. usize::MAX if not found.
extract: Arc<[usize]>,
impl CallsiteId {
#[inline]
fn next() -> Self {
// Start at 1 to reserve 0 for default.
static COUNTER: AtomicU32 = AtomicU32::new(1);
CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
}
/// tracks the fixed "callsite ID" for each span.
/// note: this is not stable between runs.
normalized_name: Arc<str>,
}
impl fmt::Display for CallsiteId {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
impl CallsiteSpanInfo {
fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self {
// Start at 1 to reserve 0 for default.
static COUNTER: AtomicU32 = AtomicU32::new(1);
let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
// get all the indices of span fields we want to focus
let extract = extract_fields
.iter()
// use rposition, since we want last match wins.
.map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
.collect();
// normalized_name is unique for each callsite, but it is not
// unified across separate proxy instances.
// todo: can we do better here?
let cid = COUNTER.fetch_add(1, Ordering::Relaxed);
let normalized_name = format!("{}#{cid}", metadata.name()).into();
Self {
extract,
normalized_name,
}
}
}
/// Stores span field values recorded during the spans lifetime.
#[derive(Default)]
struct SpanFields {
// TODO: Switch to custom enum with lasso::Spur for Strings?
fields: papaya::HashMap<&'static str, serde_json::Value>,
values: [serde_json::Value; MAX_TRACING_FIELDS],
/// cached span info so we can avoid extra hashmap lookups in the hot path.
span_info: CallsiteSpanInfo,
}
impl SpanFields {
#[inline]
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
fields.record(&mut SpanFieldsRecorder {
fields: self.fields.pin(),
});
fn new(span_info: CallsiteSpanInfo) -> Self {
Self {
span_info,
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
}
}
}
/// Implements a tracing field visitor to convert and store values.
struct SpanFieldsRecorder<'m, S, G> {
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
}
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
impl tracing::field::Visit for SpanFields {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
if let Ok(value) = i64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
}
}
#[inline]
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
if let Ok(value) = u64::try_from(value) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
} else {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
}
}
#[inline]
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.fields
.insert(field.name(), serde_json::Value::from(value));
self.values[field.index()] = serde_json::Value::from(value);
}
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
}
#[inline]
@@ -456,38 +459,33 @@ impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecor
field: &tracing::field::Field,
value: &(dyn std::error::Error + 'static),
) {
self.fields
.insert(field.name(), serde_json::Value::from(format!("{value}")));
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
}
}
/// List of field indices skipped during logging. Can list duplicate fields or
/// metafields not meant to be logged.
#[derive(Clone, Default)]
#[derive(Copy, Clone, Default)]
struct SkippedFieldIndices {
bits: u64,
// 32-bits is large enough for `MAX_TRACING_FIELDS`
bits: u32,
}
impl SkippedFieldIndices {
#[inline]
fn is_empty(&self) -> bool {
fn is_empty(self) -> bool {
self.bits == 0
}
#[inline]
fn push(&mut self, index: usize) {
self.bits |= 1u64
.checked_shl(index as u32)
.expect("field index too large");
fn set(&mut self, index: usize) {
debug_assert!(index <= 32, "index out of bounds of 32-bit set");
self.bits |= 1 << index;
}
#[inline]
fn contains(&self, index: usize) -> bool {
self.bits
& 1u64
.checked_shl(index as u32)
.expect("field index too large")
!= 0
fn contains(self, index: usize) -> bool {
self.bits & (1 << index) != 0
}
}
@@ -499,7 +497,7 @@ struct EventFormatter {
impl EventFormatter {
#[inline]
fn new() -> Self {
const fn new() -> Self {
EventFormatter {
logline_buffer: Vec::new(),
}
@@ -515,14 +513,13 @@ impl EventFormatter {
self.logline_buffer.clear();
}
fn format<S, const F: usize>(
fn format<S>(
&mut self,
now: DateTime<Utc>,
event: &Event<'_>,
ctx: &Context<'_, S>,
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
extract_fields: &IndexSet<&'static str>,
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
extract_fields: &'static [&'static str],
) -> io::Result<()>
where
S: Subscriber + for<'a> LookupSpan<'a>,
@@ -533,8 +530,11 @@ impl EventFormatter {
let normalized_meta = event.normalized_metadata();
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
let skipped_field_indices = skipped_field_indices.pin();
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
let skipped_field_indices = skipped_field_indices
.pin()
.get(&meta.callsite())
.copied()
.unwrap_or_default();
let mut serialize = || {
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
@@ -565,9 +565,11 @@ impl EventFormatter {
}
let spans = SerializableSpans {
ctx,
callsite_ids,
extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
// collect all spans from parent to root.
spans: ctx
.event_span(event)
.map_or(vec![], |parent| parent.scope().collect()),
extracted: ExtractedSpanFields::new(extract_fields),
};
serializer.serialize_entry("spans", &spans)?;
@@ -620,9 +622,9 @@ impl EventFormatter {
}
}
if spans.extract.has_values() {
if spans.extracted.has_values() {
// TODO: add fields from event, too?
serializer.serialize_entry("extract", &spans.extract)?;
serializer.serialize_entry("extract", &spans.extracted)?;
}
serializer.end()
@@ -635,15 +637,15 @@ impl EventFormatter {
}
/// Extracts the message field that's mixed will other fields.
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
skipped_field_indices: SkippedFieldIndices,
state: Option<Result<(), S::Error>>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
Self {
serializer,
skipped_field_indices,
@@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
fn accept_field(&self, field: &tracing::field::Field) -> bool {
self.state.is_none()
&& field.name() == MESSAGE_FIELD
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
&& !self.skipped_field_indices.contains(field.index())
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
@@ -751,14 +751,14 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
/// can be skipped.
// This is entirely optional and only cosmetic, though maybe helps a
// bit during log parsing in dashboards when there's no field with empty object.
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
struct FieldsPresent(pub bool, SkippedFieldIndices);
// Even though some methods have an overhead (error, bytes) it is assumed the
// compiler won't include this since we ignore the value entirely.
impl tracing::field::Visit for FieldsPresent<'_> {
impl tracing::field::Visit for FieldsPresent {
#[inline]
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
if !self.1.is_some_and(|i| i.contains(field.index()))
if !self.1.contains(field.index())
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
{
@@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> {
}
/// Serializes the fields directly supplied with a log event.
struct SerializableEventFields<'a, 'event>(
&'a tracing::Event<'event>,
Option<&'a SkippedFieldIndices>,
);
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
}
/// A tracing field visitor that skips the message field.
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
serializer: S,
skipped_field_indices: Option<&'a SkippedFieldIndices>,
skipped_field_indices: SkippedFieldIndices,
state: Result<(), S::Error>,
}
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
#[inline]
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
Self {
serializer,
skipped_field_indices,
@@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
self.state.is_ok()
&& field.name() != MESSAGE_FIELD
&& !field.name().starts_with("log.")
&& !self
.skipped_field_indices
.is_some_and(|i| i.contains(field.index()))
&& !self.skipped_field_indices.contains(field.index())
}
#[inline]
@@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
}
}
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
#[inline]
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
if self.accept_field(field) {
@@ -905,18 +900,17 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
/// with the span names as keys. To prevent collision we append a numberic value
/// to the name. Also, collects any span fields we're interested in. Last one
/// wins.
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
struct SerializableSpans<'ctx, S>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
S: for<'lookup> LookupSpan<'lookup>,
{
ctx: &'a Context<'ctx, Span>,
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
extract: ExtractedSpanFields<'a, F>,
spans: Vec<SpanRef<'ctx, S>>,
extracted: ExtractedSpanFields,
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
where
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
S: for<'lookup> LookupSpan<'lookup>,
{
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
where
@@ -924,25 +918,22 @@ where
{
let mut serializer = serializer.serialize_map(None)?;
if let Some(leaf_span) = self.ctx.lookup_current() {
for span in leaf_span.scope().from_root() {
// Append a numeric callsite ID to the span name to keep the name unique
// in the JSON object.
let cid = self
.callsite_ids
.pin()
.get(&span.metadata().callsite())
.copied()
.unwrap_or_default();
for span in self.spans.iter().rev() {
let ext = span.extensions();
// Loki turns the # into an underscore during field name concatenation.
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
// all spans should have this extension.
let Some(fields) = ext.get() else { continue };
serializer.serialize_value(&SerializableSpanFields {
span: &span,
extract: &self.extract,
})?;
}
self.extracted.layer_span(fields);
let SpanFields { values, span_info } = fields;
serializer.serialize_entry(
&*span_info.normalized_name,
&SerializableSpanFields {
fields: span.metadata().fields(),
values,
},
)?;
}
serializer.end()
@@ -950,80 +941,77 @@ where
}
/// Serializes the span fields as object.
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
span: &'a SpanRef<'span, Span>,
extract: &'a ExtractedSpanFields<'a, F>,
struct SerializableSpanFields<'span> {
fields: &'span tracing::field::FieldSet,
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
}
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
where
Span: for<'lookup> LookupSpan<'lookup>,
{
impl serde::ser::Serialize for SerializableSpanFields<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let ext = self.span.extensions();
if let Some(data) = ext.get::<SpanFields>() {
for (name, value) in &data.fields.pin() {
serializer.serialize_entry(name, value)?;
// TODO: replace clone with reference, if possible.
self.extract.set(name, value.clone());
for (field, value) in std::iter::zip(self.fields, self.values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(field.name(), value)?;
}
serializer.end()
}
}
struct ExtractedSpanFields<'a, const F: usize> {
names: &'a IndexSet<&'static str>,
// TODO: replace TryLock with something local thread and interior mutability.
// serde API doesn't let us use `mut`.
values: TryLock<([Option<serde_json::Value>; F], bool)>,
struct ExtractedSpanFields {
names: &'static [&'static str],
values: RefCell<Vec<serde_json::Value>>,
}
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
fn new(names: &'a IndexSet<&'static str>) -> Self {
impl ExtractedSpanFields {
fn new(names: &'static [&'static str]) -> Self {
ExtractedSpanFields {
names,
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
}
}
#[inline]
fn set(&self, name: &'static str, value: serde_json::Value) {
if let Some((index, _)) = self.names.get_full(name) {
let mut fields = self.values.try_lock().expect("thread-local use");
fields.0[index] = Some(value);
fields.1 = true;
fn layer_span(&self, fields: &SpanFields) {
let mut v = self.values.borrow_mut();
let SpanFields { values, span_info } = fields;
// extract the fields
for (i, &j) in span_info.extract.iter().enumerate() {
let Some(value) = values.get(j) else { continue };
if !value.is_null() {
// TODO: replace clone with reference, if possible.
v[i] = value.clone();
}
}
}
#[inline]
fn has_values(&self) -> bool {
self.values.try_lock().expect("thread-local use").1
self.values.borrow().iter().any(|v| !v.is_null())
}
}
impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
impl serde::ser::Serialize for ExtractedSpanFields {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
let mut serializer = serializer.serialize_map(None)?;
let values = self.values.try_lock().expect("thread-local use");
for (i, value) in values.0.iter().enumerate() {
if let Some(value) = value {
let key = self.names[i];
serializer.serialize_entry(key, value)?;
let values = self.values.borrow();
for (key, value) in std::iter::zip(self.names, &*values) {
if value.is_null() {
continue;
}
serializer.serialize_entry(key, value)?;
}
serializer.end()
@@ -1032,7 +1020,6 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
#[cfg(test)]
mod tests {
use std::marker::PhantomData;
use std::sync::{Arc, Mutex, MutexGuard};
use assert_json_diff::assert_json_eq;
@@ -1081,10 +1068,9 @@ mod tests {
let log_layer = JsonLoggingLayer {
clock: clock.clone(),
skipped_field_indices: papaya::HashMap::default(),
callsite_ids: papaya::HashMap::default(),
span_info: papaya::HashMap::default(),
writer: buffer.clone(),
extract_fields: IndexSet::from_iter(["x"]),
_marker: PhantomData::<[&'static str; 1]>,
extract_fields: &["x"],
};
let registry = tracing_subscriber::Registry::default().with(log_layer);

View File

@@ -48,7 +48,7 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
use postgres_client::error::SqlState;
// Here are errors that happens after the user successfully authenticated to the database.
// TODO: there are pgbouncer errors that should be retried, but they are not listed here.
!matches!(
let non_retriable_pg_errors = matches!(
self.code(),
&SqlState::TOO_MANY_CONNECTIONS
| &SqlState::OUT_OF_MEMORY
@@ -56,8 +56,20 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
| &SqlState::T_R_SERIALIZATION_FAILURE
| &SqlState::INVALID_CATALOG_NAME
| &SqlState::INVALID_SCHEMA_NAME
| &SqlState::INVALID_PARAMETER_VALUE
)
| &SqlState::INVALID_PARAMETER_VALUE,
);
if non_retriable_pg_errors {
return false;
}
// PGBouncer errors that should not trigger a wake_compute retry.
if self.code() == &SqlState::PROTOCOL_VIOLATION {
// Source for the error message:
// https://github.com/pgbouncer/pgbouncer/blob/f15997fe3effe3a94ba8bcc1ea562e6117d1a131/src/client.c#L1070
return !self
.message()
.contains("no more connections allowed (max_client_conn)");
}
true
}
}
@@ -110,3 +122,55 @@ pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Durati
.base_delay
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
}
#[cfg(test)]
mod tests {
use super::ShouldRetryWakeCompute;
use postgres_client::error::{DbError, SqlState};
#[test]
fn should_retry_wake_compute_for_db_error() {
// These SQLStates should NOT trigger a wake_compute retry.
let non_retry_states = [
SqlState::TOO_MANY_CONNECTIONS,
SqlState::OUT_OF_MEMORY,
SqlState::SYNTAX_ERROR,
SqlState::T_R_SERIALIZATION_FAILURE,
SqlState::INVALID_CATALOG_NAME,
SqlState::INVALID_SCHEMA_NAME,
SqlState::INVALID_PARAMETER_VALUE,
];
for state in non_retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
!err.should_retry_wake_compute(),
"State {state:?} unexpectedly retried"
);
}
// Errors coming from pgbouncer should not trigger a wake_compute retry
let non_retry_pgbouncer_errors = ["no more connections allowed (max_client_conn)"];
for error in non_retry_pgbouncer_errors {
let err = DbError::new_test_error(SqlState::PROTOCOL_VIOLATION, error.to_string());
assert!(
!err.should_retry_wake_compute(),
"PGBouncer error {error:?} unexpectedly retried"
);
}
// These SQLStates should trigger a wake_compute retry.
let retry_states = [
SqlState::CONNECTION_FAILURE,
SqlState::CONNECTION_EXCEPTION,
SqlState::CONNECTION_DOES_NOT_EXIST,
SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
];
for state in retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
err.should_retry_wake_compute(),
"State {state:?} unexpectedly skipped retry"
);
}
}
}

View File

@@ -15,6 +15,7 @@ use rstest::rstest;
use rustls::crypto::ring;
use rustls::pki_types;
use tokio::io::DuplexStream;
use tracing_test::traced_test;
use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry;
@@ -381,8 +382,14 @@ enum ConnectAction {
WakeFail,
WakeRetry,
Connect,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = true
Retry,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = false
RetryNoWake,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = true
Fail,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = false
FailNoWake,
}
#[derive(Clone)]
@@ -424,6 +431,7 @@ struct TestConnection;
#[derive(Debug)]
struct TestConnectError {
retryable: bool,
wakeable: bool,
kind: crate::error::ErrorKind,
}
@@ -448,7 +456,7 @@ impl CouldRetry for TestConnectError {
}
impl ShouldRetryWakeCompute for TestConnectError {
fn should_retry_wake_compute(&self) -> bool {
true
self.wakeable
}
}
@@ -471,10 +479,22 @@ impl ConnectMechanism for TestConnectMechanism {
ConnectAction::Connect => Ok(TestConnection),
ConnectAction::Retry => Err(TestConnectError {
retryable: true,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::RetryNoWake => Err(TestConnectError {
retryable: true,
wakeable: false,
kind: ErrorKind::Compute,
}),
ConnectAction::Fail => Err(TestConnectError {
retryable: false,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::FailNoWake => Err(TestConnectError {
retryable: false,
wakeable: false,
kind: ErrorKind::Compute,
}),
x => panic!("expecting action {x:?}, connect is called instead"),
@@ -709,3 +729,92 @@ async fn wake_non_retry() {
.unwrap_err();
mechanism.verify();
}
#[tokio::test]
#[traced_test]
async fn fail_but_wake_invalidates_cache() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::Fail,
ConnectAction::Wake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn fail_no_wake_skips_cache_invalidation() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::FailNoWake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_but_wake_invalidates_cache() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → Retry (retryable + wakeable) → Wake → Connect
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap();
mechanism.verify();
// Because Retry has wakeable=true, we should see invalidate_cache
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_no_wake_skips_invalidation() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → RetryNoWake (retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap_err();
mechanism.verify();
// Because RetryNoWake has wakeable=false, we must NOT see invalidate_cache
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}

View File

@@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 {
// inspired from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L36-L61>
impl Pbkdf2 {
pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self {
let hmac =
// key the HMAC and derive the first block in-place
let mut hmac =
Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
let prev = hmac
.clone()
.chain_update(salt)
.chain_update(1u32.to_be_bytes())
.finalize()
.into_bytes();
hmac.update(salt);
hmac.update(&1u32.to_be_bytes());
let init_block = hmac.finalize_reset().into_bytes();
Self {
hmac,
// one consumed for the hash above
// one iteration spent above
iterations: iterations - 1,
hi: prev,
prev,
hi: init_block,
prev: init_block,
}
}
@@ -44,14 +41,17 @@ impl Pbkdf2 {
iterations,
} = self;
// only do 4096 iterations per turn before sharing the thread for fairness
// only do up to 4096 iterations per turn for fairness
let n = (*iterations).clamp(0, 4096);
for _ in 0..n {
*prev = hmac.clone().chain_update(*prev).finalize().into_bytes();
hmac.update(prev);
let block = hmac.finalize_reset().into_bytes();
for (hi, prev) in hi.iter_mut().zip(*prev) {
*hi ^= prev;
for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) {
*hi_byte ^= b;
}
*prev = block;
}
*iterations -= n;

View File

@@ -184,6 +184,7 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_tenant_offloaded_timelines",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),

View File

@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
},
"rel_size_v2_enabled": True,
"relsize_snapshot_cache_capacity": 10000,
"gc_compaction_enabled": True,
"gc_compaction_verification": False,
"gc_compaction_initial_threshold_kb": 1024000,

View File

@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
{"name": "role$"},
{"name": "role$$"},
{"name": "role$x$"},
{"name": "x"},
{"name": "xx"},
{"name": "$x"},
{"name": "x$"},
{"name": "$x$"},
{"name": "xx$"},
{"name": "$xx"},
{"name": "$xx$"},
# 63 bytes is the limit for role/DB names in Postgres
{"name": "x" * 63},
]
TEST_DB_NAMES = [
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
"name": "db name$x$",
"owner": "role$x$",
},
{
"name": "x",
"owner": "x",
},
{
"name": "xx",
"owner": "xx",
},
{
"name": "$x",
"owner": "$x",
},
{
"name": "x$",
"owner": "x$",
},
{
"name": "$x$",
"owner": "$x$",
},
{
"name": "xx$",
"owner": "xx$",
},
{
"name": "$xx",
"owner": "$xx",
},
{
"name": "$xx$",
"owner": "$xx$",
},
# 63 bytes is the limit for role/DB names in Postgres
{
"name": "x" * 63,
"owner": "x" * 63,
},
]
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can create and work with databases and roles
with special characters (whitespaces, %, tabs, etc.) in the name.
Also use `drop_subscriptions_before_start: true`. We do not actually
have any subscriptions in this test, so it should be no-op, but it
i) simulates the case when we create a second dev branch together with
a new project creation, and ii) just generally stresses more code paths.
"""
env = neon_simple_env
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": [],
"databases": [],

View File

@@ -27,8 +27,9 @@ from contextlib import closing
import psycopg2
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
from fixtures.pg_version import PgVersion
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
with secondary.cursor() as secondary_cur:
secondary_cur.execute("select count(*) from t")
assert secondary_cur.fetchone() == (n_restarts,)
def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
sql = """
CREATE TABLE CHAR_TBL(f1 char(4));
CREATE TABLE FLOAT8_TBL(f1 float8);
CREATE TABLE INT2_TBL(f1 int2);
CREATE TABLE INT4_TBL(f1 int4);
CREATE TABLE INT8_TBL(q1 int8, q2 int8);
CREATE TABLE POINT_TBL(f1 point);
CREATE TABLE TEXT_TBL (f1 text);
CREATE TABLE VARCHAR_TBL(f1 varchar(4));
CREATE TABLE onek (unique1 int4);
CREATE TABLE onek2 AS SELECT * FROM onek;
CREATE TABLE tenk1 (unique1 int4);
CREATE TABLE tenk2 AS SELECT * FROM tenk1;
CREATE TABLE person (name text, age int4,location point);
CREATE TABLE emp (salary int4, manager name) INHERITS (person);
CREATE TABLE student (gpa float8) INHERITS (person);
CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
CREATE TABLE road (name text,thepath path);
CREATE TABLE ihighway () INHERITS (road);
CREATE TABLE shighway(surface text) INHERITS (road);
CREATE TABLE BOOLTBL3 (d text, b bool, o int);
CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
DROP TABLE BOOLTBL3;
DROP TABLE BOOLTBL4;
CREATE TABLE ceil_floor_round (a numeric);
DROP TABLE ceil_floor_round;
CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
DROP TABLE width_bucket_test;
CREATE TABLE num_input_test (n1 numeric);
CREATE TABLE num_variance (a numeric);
INSERT INTO num_variance VALUES (0);
CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
TRUNCATE guid1;
DROP TABLE guid1;
DROP TABLE guid2 CASCADE;
CREATE TABLE numrange_test (nr NUMRANGE);
CREATE INDEX numrange_test_btree on numrange_test(nr);
CREATE TABLE numrange_test2(nr numrange);
CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
INSERT INTO numrange_test2 VALUES('[, 5)');
CREATE TABLE textrange_test (tr text);
CREATE INDEX textrange_test_btree on textrange_test(tr);
CREATE TABLE test_range_gist(ir int4range);
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
DROP INDEX test_range_gist_idx;
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
CREATE TABLE test_range_spgist(ir int4range);
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
DROP INDEX test_range_spgist_idx;
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
CREATE TABLE test_range_elem(i int4);
CREATE INDEX test_range_elem_idx on test_range_elem (i);
CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
DROP TABLE test_range_elem;
CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
CREATE TABLE f_test(f text, i int);
CREATE TABLE i8r_array (f1 int, f2 text);
CREATE TYPE arrayrange as range (subtype=int4[]);
CREATE TYPE two_ints as (a int, b int);
DROP TYPE two_ints cascade;
CREATE TABLE text_support_test (t text);
CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
CREATE TABLE TEMP_INT4 (f1 INT4);
CREATE TABLE TEMP_INT2 (f1 INT2);
CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
CREATE TABLE POLYGON_TBL(f1 polygon);
CREATE TABLE quad_poly_tbl (id int, p polygon);
INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
"""
with endpoint.cursor() as cur:
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
for line in sql.split("\n"):
if len(line.strip()) == 0 or line.startswith("--"):
continue
cur.execute(line)
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
env.endpoints.create_start(branch_name="main", lsn=lsn)
log.info(f"lsn: {lsn}")
cur.execute("VACUUM FULL pg_class;")
for ep in env.endpoints.endpoints:
log.info(f"{ep.endpoint_id} / {ep.pg_port}")
pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
env_vars = {
"PGPORT": str(ep.pg_port),
"PGUSER": endpoint.default_options["user"],
"PGHOST": endpoint.default_options["host"],
}
pg_bin.run_capture(pg_dump_command, env=env_vars)

View File

@@ -193,6 +193,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
"test_ancestor_branch_archive_branch1", tenant_id, "test_ancestor_branch_archive_parent"
)
offloaded_count = ps_http.get_metric_value(
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
)
assert offloaded_count == 0
ps_http.timeline_archival_config(
tenant_id,
leaf_timeline_id,
@@ -244,6 +249,11 @@ def test_timeline_offloading(neon_env_builder: NeonEnvBuilder, manual_offload: b
wait_until(leaf_offloaded)
wait_until(parent_offloaded)
offloaded_count = ps_http.get_metric_value(
"pageserver_tenant_offloaded_timelines", {"tenant_id": f"{tenant_id}"}
)
assert offloaded_count == 2
# Offloaded child timelines should still prevent deletion
with pytest.raises(
PageserverApiException,

View File

@@ -107,6 +107,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] }
zeroize = { version = "1", features = ["derive", "serde"] }