mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
Merge branch 'main' into thesuhas/patched-pgrx-14
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4286,6 +4286,7 @@ dependencies = [
|
||||
"enumset",
|
||||
"fail",
|
||||
"futures",
|
||||
"hashlink",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"http-utils",
|
||||
|
||||
@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
|
||||
|
||||
// Find the first suitable tag that is not present in the string.
|
||||
// Postgres' max role/DB name length is 63 bytes, so even in the
|
||||
// worst case it won't take long.
|
||||
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
|
||||
// worst case it won't take long. Outer tag is always `tag + "x"`,
|
||||
// so if `tag` is not present in the string, `outer_tag` is not
|
||||
// present in the string either.
|
||||
while self.contains(&tag.to_string()) {
|
||||
tag += "x";
|
||||
outer_tag = tag.clone() + "x";
|
||||
}
|
||||
|
||||
@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
|
||||
("name$$$", ("$x$name$$$$x$", "xx")),
|
||||
("name$$$$", ("$x$name$$$$$x$", "xx")),
|
||||
("name$x$", ("$xx$name$x$$xx$", "xxx")),
|
||||
("x", ("$xx$x$xx$", "xxx")),
|
||||
("xx", ("$xxx$xx$xxx$", "xxxx")),
|
||||
("$x", ("$xx$$x$xx$", "xxx")),
|
||||
("x$", ("$xx$x$$xx$", "xxx")),
|
||||
("$x$", ("$xx$$x$$xx$", "xxx")),
|
||||
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
|
||||
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
|
||||
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
|
||||
];
|
||||
|
||||
for (input, expected) in test_cases {
|
||||
|
||||
@@ -546,6 +546,11 @@ impl PageServerNode {
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Falied to parse 'sampling_ratio'")?,
|
||||
relsize_snapshot_cache_capacity: settings
|
||||
.remove("relsize snapshot cache capacity")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
|
||||
@@ -491,6 +491,8 @@ pub struct TenantConfigToml {
|
||||
/// Tenant level performance sampling ratio override. Controls the ratio of get page requests
|
||||
/// that will get perf sampling for the tenant.
|
||||
pub sampling_ratio: Option<Ratio>,
|
||||
/// Capacity of relsize snapshot cache (used by replicas).
|
||||
pub relsize_snapshot_cache_capacity: usize,
|
||||
}
|
||||
|
||||
pub mod defaults {
|
||||
@@ -730,6 +732,7 @@ pub mod tenant_conf_defaults {
|
||||
pub const DEFAULT_GC_COMPACTION_VERIFICATION: bool = true;
|
||||
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
|
||||
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
|
||||
pub const DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY: usize = 1000;
|
||||
}
|
||||
|
||||
impl Default for TenantConfigToml {
|
||||
@@ -787,6 +790,7 @@ impl Default for TenantConfigToml {
|
||||
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
|
||||
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,
|
||||
sampling_ratio: None,
|
||||
relsize_snapshot_cache_capacity: DEFAULT_RELSIZE_SNAPSHOT_CACHE_CAPACITY,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -630,6 +630,8 @@ pub struct TenantConfigPatch {
|
||||
pub gc_compaction_ratio_percent: FieldPatch<u64>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub sampling_ratio: FieldPatch<Option<Ratio>>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub relsize_snapshot_cache_capacity: FieldPatch<usize>,
|
||||
}
|
||||
|
||||
/// Like [`crate::config::TenantConfigToml`], but preserves the information
|
||||
@@ -759,6 +761,9 @@ pub struct TenantConfig {
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub sampling_ratio: Option<Option<Ratio>>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub relsize_snapshot_cache_capacity: Option<usize>,
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
@@ -804,6 +809,7 @@ impl TenantConfig {
|
||||
mut gc_compaction_initial_threshold_kb,
|
||||
mut gc_compaction_ratio_percent,
|
||||
mut sampling_ratio,
|
||||
mut relsize_snapshot_cache_capacity,
|
||||
} = self;
|
||||
|
||||
patch.checkpoint_distance.apply(&mut checkpoint_distance);
|
||||
@@ -905,6 +911,9 @@ impl TenantConfig {
|
||||
.gc_compaction_ratio_percent
|
||||
.apply(&mut gc_compaction_ratio_percent);
|
||||
patch.sampling_ratio.apply(&mut sampling_ratio);
|
||||
patch
|
||||
.relsize_snapshot_cache_capacity
|
||||
.apply(&mut relsize_snapshot_cache_capacity);
|
||||
|
||||
Ok(Self {
|
||||
checkpoint_distance,
|
||||
@@ -944,6 +953,7 @@ impl TenantConfig {
|
||||
gc_compaction_initial_threshold_kb,
|
||||
gc_compaction_ratio_percent,
|
||||
sampling_ratio,
|
||||
relsize_snapshot_cache_capacity,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1052,6 +1062,9 @@ impl TenantConfig {
|
||||
.gc_compaction_ratio_percent
|
||||
.unwrap_or(global_conf.gc_compaction_ratio_percent),
|
||||
sampling_ratio: self.sampling_ratio.unwrap_or(global_conf.sampling_ratio),
|
||||
relsize_snapshot_cache_capacity: self
|
||||
.relsize_snapshot_cache_capacity
|
||||
.unwrap_or(global_conf.relsize_snapshot_cache_capacity),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ crc32c.workspace = true
|
||||
either.workspace = true
|
||||
fail.workspace = true
|
||||
futures.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
|
||||
@@ -343,7 +343,7 @@ where
|
||||
// Gather non-relational files from object storage pages.
|
||||
let slru_partitions = self
|
||||
.timeline
|
||||
.get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
|
||||
.get_slru_keyspace(Version::at(self.lsn), self.ctx)
|
||||
.await?
|
||||
.partition(
|
||||
self.timeline.get_shard_identity(),
|
||||
@@ -378,7 +378,7 @@ where
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
let rels = self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
|
||||
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
@@ -517,7 +517,7 @@ where
|
||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
|
||||
.get_rel_size(src, Version::at(self.lsn), self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
@@ -577,7 +577,7 @@ where
|
||||
let relmap_img = if has_relmap_file {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
|
||||
.get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx)
|
||||
.await?;
|
||||
|
||||
if img.len()
|
||||
@@ -631,7 +631,7 @@ where
|
||||
if !has_relmap_file
|
||||
&& self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
|
||||
.list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
|
||||
.await?
|
||||
.is_empty()
|
||||
{
|
||||
|
||||
@@ -843,23 +843,50 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
pub(crate) static RELSIZE_LATEST_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_cache_entries",
|
||||
"Number of entries in the relation size cache",
|
||||
"pageserver_relsize_latest_cache_entries",
|
||||
"Number of entries in the latest relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
|
||||
.expect("failed to define a metric")
|
||||
pub(crate) static RELSIZE_LATEST_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_latest_cache_hits",
|
||||
"Latest relation size cache hits",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
pub(crate) static RELSIZE_LATEST_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses",
|
||||
"Relation size cache misses",
|
||||
"pageserver_relsize_latest_cache_misses",
|
||||
"Relation size latest cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_SNAPSHOT_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_snapshot_cache_entries",
|
||||
"Number of entries in the pitr relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_SNAPSHOT_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_snapshot_cache_hits",
|
||||
"Pitr relation size cache hits",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_SNAPSHOT_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_snapshot_cache_misses",
|
||||
"Relation size snapshot cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
@@ -62,7 +62,7 @@ use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
|
||||
SmgrOpTimer, TimelineMetrics,
|
||||
};
|
||||
use crate::pgdatadir_mapping::Version;
|
||||
use crate::pgdatadir_mapping::{LsnRange, Version};
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
|
||||
@@ -642,7 +642,7 @@ impl std::fmt::Display for BatchedPageStreamError {
|
||||
struct BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest,
|
||||
timer: SmgrOpTimer,
|
||||
effective_request_lsn: Lsn,
|
||||
lsn_range: LsnRange,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
@@ -764,12 +764,12 @@ impl BatchedFeMessage {
|
||||
match batching_strategy {
|
||||
PageServiceProtocolPipelinedBatchingStrategy::UniformLsn => {
|
||||
if let Some(last_in_batch) = accum_pages.last() {
|
||||
if last_in_batch.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
if last_in_batch.lsn_range.effective_lsn
|
||||
!= this_pages[0].lsn_range.effective_lsn
|
||||
{
|
||||
trace!(
|
||||
accum_lsn = %last_in_batch.effective_request_lsn,
|
||||
this_lsn = %this_pages[0].effective_request_lsn,
|
||||
accum_lsn = %last_in_batch.lsn_range.effective_lsn,
|
||||
this_lsn = %this_pages[0].lsn_range.effective_lsn,
|
||||
"stopping batching because LSN changed"
|
||||
);
|
||||
|
||||
@@ -784,15 +784,15 @@ impl BatchedFeMessage {
|
||||
let same_page_different_lsn = accum_pages.iter().any(|batched| {
|
||||
batched.req.rel == this_pages[0].req.rel
|
||||
&& batched.req.blkno == this_pages[0].req.blkno
|
||||
&& batched.effective_request_lsn
|
||||
!= this_pages[0].effective_request_lsn
|
||||
&& batched.lsn_range.effective_lsn
|
||||
!= this_pages[0].lsn_range.effective_lsn
|
||||
});
|
||||
|
||||
if same_page_different_lsn {
|
||||
trace!(
|
||||
rel=%this_pages[0].req.rel,
|
||||
blkno=%this_pages[0].req.blkno,
|
||||
lsn=%this_pages[0].effective_request_lsn,
|
||||
lsn=%this_pages[0].lsn_range.effective_lsn,
|
||||
"stopping batching because same page was requested at different LSNs"
|
||||
);
|
||||
|
||||
@@ -1158,7 +1158,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
// We're holding the Handle
|
||||
let effective_request_lsn = match Self::effective_request_lsn(
|
||||
let effective_lsn = match Self::effective_request_lsn(
|
||||
&shard,
|
||||
shard.get_last_record_lsn(),
|
||||
req.hdr.request_lsn,
|
||||
@@ -1177,7 +1177,10 @@ impl PageServerHandler {
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest {
|
||||
req,
|
||||
timer,
|
||||
effective_request_lsn,
|
||||
lsn_range: LsnRange {
|
||||
effective_lsn,
|
||||
request_lsn: req.hdr.request_lsn
|
||||
},
|
||||
ctx,
|
||||
}],
|
||||
// The executor grabs the batch when it becomes idle.
|
||||
@@ -2127,7 +2130,14 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let exists = timeline
|
||||
.get_rel_exists(req.rel, Version::Lsn(lsn), ctx)
|
||||
.get_rel_exists(
|
||||
req.rel,
|
||||
Version::LsnRange(LsnRange {
|
||||
effective_lsn: lsn,
|
||||
request_lsn: req.hdr.request_lsn,
|
||||
}),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
@@ -2154,7 +2164,14 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let n_blocks = timeline
|
||||
.get_rel_size(req.rel, Version::Lsn(lsn), ctx)
|
||||
.get_rel_size(
|
||||
req.rel,
|
||||
Version::LsnRange(LsnRange {
|
||||
effective_lsn: lsn,
|
||||
request_lsn: req.hdr.request_lsn,
|
||||
}),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
@@ -2181,7 +2198,15 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
|
||||
let total_blocks = timeline
|
||||
.get_db_size(DEFAULTTABLESPACE_OID, req.dbnode, Version::Lsn(lsn), ctx)
|
||||
.get_db_size(
|
||||
DEFAULTTABLESPACE_OID,
|
||||
req.dbnode,
|
||||
Version::LsnRange(LsnRange {
|
||||
effective_lsn: lsn,
|
||||
request_lsn: req.hdr.request_lsn,
|
||||
}),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let db_size = total_blocks as i64 * BLCKSZ as i64;
|
||||
|
||||
@@ -2214,7 +2239,7 @@ impl PageServerHandler {
|
||||
// Ignore error (trace buffer may be full or tracer may have disconnected).
|
||||
_ = page_trace.try_send(PageTraceEvent {
|
||||
key,
|
||||
effective_lsn: batch.effective_request_lsn,
|
||||
effective_lsn: batch.lsn_range.effective_lsn,
|
||||
time,
|
||||
});
|
||||
}
|
||||
@@ -2229,7 +2254,7 @@ impl PageServerHandler {
|
||||
perf_instrument = true;
|
||||
}
|
||||
|
||||
req.effective_request_lsn
|
||||
req.lsn_range.effective_lsn
|
||||
})
|
||||
.max()
|
||||
.expect("batch is never empty");
|
||||
@@ -2283,7 +2308,7 @@ impl PageServerHandler {
|
||||
(
|
||||
&p.req.rel,
|
||||
&p.req.blkno,
|
||||
p.effective_request_lsn,
|
||||
p.lsn_range,
|
||||
p.ctx.attached_child(),
|
||||
)
|
||||
}),
|
||||
|
||||
@@ -43,7 +43,9 @@ use crate::aux_file;
|
||||
use crate::context::{PerfInstrumentFutureExt, RequestContext, RequestContextBuilder};
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_OLD,
|
||||
RELSIZE_CACHE_MISSES_OLD, RELSIZE_LATEST_CACHE_ENTRIES, RELSIZE_LATEST_CACHE_HITS,
|
||||
RELSIZE_LATEST_CACHE_MISSES, RELSIZE_SNAPSHOT_CACHE_ENTRIES, RELSIZE_SNAPSHOT_CACHE_HITS,
|
||||
RELSIZE_SNAPSHOT_CACHE_MISSES,
|
||||
};
|
||||
use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
@@ -90,6 +92,28 @@ pub enum LsnForTimestamp {
|
||||
NoData(Lsn),
|
||||
}
|
||||
|
||||
/// Each request to page server contains LSN range: `not_modified_since..request_lsn`.
|
||||
/// See comments libs/pageserver_api/src/models.rs.
|
||||
/// Based on this range and `last_record_lsn` PS calculates `effective_lsn`.
|
||||
/// But to distinguish requests from primary and replicas we need also to pass `request_lsn`.
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub struct LsnRange {
|
||||
pub effective_lsn: Lsn,
|
||||
pub request_lsn: Lsn,
|
||||
}
|
||||
|
||||
impl LsnRange {
|
||||
pub fn at(lsn: Lsn) -> LsnRange {
|
||||
LsnRange {
|
||||
effective_lsn: lsn,
|
||||
request_lsn: lsn,
|
||||
}
|
||||
}
|
||||
pub fn is_latest(&self) -> bool {
|
||||
self.request_lsn == Lsn::MAX
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum CalculateLogicalSizeError {
|
||||
#[error("cancelled")]
|
||||
@@ -202,13 +226,13 @@ impl Timeline {
|
||||
io_concurrency: IoConcurrency,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
match version {
|
||||
Version::Lsn(effective_lsn) => {
|
||||
Version::LsnRange(lsns) => {
|
||||
let pages: smallvec::SmallVec<[_; 1]> = smallvec::smallvec![(tag, blknum)];
|
||||
let res = self
|
||||
.get_rel_page_at_lsn_batched(
|
||||
pages.iter().map(|(tag, blknum)| {
|
||||
(tag, blknum, effective_lsn, ctx.attached_child())
|
||||
}),
|
||||
pages
|
||||
.iter()
|
||||
.map(|(tag, blknum)| (tag, blknum, lsns, ctx.attached_child())),
|
||||
io_concurrency.clone(),
|
||||
ctx,
|
||||
)
|
||||
@@ -246,7 +270,7 @@ impl Timeline {
|
||||
/// The ordering of the returned vec corresponds to the ordering of `pages`.
|
||||
pub(crate) async fn get_rel_page_at_lsn_batched(
|
||||
&self,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, Lsn, RequestContext)>,
|
||||
pages: impl ExactSizeIterator<Item = (&RelTag, &BlockNumber, LsnRange, RequestContext)>,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<Bytes, PageReconstructError>> {
|
||||
@@ -265,7 +289,7 @@ impl Timeline {
|
||||
let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
|
||||
HashMap::with_capacity(pages.len());
|
||||
|
||||
for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
|
||||
for (response_slot_idx, (tag, blknum, lsns, ctx)) in pages.enumerate() {
|
||||
if tag.relnode == 0 {
|
||||
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
@@ -274,7 +298,7 @@ impl Timeline {
|
||||
slots_filled += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
let lsn = lsns.effective_lsn;
|
||||
let nblocks = {
|
||||
let ctx = RequestContextBuilder::from(&ctx)
|
||||
.perf_span(|crnt_perf_span| {
|
||||
@@ -289,7 +313,7 @@ impl Timeline {
|
||||
.attached_child();
|
||||
|
||||
match self
|
||||
.get_rel_size(*tag, Version::Lsn(lsn), &ctx)
|
||||
.get_rel_size(*tag, Version::LsnRange(lsns), &ctx)
|
||||
.maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
|
||||
.await
|
||||
{
|
||||
@@ -470,7 +494,7 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
|
||||
if let Some(nblocks) = self.get_cached_rel_size(&tag, version) {
|
||||
return Ok(nblocks);
|
||||
}
|
||||
|
||||
@@ -488,7 +512,7 @@ impl Timeline {
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
||||
self.update_cached_rel_size(tag, version, nblocks);
|
||||
|
||||
Ok(nblocks)
|
||||
}
|
||||
@@ -510,7 +534,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// first try to lookup relation in cache
|
||||
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version.get_lsn()) {
|
||||
if let Some(_nblocks) = self.get_cached_rel_size(&tag, version) {
|
||||
return Ok(true);
|
||||
}
|
||||
// then check if the database was already initialized.
|
||||
@@ -632,7 +656,7 @@ impl Timeline {
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
assert!(self.tenant_shard_id.is_shard_zero());
|
||||
let n_blocks = self
|
||||
.get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
|
||||
.get_slru_segment_size(kind, segno, Version::at(lsn), ctx)
|
||||
.await?;
|
||||
|
||||
let keyspace = KeySpace::single(
|
||||
@@ -867,11 +891,11 @@ impl Timeline {
|
||||
mut f: impl FnMut(TimestampTz) -> ControlFlow<T>,
|
||||
) -> Result<T, PageReconstructError> {
|
||||
for segno in self
|
||||
.list_slru_segments(SlruKind::Clog, Version::Lsn(probe_lsn), ctx)
|
||||
.list_slru_segments(SlruKind::Clog, Version::at(probe_lsn), ctx)
|
||||
.await?
|
||||
{
|
||||
let nblocks = self
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx)
|
||||
.get_slru_segment_size(SlruKind::Clog, segno, Version::at(probe_lsn), ctx)
|
||||
.await?;
|
||||
|
||||
let keyspace = KeySpace::single(
|
||||
@@ -1137,7 +1161,7 @@ impl Timeline {
|
||||
let mut total_size: u64 = 0;
|
||||
for (spcnode, dbnode) in dbdir.dbdirs.keys() {
|
||||
for rel in self
|
||||
.list_rels(*spcnode, *dbnode, Version::Lsn(lsn), ctx)
|
||||
.list_rels(*spcnode, *dbnode, Version::at(lsn), ctx)
|
||||
.await?
|
||||
{
|
||||
if self.cancel.is_cancelled() {
|
||||
@@ -1212,7 +1236,7 @@ impl Timeline {
|
||||
result.add_key(rel_dir_to_key(spcnode, dbnode));
|
||||
|
||||
let mut rels: Vec<RelTag> = self
|
||||
.list_rels(spcnode, dbnode, Version::Lsn(lsn), ctx)
|
||||
.list_rels(spcnode, dbnode, Version::at(lsn), ctx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect();
|
||||
@@ -1329,59 +1353,75 @@ impl Timeline {
|
||||
Ok((dense_keyspace, sparse_keyspace))
|
||||
}
|
||||
|
||||
/// Get cached size of relation if it not updated after specified LSN
|
||||
pub fn get_cached_rel_size(&self, tag: &RelTag, lsn: Lsn) -> Option<BlockNumber> {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
/// Get cached size of relation. There are two caches: one for primary updates, it captures the latest state of
|
||||
/// of the timeline and snapshot cache, which key includes LSN and so can be used by replicas to get relation size
|
||||
/// at the particular LSN (snapshot).
|
||||
pub fn get_cached_rel_size(&self, tag: &RelTag, version: Version<'_>) -> Option<BlockNumber> {
|
||||
let lsn = version.get_lsn();
|
||||
{
|
||||
let rel_size_cache = self.rel_size_latest_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_LATEST_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_OLD.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_OLD.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
{
|
||||
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
|
||||
if let Some(nblock) = rel_size_cache.get(&(lsn, *tag)) {
|
||||
RELSIZE_SNAPSHOT_CACHE_HITS.inc();
|
||||
return Some(*nblock);
|
||||
}
|
||||
}
|
||||
if version.is_latest() {
|
||||
RELSIZE_LATEST_CACHE_MISSES.inc();
|
||||
} else {
|
||||
RELSIZE_SNAPSHOT_CACHE_MISSES.inc();
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Update cached relation size if there is no more recent update
|
||||
pub fn update_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
|
||||
if lsn < rel_size_cache.complete_as_of {
|
||||
// Do not cache old values. It's safe to cache the size on read, as long as
|
||||
// the read was at an LSN since we started the WAL ingestion. Reasoning: we
|
||||
// never evict values from the cache, so if the relation size changed after
|
||||
// 'lsn', the new value is already in the cache.
|
||||
return;
|
||||
}
|
||||
|
||||
match rel_size_cache.map.entry(tag) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
let cached_lsn = entry.get_mut();
|
||||
if lsn >= cached_lsn.0 {
|
||||
*cached_lsn = (lsn, nblocks);
|
||||
pub fn update_cached_rel_size(&self, tag: RelTag, version: Version<'_>, nblocks: BlockNumber) {
|
||||
let lsn = version.get_lsn();
|
||||
if version.is_latest() {
|
||||
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
|
||||
match rel_size_cache.entry(tag) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
let cached_lsn = entry.get_mut();
|
||||
if lsn >= cached_lsn.0 {
|
||||
*cached_lsn = (lsn, nblocks);
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_LATEST_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
} else {
|
||||
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
|
||||
if rel_size_cache.capacity() != 0 {
|
||||
rel_size_cache.insert((lsn, tag), nblocks);
|
||||
RELSIZE_SNAPSHOT_CACHE_ENTRIES.set(rel_size_cache.len() as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
|
||||
if rel_size_cache.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_LATEST_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
let mut rel_size_cache = self.rel_size_latest_cache.write().unwrap();
|
||||
if rel_size_cache.remove(tag).is_some() {
|
||||
RELSIZE_LATEST_CACHE_ENTRIES.dec();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1585,7 +1625,10 @@ impl DatadirModification<'_> {
|
||||
// check the cache too. This is because eagerly checking the cache results in
|
||||
// less work overall and 10% better performance. It's more work on cache miss
|
||||
// but cache miss is rare.
|
||||
if let Some(nblocks) = self.tline.get_cached_rel_size(&rel, self.get_lsn()) {
|
||||
if let Some(nblocks) = self
|
||||
.tline
|
||||
.get_cached_rel_size(&rel, Version::Modified(self))
|
||||
{
|
||||
Ok(nblocks)
|
||||
} else if !self
|
||||
.tline
|
||||
@@ -2667,7 +2710,7 @@ pub struct DatadirModificationStats {
|
||||
/// timeline to not miss the latest updates.
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum Version<'a> {
|
||||
Lsn(Lsn),
|
||||
LsnRange(LsnRange),
|
||||
Modified(&'a DatadirModification<'a>),
|
||||
}
|
||||
|
||||
@@ -2679,7 +2722,7 @@ impl Version<'_> {
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
match self {
|
||||
Version::Lsn(lsn) => timeline.get(key, *lsn, ctx).await,
|
||||
Version::LsnRange(lsns) => timeline.get(key, lsns.effective_lsn, ctx).await,
|
||||
Version::Modified(modification) => modification.get(key, ctx).await,
|
||||
}
|
||||
}
|
||||
@@ -2701,12 +2744,26 @@ impl Version<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
pub fn is_latest(&self) -> bool {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
Version::LsnRange(lsns) => lsns.is_latest(),
|
||||
Version::Modified(_) => true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::LsnRange(lsns) => lsns.effective_lsn,
|
||||
Version::Modified(modification) => modification.lsn,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn at(lsn: Lsn) -> Self {
|
||||
Version::LsnRange(LsnRange {
|
||||
effective_lsn: lsn,
|
||||
request_lsn: lsn,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
//--- Metadata structs stored in key-value pairs in the repository.
|
||||
|
||||
@@ -14,6 +14,7 @@ pub mod span;
|
||||
pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
use hashlink::LruCache;
|
||||
use std::array;
|
||||
use std::cmp::{max, min};
|
||||
use std::collections::btree_map::Entry;
|
||||
@@ -197,16 +198,6 @@ pub struct TimelineResources {
|
||||
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
|
||||
}
|
||||
|
||||
/// The relation size cache caches relation sizes at the end of the timeline. It speeds up WAL
|
||||
/// ingestion considerably, because WAL ingestion needs to check on most records if the record
|
||||
/// implicitly extends the relation. At startup, `complete_as_of` is initialized to the current end
|
||||
/// of the timeline (disk_consistent_lsn). It's used on reads of relation sizes to check if the
|
||||
/// value can be used to also update the cache, see [`Timeline::update_cached_rel_size`].
|
||||
pub(crate) struct RelSizeCache {
|
||||
pub(crate) complete_as_of: Lsn,
|
||||
pub(crate) map: HashMap<RelTag, (Lsn, BlockNumber)>,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
pub(crate) conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<ArcSwap<AttachedTenantConf>>,
|
||||
@@ -365,7 +356,8 @@ pub struct Timeline {
|
||||
pub walreceiver: Mutex<Option<WalReceiver>>,
|
||||
|
||||
/// Relation size cache
|
||||
pub(crate) rel_size_cache: RwLock<RelSizeCache>,
|
||||
pub(crate) rel_size_latest_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
|
||||
pub(crate) rel_size_snapshot_cache: Mutex<LruCache<(Lsn, RelTag), BlockNumber>>,
|
||||
|
||||
download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
|
||||
|
||||
@@ -2820,6 +2812,13 @@ impl Timeline {
|
||||
|
||||
self.remote_client.update_config(&new_conf.location);
|
||||
|
||||
let mut rel_size_cache = self.rel_size_snapshot_cache.lock().unwrap();
|
||||
if let Some(new_capacity) = new_conf.tenant_conf.relsize_snapshot_cache_capacity {
|
||||
if new_capacity != rel_size_cache.capacity() {
|
||||
rel_size_cache.set_capacity(new_capacity);
|
||||
}
|
||||
}
|
||||
|
||||
self.metrics
|
||||
.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -2878,6 +2877,14 @@ impl Timeline {
|
||||
ancestor_gc_info.insert_child(timeline_id, metadata.ancestor_lsn(), is_offloaded);
|
||||
}
|
||||
|
||||
let relsize_snapshot_cache_capacity = {
|
||||
let loaded_tenant_conf = tenant_conf.load();
|
||||
loaded_tenant_conf
|
||||
.tenant_conf
|
||||
.relsize_snapshot_cache_capacity
|
||||
.unwrap_or(conf.default_tenant_conf.relsize_snapshot_cache_capacity)
|
||||
};
|
||||
|
||||
Arc::new_cyclic(|myself| {
|
||||
let metrics = Arc::new(TimelineMetrics::new(
|
||||
&tenant_shard_id,
|
||||
@@ -2969,10 +2976,8 @@ impl Timeline {
|
||||
last_image_layer_creation_check_instant: Mutex::new(None),
|
||||
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_cache: RwLock::new(RelSizeCache {
|
||||
complete_as_of: disk_consistent_lsn,
|
||||
map: HashMap::new(),
|
||||
}),
|
||||
rel_size_latest_cache: RwLock::new(HashMap::new()),
|
||||
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
|
||||
|
||||
download_all_remote_layers_task_info: RwLock::new(None),
|
||||
|
||||
|
||||
@@ -1684,31 +1684,31 @@ mod tests {
|
||||
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
assert!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
3
|
||||
);
|
||||
@@ -1719,7 +1719,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
0,
|
||||
Version::Lsn(Lsn(0x20)),
|
||||
Version::at(Lsn(0x20)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1733,7 +1733,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
0,
|
||||
Version::Lsn(Lsn(0x30)),
|
||||
Version::at(Lsn(0x30)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1747,7 +1747,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
0,
|
||||
Version::Lsn(Lsn(0x40)),
|
||||
Version::at(Lsn(0x40)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1760,7 +1760,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
1,
|
||||
Version::Lsn(Lsn(0x40)),
|
||||
Version::at(Lsn(0x40)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1774,7 +1774,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
0,
|
||||
Version::Lsn(Lsn(0x50)),
|
||||
Version::at(Lsn(0x50)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1787,7 +1787,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
1,
|
||||
Version::Lsn(Lsn(0x50)),
|
||||
Version::at(Lsn(0x50)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1800,7 +1800,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
2,
|
||||
Version::Lsn(Lsn(0x50)),
|
||||
Version::at(Lsn(0x50)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1820,7 +1820,7 @@ mod tests {
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
2
|
||||
);
|
||||
@@ -1829,7 +1829,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
0,
|
||||
Version::Lsn(Lsn(0x60)),
|
||||
Version::at(Lsn(0x60)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1842,7 +1842,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
1,
|
||||
Version::Lsn(Lsn(0x60)),
|
||||
Version::at(Lsn(0x60)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1854,7 +1854,7 @@ mod tests {
|
||||
// should still see the truncated block with older LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
3
|
||||
);
|
||||
@@ -1863,7 +1863,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
2,
|
||||
Version::Lsn(Lsn(0x50)),
|
||||
Version::at(Lsn(0x50)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1880,7 +1880,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x68)), &ctx)
|
||||
.await?,
|
||||
0
|
||||
);
|
||||
@@ -1893,7 +1893,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x70)), &ctx)
|
||||
.await?,
|
||||
2
|
||||
);
|
||||
@@ -1902,7 +1902,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
0,
|
||||
Version::Lsn(Lsn(0x70)),
|
||||
Version::at(Lsn(0x70)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1915,7 +1915,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
1,
|
||||
Version::Lsn(Lsn(0x70)),
|
||||
Version::at(Lsn(0x70)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1932,7 +1932,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
1501
|
||||
);
|
||||
@@ -1942,7 +1942,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
blk,
|
||||
Version::Lsn(Lsn(0x80)),
|
||||
Version::at(Lsn(0x80)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1956,7 +1956,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
1500,
|
||||
Version::Lsn(Lsn(0x80)),
|
||||
Version::at(Lsn(0x80)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -1990,13 +1990,13 @@ mod tests {
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2011,7 +2011,7 @@ mod tests {
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x30)), &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
@@ -2029,13 +2029,13 @@ mod tests {
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x40)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2077,26 +2077,26 @@ mod tests {
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
|
||||
.await?,
|
||||
false
|
||||
);
|
||||
assert!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x10)), &ctx)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x20)), &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2110,7 +2110,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
blkno,
|
||||
Version::Lsn(lsn),
|
||||
Version::at(lsn),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -2131,7 +2131,7 @@ mod tests {
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x60)), &ctx)
|
||||
.await?,
|
||||
1
|
||||
);
|
||||
@@ -2144,7 +2144,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
blkno,
|
||||
Version::Lsn(Lsn(0x60)),
|
||||
Version::at(Lsn(0x60)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -2157,7 +2157,7 @@ mod tests {
|
||||
// should still see all blocks with older LSN
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x50)), &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2169,7 +2169,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
blkno,
|
||||
Version::Lsn(Lsn(0x50)),
|
||||
Version::at(Lsn(0x50)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -2193,13 +2193,13 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.get_rel_exists(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(0x80)), &ctx)
|
||||
.await?,
|
||||
relsize
|
||||
);
|
||||
@@ -2212,7 +2212,7 @@ mod tests {
|
||||
.get_rel_page_at_lsn(
|
||||
TESTREL_A,
|
||||
blkno,
|
||||
Version::Lsn(Lsn(0x80)),
|
||||
Version::at(Lsn(0x80)),
|
||||
&ctx,
|
||||
io_concurrency.clone()
|
||||
)
|
||||
@@ -2250,7 +2250,7 @@ mod tests {
|
||||
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
RELSEG_SIZE + 1
|
||||
);
|
||||
@@ -2264,7 +2264,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
RELSEG_SIZE
|
||||
);
|
||||
@@ -2279,7 +2279,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
RELSEG_SIZE - 1
|
||||
);
|
||||
@@ -2297,7 +2297,7 @@ mod tests {
|
||||
m.commit(&ctx).await?;
|
||||
assert_eq!(
|
||||
tline
|
||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), &ctx)
|
||||
.get_rel_size(TESTREL_A, Version::at(Lsn(lsn)), &ctx)
|
||||
.await?,
|
||||
size as BlockNumber
|
||||
);
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::BuildHasher;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::{array, env, fmt, io};
|
||||
use std::{env, io};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use indexmap::IndexSet;
|
||||
use opentelemetry::trace::TraceContextExt;
|
||||
use scopeguard::defer;
|
||||
use serde::ser::{SerializeMap, Serializer};
|
||||
use tracing::subscriber::Interest;
|
||||
use tracing::{Event, Metadata, Span, Subscriber, callsite, span};
|
||||
@@ -19,7 +17,6 @@ use tracing_subscriber::fmt::{FormatEvent, FormatFields};
|
||||
use tracing_subscriber::layer::{Context, Layer};
|
||||
use tracing_subscriber::prelude::*;
|
||||
use tracing_subscriber::registry::{LookupSpan, SpanRef};
|
||||
use try_lock::TryLock;
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
///
|
||||
@@ -55,7 +52,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
StderrWriter {
|
||||
stderr: std::io::stderr(),
|
||||
},
|
||||
["request_id", "session_id", "conn_id"],
|
||||
&["request_id", "session_id", "conn_id"],
|
||||
))
|
||||
} else {
|
||||
None
|
||||
@@ -183,50 +180,65 @@ impl Clock for RealClock {
|
||||
/// Name of the field used by tracing crate to store the event message.
|
||||
const MESSAGE_FIELD: &str = "message";
|
||||
|
||||
/// Tracing used to enforce that spans/events have no more than 32 fields.
|
||||
/// It seems this is no longer the case, but it's still documented in some places.
|
||||
/// Generally, we shouldn't expect more than 32 fields anyway, so we can try and
|
||||
/// rely on it for some (minor) performance gains.
|
||||
const MAX_TRACING_FIELDS: usize = 32;
|
||||
|
||||
thread_local! {
|
||||
/// Protects against deadlocks and double panics during log writing.
|
||||
/// The current panic handler will use tracing to log panic information.
|
||||
static REENTRANCY_GUARD: Cell<bool> = const { Cell::new(false) };
|
||||
/// Thread-local instance with per-thread buffer for log writing.
|
||||
static EVENT_FORMATTER: RefCell<EventFormatter> = RefCell::new(EventFormatter::new());
|
||||
static EVENT_FORMATTER: RefCell<EventFormatter> = const { RefCell::new(EventFormatter::new()) };
|
||||
/// Cached OS thread ID.
|
||||
static THREAD_ID: u64 = gettid::gettid();
|
||||
}
|
||||
|
||||
/// Map for values fixed at callsite registration.
|
||||
// We use papaya here because registration rarely happens post-startup.
|
||||
// papaya is good for read-heavy workloads.
|
||||
//
|
||||
// We use rustc_hash here because callsite::Identifier will always be an integer with low-bit entropy,
|
||||
// since it's always a pointer to static mutable data. rustc_hash was designed for low-bit entropy.
|
||||
type CallsiteMap<T> =
|
||||
papaya::HashMap<callsite::Identifier, T, std::hash::BuildHasherDefault<rustc_hash::FxHasher>>;
|
||||
|
||||
/// Implements tracing layer to handle events specific to logging.
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter, const F: usize> {
|
||||
struct JsonLoggingLayer<C: Clock, W: MakeWriter> {
|
||||
clock: C,
|
||||
skipped_field_indices: papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
callsite_ids: papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
writer: W,
|
||||
// We use a const generic and arrays to bypass one heap allocation.
|
||||
extract_fields: IndexSet<&'static str>,
|
||||
_marker: std::marker::PhantomData<[&'static str; F]>,
|
||||
|
||||
/// tracks which fields of each **event** are duplicates
|
||||
skipped_field_indices: CallsiteMap<SkippedFieldIndices>,
|
||||
|
||||
span_info: CallsiteMap<CallsiteSpanInfo>,
|
||||
|
||||
/// Fields we want to keep track of in a separate json object.
|
||||
extract_fields: &'static [&'static str],
|
||||
}
|
||||
|
||||
impl<C: Clock, W: MakeWriter, const F: usize> JsonLoggingLayer<C, W, F> {
|
||||
fn new(clock: C, writer: W, extract_fields: [&'static str; F]) -> Self {
|
||||
impl<C: Clock, W: MakeWriter> JsonLoggingLayer<C, W> {
|
||||
fn new(clock: C, writer: W, extract_fields: &'static [&'static str]) -> Self {
|
||||
JsonLoggingLayer {
|
||||
clock,
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
skipped_field_indices: CallsiteMap::default(),
|
||||
span_info: CallsiteMap::default(),
|
||||
writer,
|
||||
extract_fields: IndexSet::from_iter(extract_fields),
|
||||
_marker: std::marker::PhantomData,
|
||||
extract_fields,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn callsite_id(&self, cs: callsite::Identifier) -> CallsiteId {
|
||||
*self
|
||||
.callsite_ids
|
||||
fn span_info(&self, metadata: &'static Metadata<'static>) -> CallsiteSpanInfo {
|
||||
self.span_info
|
||||
.pin()
|
||||
.get_or_insert_with(cs, CallsiteId::next)
|
||||
.get_or_insert_with(metadata.callsite(), || {
|
||||
CallsiteSpanInfo::new(metadata, self.extract_fields)
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static, const F: usize> Layer<S>
|
||||
for JsonLoggingLayer<C, W, F>
|
||||
impl<S, C: Clock + 'static, W: MakeWriter + 'static> Layer<S> for JsonLoggingLayer<C, W>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
{
|
||||
@@ -237,35 +249,25 @@ where
|
||||
// early, before OTel machinery, and add as event extension.
|
||||
let now = self.clock.now();
|
||||
|
||||
let res: io::Result<()> = REENTRANCY_GUARD.with(move |entered| {
|
||||
if entered.get() {
|
||||
let mut formatter = EventFormatter::new();
|
||||
formatter.format::<S, F>(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
&self.callsite_ids,
|
||||
&self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
} else {
|
||||
entered.set(true);
|
||||
defer!(entered.set(false););
|
||||
let res: io::Result<()> = EVENT_FORMATTER.with(|f| {
|
||||
let mut borrow = f.try_borrow_mut();
|
||||
let formatter = match borrow.as_deref_mut() {
|
||||
Ok(formatter) => formatter,
|
||||
// If the thread local formatter is borrowed,
|
||||
// then we likely hit an edge case were we panicked during formatting.
|
||||
// We allow the logging to proceed with an uncached formatter.
|
||||
Err(_) => &mut EventFormatter::new(),
|
||||
};
|
||||
|
||||
EVENT_FORMATTER.with_borrow_mut(move |formatter| {
|
||||
formatter.reset();
|
||||
formatter.format::<S, F>(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
&self.callsite_ids,
|
||||
&self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
})
|
||||
}
|
||||
formatter.reset();
|
||||
formatter.format(
|
||||
now,
|
||||
event,
|
||||
&ctx,
|
||||
&self.skipped_field_indices,
|
||||
self.extract_fields,
|
||||
)?;
|
||||
self.writer.make_writer().write_all(formatter.buffer())
|
||||
});
|
||||
|
||||
// In case logging fails we generate a simpler JSON object.
|
||||
@@ -287,50 +289,48 @@ where
|
||||
/// Registers a SpanFields instance as span extension.
|
||||
fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let fields = SpanFields::default();
|
||||
fields.record_fields(attrs);
|
||||
|
||||
// This could deadlock when there's a panic somewhere in the tracing
|
||||
// event handling and a read or write guard is still held. This includes
|
||||
// the OTel subscriber.
|
||||
let mut exts = span.extensions_mut();
|
||||
let mut fields = SpanFields::new(self.span_info(span.metadata()));
|
||||
attrs.record(&mut fields);
|
||||
|
||||
exts.insert(fields);
|
||||
// This is a new span: the extensions should not be locked
|
||||
// unless some layer spawned a thread to process this span.
|
||||
// I don't think any layers do that.
|
||||
span.extensions_mut().insert(fields);
|
||||
}
|
||||
|
||||
fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
|
||||
let span = ctx.span(id).expect("span must exist");
|
||||
let ext = span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
data.record_fields(values);
|
||||
|
||||
// assumption: `on_record` is rarely called.
|
||||
// assumption: a span being updated by one thread,
|
||||
// and formatted by another thread is even rarer.
|
||||
let mut ext = span.extensions_mut();
|
||||
if let Some(fields) = ext.get_mut::<SpanFields>() {
|
||||
values.record(fields);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called (lazily) whenever a new log call is executed. We quickly check
|
||||
/// for duplicate field names and record duplicates as skippable. Last one
|
||||
/// wins.
|
||||
/// Called (lazily) roughly once per event/span instance. We quickly check
|
||||
/// for duplicate field names and record duplicates as skippable. Last field wins.
|
||||
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
|
||||
debug_assert!(
|
||||
metadata.fields().len() <= MAX_TRACING_FIELDS,
|
||||
"callsite {metadata:?} has too many fields."
|
||||
);
|
||||
|
||||
if !metadata.is_event() {
|
||||
self.callsite_id(metadata.callsite());
|
||||
// register the span info.
|
||||
self.span_info(metadata);
|
||||
// Must not be never because we wouldn't get trace and span data.
|
||||
return Interest::always();
|
||||
}
|
||||
|
||||
let mut field_indices = SkippedFieldIndices::default();
|
||||
let mut seen_fields = HashMap::<&'static str, usize>::new();
|
||||
let mut seen_fields = HashMap::new();
|
||||
for field in metadata.fields() {
|
||||
use std::collections::hash_map::Entry;
|
||||
match seen_fields.entry(field.name()) {
|
||||
Entry::Vacant(entry) => {
|
||||
// field not seen yet
|
||||
entry.insert(field.index());
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
// replace currently stored index
|
||||
let old_index = entry.insert(field.index());
|
||||
// ... and append it to list of skippable indices
|
||||
field_indices.push(old_index);
|
||||
}
|
||||
if let Some(old_index) = seen_fields.insert(field.name(), field.index()) {
|
||||
field_indices.set(old_index);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -344,110 +344,113 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Default)]
|
||||
#[repr(transparent)]
|
||||
struct CallsiteId(u32);
|
||||
/// Any span info that is fixed to a particular callsite. Not variable between span instances.
|
||||
#[derive(Clone)]
|
||||
struct CallsiteSpanInfo {
|
||||
/// index of each field to extract. usize::MAX if not found.
|
||||
extract: Arc<[usize]>,
|
||||
|
||||
impl CallsiteId {
|
||||
#[inline]
|
||||
fn next() -> Self {
|
||||
// Start at 1 to reserve 0 for default.
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(1);
|
||||
CallsiteId(COUNTER.fetch_add(1, Ordering::Relaxed))
|
||||
}
|
||||
/// tracks the fixed "callsite ID" for each span.
|
||||
/// note: this is not stable between runs.
|
||||
normalized_name: Arc<str>,
|
||||
}
|
||||
|
||||
impl fmt::Display for CallsiteId {
|
||||
#[inline]
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.0.fmt(f)
|
||||
impl CallsiteSpanInfo {
|
||||
fn new(metadata: &'static Metadata<'static>, extract_fields: &[&'static str]) -> Self {
|
||||
// Start at 1 to reserve 0 for default.
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(1);
|
||||
|
||||
let names: Vec<&'static str> = metadata.fields().iter().map(|f| f.name()).collect();
|
||||
|
||||
// get all the indices of span fields we want to focus
|
||||
let extract = extract_fields
|
||||
.iter()
|
||||
// use rposition, since we want last match wins.
|
||||
.map(|f1| names.iter().rposition(|f2| f1 == f2).unwrap_or(usize::MAX))
|
||||
.collect();
|
||||
|
||||
// normalized_name is unique for each callsite, but it is not
|
||||
// unified across separate proxy instances.
|
||||
// todo: can we do better here?
|
||||
let cid = COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
let normalized_name = format!("{}#{cid}", metadata.name()).into();
|
||||
|
||||
Self {
|
||||
extract,
|
||||
normalized_name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stores span field values recorded during the spans lifetime.
|
||||
#[derive(Default)]
|
||||
struct SpanFields {
|
||||
// TODO: Switch to custom enum with lasso::Spur for Strings?
|
||||
fields: papaya::HashMap<&'static str, serde_json::Value>,
|
||||
values: [serde_json::Value; MAX_TRACING_FIELDS],
|
||||
|
||||
/// cached span info so we can avoid extra hashmap lookups in the hot path.
|
||||
span_info: CallsiteSpanInfo,
|
||||
}
|
||||
|
||||
impl SpanFields {
|
||||
#[inline]
|
||||
fn record_fields<R: tracing_subscriber::field::RecordFields>(&self, fields: R) {
|
||||
fields.record(&mut SpanFieldsRecorder {
|
||||
fields: self.fields.pin(),
|
||||
});
|
||||
fn new(span_info: CallsiteSpanInfo) -> Self {
|
||||
Self {
|
||||
span_info,
|
||||
values: [const { serde_json::Value::Null }; MAX_TRACING_FIELDS],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements a tracing field visitor to convert and store values.
|
||||
struct SpanFieldsRecorder<'m, S, G> {
|
||||
fields: papaya::HashMapRef<'m, &'static str, serde_json::Value, S, G>,
|
||||
}
|
||||
|
||||
impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecorder<'_, S, G> {
|
||||
impl tracing::field::Visit for SpanFields {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_i128(&mut self, field: &tracing::field::Field, value: i128) {
|
||||
if let Ok(value) = i64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_u128(&mut self, field: &tracing::field::Field, value: u128) {
|
||||
if let Ok(value) = u64::try_from(value) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
} else {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_bytes(&mut self, field: &tracing::field::Field, value: &[u8]) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(value));
|
||||
self.values[field.index()] = serde_json::Value::from(value);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value:?}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value:?}"));
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -456,38 +459,33 @@ impl<S: BuildHasher, G: papaya::Guard> tracing::field::Visit for SpanFieldsRecor
|
||||
field: &tracing::field::Field,
|
||||
value: &(dyn std::error::Error + 'static),
|
||||
) {
|
||||
self.fields
|
||||
.insert(field.name(), serde_json::Value::from(format!("{value}")));
|
||||
self.values[field.index()] = serde_json::Value::from(format!("{value}"));
|
||||
}
|
||||
}
|
||||
|
||||
/// List of field indices skipped during logging. Can list duplicate fields or
|
||||
/// metafields not meant to be logged.
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Copy, Clone, Default)]
|
||||
struct SkippedFieldIndices {
|
||||
bits: u64,
|
||||
// 32-bits is large enough for `MAX_TRACING_FIELDS`
|
||||
bits: u32,
|
||||
}
|
||||
|
||||
impl SkippedFieldIndices {
|
||||
#[inline]
|
||||
fn is_empty(&self) -> bool {
|
||||
fn is_empty(self) -> bool {
|
||||
self.bits == 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn push(&mut self, index: usize) {
|
||||
self.bits |= 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large");
|
||||
fn set(&mut self, index: usize) {
|
||||
debug_assert!(index <= 32, "index out of bounds of 32-bit set");
|
||||
self.bits |= 1 << index;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn contains(&self, index: usize) -> bool {
|
||||
self.bits
|
||||
& 1u64
|
||||
.checked_shl(index as u32)
|
||||
.expect("field index too large")
|
||||
!= 0
|
||||
fn contains(self, index: usize) -> bool {
|
||||
self.bits & (1 << index) != 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -499,7 +497,7 @@ struct EventFormatter {
|
||||
|
||||
impl EventFormatter {
|
||||
#[inline]
|
||||
fn new() -> Self {
|
||||
const fn new() -> Self {
|
||||
EventFormatter {
|
||||
logline_buffer: Vec::new(),
|
||||
}
|
||||
@@ -515,14 +513,13 @@ impl EventFormatter {
|
||||
self.logline_buffer.clear();
|
||||
}
|
||||
|
||||
fn format<S, const F: usize>(
|
||||
fn format<S>(
|
||||
&mut self,
|
||||
now: DateTime<Utc>,
|
||||
event: &Event<'_>,
|
||||
ctx: &Context<'_, S>,
|
||||
skipped_field_indices: &papaya::HashMap<callsite::Identifier, SkippedFieldIndices>,
|
||||
callsite_ids: &papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
extract_fields: &IndexSet<&'static str>,
|
||||
skipped_field_indices: &CallsiteMap<SkippedFieldIndices>,
|
||||
extract_fields: &'static [&'static str],
|
||||
) -> io::Result<()>
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
@@ -533,8 +530,11 @@ impl EventFormatter {
|
||||
let normalized_meta = event.normalized_metadata();
|
||||
let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata());
|
||||
|
||||
let skipped_field_indices = skipped_field_indices.pin();
|
||||
let skipped_field_indices = skipped_field_indices.get(&meta.callsite());
|
||||
let skipped_field_indices = skipped_field_indices
|
||||
.pin()
|
||||
.get(&meta.callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut serialize = || {
|
||||
let mut serializer = serde_json::Serializer::new(&mut self.logline_buffer);
|
||||
@@ -565,9 +565,11 @@ impl EventFormatter {
|
||||
}
|
||||
|
||||
let spans = SerializableSpans {
|
||||
ctx,
|
||||
callsite_ids,
|
||||
extract: ExtractedSpanFields::<'_, F>::new(extract_fields),
|
||||
// collect all spans from parent to root.
|
||||
spans: ctx
|
||||
.event_span(event)
|
||||
.map_or(vec![], |parent| parent.scope().collect()),
|
||||
extracted: ExtractedSpanFields::new(extract_fields),
|
||||
};
|
||||
serializer.serialize_entry("spans", &spans)?;
|
||||
|
||||
@@ -620,9 +622,9 @@ impl EventFormatter {
|
||||
}
|
||||
}
|
||||
|
||||
if spans.extract.has_values() {
|
||||
if spans.extracted.has_values() {
|
||||
// TODO: add fields from event, too?
|
||||
serializer.serialize_entry("extract", &spans.extract)?;
|
||||
serializer.serialize_entry("extract", &spans.extracted)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -635,15 +637,15 @@ impl EventFormatter {
|
||||
}
|
||||
|
||||
/// Extracts the message field that's mixed will other fields.
|
||||
struct MessageFieldExtractor<'a, S: serde::ser::SerializeMap> {
|
||||
struct MessageFieldExtractor<S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
skipped_field_indices: SkippedFieldIndices,
|
||||
state: Option<Result<(), S::Error>>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
|
||||
impl<S: serde::ser::SerializeMap> MessageFieldExtractor<S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
@@ -665,13 +667,11 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldExtractor<'a, S> {
|
||||
fn accept_field(&self, field: &tracing::field::Field) -> bool {
|
||||
self.state.is_none()
|
||||
&& field.name() == MESSAGE_FIELD
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
&& !self.skipped_field_indices.contains(field.index())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<'_, S> {
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtractor<S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
@@ -751,14 +751,14 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldExtracto
|
||||
/// can be skipped.
|
||||
// This is entirely optional and only cosmetic, though maybe helps a
|
||||
// bit during log parsing in dashboards when there's no field with empty object.
|
||||
struct FieldsPresent<'a>(pub bool, Option<&'a SkippedFieldIndices>);
|
||||
struct FieldsPresent(pub bool, SkippedFieldIndices);
|
||||
|
||||
// Even though some methods have an overhead (error, bytes) it is assumed the
|
||||
// compiler won't include this since we ignore the value entirely.
|
||||
impl tracing::field::Visit for FieldsPresent<'_> {
|
||||
impl tracing::field::Visit for FieldsPresent {
|
||||
#[inline]
|
||||
fn record_debug(&mut self, field: &tracing::field::Field, _: &dyn std::fmt::Debug) {
|
||||
if !self.1.is_some_and(|i| i.contains(field.index()))
|
||||
if !self.1.contains(field.index())
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
{
|
||||
@@ -768,10 +768,7 @@ impl tracing::field::Visit for FieldsPresent<'_> {
|
||||
}
|
||||
|
||||
/// Serializes the fields directly supplied with a log event.
|
||||
struct SerializableEventFields<'a, 'event>(
|
||||
&'a tracing::Event<'event>,
|
||||
Option<&'a SkippedFieldIndices>,
|
||||
);
|
||||
struct SerializableEventFields<'a, 'event>(&'a tracing::Event<'event>, SkippedFieldIndices);
|
||||
|
||||
impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
@@ -788,15 +785,15 @@ impl serde::ser::Serialize for SerializableEventFields<'_, '_> {
|
||||
}
|
||||
|
||||
/// A tracing field visitor that skips the message field.
|
||||
struct MessageFieldSkipper<'a, S: serde::ser::SerializeMap> {
|
||||
struct MessageFieldSkipper<S: serde::ser::SerializeMap> {
|
||||
serializer: S,
|
||||
skipped_field_indices: Option<&'a SkippedFieldIndices>,
|
||||
skipped_field_indices: SkippedFieldIndices,
|
||||
state: Result<(), S::Error>,
|
||||
}
|
||||
|
||||
impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
impl<S: serde::ser::SerializeMap> MessageFieldSkipper<S> {
|
||||
#[inline]
|
||||
fn new(serializer: S, skipped_field_indices: Option<&'a SkippedFieldIndices>) -> Self {
|
||||
fn new(serializer: S, skipped_field_indices: SkippedFieldIndices) -> Self {
|
||||
Self {
|
||||
serializer,
|
||||
skipped_field_indices,
|
||||
@@ -809,9 +806,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
self.state.is_ok()
|
||||
&& field.name() != MESSAGE_FIELD
|
||||
&& !field.name().starts_with("log.")
|
||||
&& !self
|
||||
.skipped_field_indices
|
||||
.is_some_and(|i| i.contains(field.index()))
|
||||
&& !self.skipped_field_indices.contains(field.index())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
@@ -821,7 +816,7 @@ impl<'a, S: serde::ser::SerializeMap> MessageFieldSkipper<'a, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<'_, S> {
|
||||
impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<S> {
|
||||
#[inline]
|
||||
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
|
||||
if self.accept_field(field) {
|
||||
@@ -905,18 +900,17 @@ impl<S: serde::ser::SerializeMap> tracing::field::Visit for MessageFieldSkipper<
|
||||
/// with the span names as keys. To prevent collision we append a numberic value
|
||||
/// to the name. Also, collects any span fields we're interested in. Last one
|
||||
/// wins.
|
||||
struct SerializableSpans<'a, 'ctx, Span, const F: usize>
|
||||
struct SerializableSpans<'ctx, S>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
ctx: &'a Context<'ctx, Span>,
|
||||
callsite_ids: &'a papaya::HashMap<callsite::Identifier, CallsiteId>,
|
||||
extract: ExtractedSpanFields<'a, F>,
|
||||
spans: Vec<SpanRef<'ctx, S>>,
|
||||
extracted: ExtractedSpanFields,
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpans<'_, '_, Span, F>
|
||||
impl<S> serde::ser::Serialize for SerializableSpans<'_, S>
|
||||
where
|
||||
Span: Subscriber + for<'lookup> LookupSpan<'lookup>,
|
||||
S: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
fn serialize<Ser>(&self, serializer: Ser) -> Result<Ser::Ok, Ser::Error>
|
||||
where
|
||||
@@ -924,25 +918,22 @@ where
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
if let Some(leaf_span) = self.ctx.lookup_current() {
|
||||
for span in leaf_span.scope().from_root() {
|
||||
// Append a numeric callsite ID to the span name to keep the name unique
|
||||
// in the JSON object.
|
||||
let cid = self
|
||||
.callsite_ids
|
||||
.pin()
|
||||
.get(&span.metadata().callsite())
|
||||
.copied()
|
||||
.unwrap_or_default();
|
||||
for span in self.spans.iter().rev() {
|
||||
let ext = span.extensions();
|
||||
|
||||
// Loki turns the # into an underscore during field name concatenation.
|
||||
serializer.serialize_key(&format_args!("{}#{}", span.metadata().name(), &cid))?;
|
||||
// all spans should have this extension.
|
||||
let Some(fields) = ext.get() else { continue };
|
||||
|
||||
serializer.serialize_value(&SerializableSpanFields {
|
||||
span: &span,
|
||||
extract: &self.extract,
|
||||
})?;
|
||||
}
|
||||
self.extracted.layer_span(fields);
|
||||
|
||||
let SpanFields { values, span_info } = fields;
|
||||
serializer.serialize_entry(
|
||||
&*span_info.normalized_name,
|
||||
&SerializableSpanFields {
|
||||
fields: span.metadata().fields(),
|
||||
values,
|
||||
},
|
||||
)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -950,80 +941,77 @@ where
|
||||
}
|
||||
|
||||
/// Serializes the span fields as object.
|
||||
struct SerializableSpanFields<'a, 'span, Span, const F: usize>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
span: &'a SpanRef<'span, Span>,
|
||||
extract: &'a ExtractedSpanFields<'a, F>,
|
||||
struct SerializableSpanFields<'span> {
|
||||
fields: &'span tracing::field::FieldSet,
|
||||
values: &'span [serde_json::Value; MAX_TRACING_FIELDS],
|
||||
}
|
||||
|
||||
impl<Span, const F: usize> serde::ser::Serialize for SerializableSpanFields<'_, '_, Span, F>
|
||||
where
|
||||
Span: for<'lookup> LookupSpan<'lookup>,
|
||||
{
|
||||
impl serde::ser::Serialize for SerializableSpanFields<'_> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let ext = self.span.extensions();
|
||||
if let Some(data) = ext.get::<SpanFields>() {
|
||||
for (name, value) in &data.fields.pin() {
|
||||
serializer.serialize_entry(name, value)?;
|
||||
// TODO: replace clone with reference, if possible.
|
||||
self.extract.set(name, value.clone());
|
||||
for (field, value) in std::iter::zip(self.fields, self.values) {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
serializer.serialize_entry(field.name(), value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
}
|
||||
}
|
||||
|
||||
struct ExtractedSpanFields<'a, const F: usize> {
|
||||
names: &'a IndexSet<&'static str>,
|
||||
// TODO: replace TryLock with something local thread and interior mutability.
|
||||
// serde API doesn't let us use `mut`.
|
||||
values: TryLock<([Option<serde_json::Value>; F], bool)>,
|
||||
struct ExtractedSpanFields {
|
||||
names: &'static [&'static str],
|
||||
values: RefCell<Vec<serde_json::Value>>,
|
||||
}
|
||||
|
||||
impl<'a, const F: usize> ExtractedSpanFields<'a, F> {
|
||||
fn new(names: &'a IndexSet<&'static str>) -> Self {
|
||||
impl ExtractedSpanFields {
|
||||
fn new(names: &'static [&'static str]) -> Self {
|
||||
ExtractedSpanFields {
|
||||
names,
|
||||
values: TryLock::new((array::from_fn(|_| Option::default()), false)),
|
||||
values: RefCell::new(vec![serde_json::Value::Null; names.len()]),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn set(&self, name: &'static str, value: serde_json::Value) {
|
||||
if let Some((index, _)) = self.names.get_full(name) {
|
||||
let mut fields = self.values.try_lock().expect("thread-local use");
|
||||
fields.0[index] = Some(value);
|
||||
fields.1 = true;
|
||||
fn layer_span(&self, fields: &SpanFields) {
|
||||
let mut v = self.values.borrow_mut();
|
||||
let SpanFields { values, span_info } = fields;
|
||||
|
||||
// extract the fields
|
||||
for (i, &j) in span_info.extract.iter().enumerate() {
|
||||
let Some(value) = values.get(j) else { continue };
|
||||
|
||||
if !value.is_null() {
|
||||
// TODO: replace clone with reference, if possible.
|
||||
v[i] = value.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn has_values(&self) -> bool {
|
||||
self.values.try_lock().expect("thread-local use").1
|
||||
self.values.borrow().iter().any(|v| !v.is_null())
|
||||
}
|
||||
}
|
||||
|
||||
impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
impl serde::ser::Serialize for ExtractedSpanFields {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::ser::Serializer,
|
||||
{
|
||||
let mut serializer = serializer.serialize_map(None)?;
|
||||
|
||||
let values = self.values.try_lock().expect("thread-local use");
|
||||
for (i, value) in values.0.iter().enumerate() {
|
||||
if let Some(value) = value {
|
||||
let key = self.names[i];
|
||||
serializer.serialize_entry(key, value)?;
|
||||
let values = self.values.borrow();
|
||||
for (key, value) in std::iter::zip(self.names, &*values) {
|
||||
if value.is_null() {
|
||||
continue;
|
||||
}
|
||||
|
||||
serializer.serialize_entry(key, value)?;
|
||||
}
|
||||
|
||||
serializer.end()
|
||||
@@ -1032,7 +1020,6 @@ impl<const F: usize> serde::ser::Serialize for ExtractedSpanFields<'_, F> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
@@ -1081,10 +1068,9 @@ mod tests {
|
||||
let log_layer = JsonLoggingLayer {
|
||||
clock: clock.clone(),
|
||||
skipped_field_indices: papaya::HashMap::default(),
|
||||
callsite_ids: papaya::HashMap::default(),
|
||||
span_info: papaya::HashMap::default(),
|
||||
writer: buffer.clone(),
|
||||
extract_fields: IndexSet::from_iter(["x"]),
|
||||
_marker: PhantomData::<[&'static str; 1]>,
|
||||
extract_fields: &["x"],
|
||||
};
|
||||
|
||||
let registry = tracing_subscriber::Registry::default().with(log_layer);
|
||||
|
||||
@@ -13,22 +13,19 @@ pub(crate) struct Pbkdf2 {
|
||||
// inspired from <https://github.com/neondatabase/rust-postgres/blob/20031d7a9ee1addeae6e0968e3899ae6bf01cee2/postgres-protocol/src/authentication/sasl.rs#L36-L61>
|
||||
impl Pbkdf2 {
|
||||
pub(crate) fn start(str: &[u8], salt: &[u8], iterations: u32) -> Self {
|
||||
let hmac =
|
||||
// key the HMAC and derive the first block in-place
|
||||
let mut hmac =
|
||||
Hmac::<Sha256>::new_from_slice(str).expect("HMAC is able to accept all key sizes");
|
||||
|
||||
let prev = hmac
|
||||
.clone()
|
||||
.chain_update(salt)
|
||||
.chain_update(1u32.to_be_bytes())
|
||||
.finalize()
|
||||
.into_bytes();
|
||||
hmac.update(salt);
|
||||
hmac.update(&1u32.to_be_bytes());
|
||||
let init_block = hmac.finalize_reset().into_bytes();
|
||||
|
||||
Self {
|
||||
hmac,
|
||||
// one consumed for the hash above
|
||||
// one iteration spent above
|
||||
iterations: iterations - 1,
|
||||
hi: prev,
|
||||
prev,
|
||||
hi: init_block,
|
||||
prev: init_block,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,14 +41,17 @@ impl Pbkdf2 {
|
||||
iterations,
|
||||
} = self;
|
||||
|
||||
// only do 4096 iterations per turn before sharing the thread for fairness
|
||||
// only do up to 4096 iterations per turn for fairness
|
||||
let n = (*iterations).clamp(0, 4096);
|
||||
for _ in 0..n {
|
||||
*prev = hmac.clone().chain_update(*prev).finalize().into_bytes();
|
||||
hmac.update(prev);
|
||||
let block = hmac.finalize_reset().into_bytes();
|
||||
|
||||
for (hi, prev) in hi.iter_mut().zip(*prev) {
|
||||
*hi ^= prev;
|
||||
for (hi_byte, &b) in hi.iter_mut().zip(block.iter()) {
|
||||
*hi_byte ^= b;
|
||||
}
|
||||
|
||||
*prev = block;
|
||||
}
|
||||
|
||||
*iterations -= n;
|
||||
|
||||
@@ -187,6 +187,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"args": {"format": "bincode", "compression": {"zstd": {"level": 1}}},
|
||||
},
|
||||
"rel_size_v2_enabled": True,
|
||||
"relsize_snapshot_cache_capacity": 10000,
|
||||
"gc_compaction_enabled": True,
|
||||
"gc_compaction_verification": False,
|
||||
"gc_compaction_initial_threshold_kb": 1024000,
|
||||
|
||||
@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
|
||||
{"name": "role$"},
|
||||
{"name": "role$$"},
|
||||
{"name": "role$x$"},
|
||||
{"name": "x"},
|
||||
{"name": "xx"},
|
||||
{"name": "$x"},
|
||||
{"name": "x$"},
|
||||
{"name": "$x$"},
|
||||
{"name": "xx$"},
|
||||
{"name": "$xx"},
|
||||
{"name": "$xx$"},
|
||||
# 63 bytes is the limit for role/DB names in Postgres
|
||||
{"name": "x" * 63},
|
||||
]
|
||||
|
||||
TEST_DB_NAMES = [
|
||||
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
|
||||
"name": "db name$x$",
|
||||
"owner": "role$x$",
|
||||
},
|
||||
{
|
||||
"name": "x",
|
||||
"owner": "x",
|
||||
},
|
||||
{
|
||||
"name": "xx",
|
||||
"owner": "xx",
|
||||
},
|
||||
{
|
||||
"name": "$x",
|
||||
"owner": "$x",
|
||||
},
|
||||
{
|
||||
"name": "x$",
|
||||
"owner": "x$",
|
||||
},
|
||||
{
|
||||
"name": "$x$",
|
||||
"owner": "$x$",
|
||||
},
|
||||
{
|
||||
"name": "xx$",
|
||||
"owner": "xx$",
|
||||
},
|
||||
{
|
||||
"name": "$xx",
|
||||
"owner": "$xx",
|
||||
},
|
||||
{
|
||||
"name": "$xx$",
|
||||
"owner": "$xx$",
|
||||
},
|
||||
# 63 bytes is the limit for role/DB names in Postgres
|
||||
{
|
||||
"name": "x" * 63,
|
||||
"owner": "x" * 63,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can create and work with databases and roles
|
||||
with special characters (whitespaces, %, tabs, etc.) in the name.
|
||||
Also use `drop_subscriptions_before_start: true`. We do not actually
|
||||
have any subscriptions in this test, so it should be no-op, but it
|
||||
i) simulates the case when we create a second dev branch together with
|
||||
a new project creation, and ii) just generally stresses more code paths.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"drop_subscriptions_before_start": True,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"drop_subscriptions_before_start": True,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
|
||||
@@ -27,8 +27,9 @@ from contextlib import closing
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_for_last_flush_lsn, wait_replica_caughtup
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin, wait_for_last_flush_lsn, wait_replica_caughtup
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.utils import query_scalar, skip_on_postgres, wait_until
|
||||
|
||||
@@ -695,3 +696,110 @@ def test_replica_start_with_too_many_unused_xids(neon_simple_env: NeonEnv):
|
||||
with secondary.cursor() as secondary_cur:
|
||||
secondary_cur.execute("select count(*) from t")
|
||||
assert secondary_cur.fetchone() == (n_restarts,)
|
||||
|
||||
|
||||
def test_ephemeral_endpoints_vacuum(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
sql = """
|
||||
CREATE TABLE CHAR_TBL(f1 char(4));
|
||||
CREATE TABLE FLOAT8_TBL(f1 float8);
|
||||
CREATE TABLE INT2_TBL(f1 int2);
|
||||
CREATE TABLE INT4_TBL(f1 int4);
|
||||
CREATE TABLE INT8_TBL(q1 int8, q2 int8);
|
||||
CREATE TABLE POINT_TBL(f1 point);
|
||||
CREATE TABLE TEXT_TBL (f1 text);
|
||||
CREATE TABLE VARCHAR_TBL(f1 varchar(4));
|
||||
CREATE TABLE onek (unique1 int4);
|
||||
CREATE TABLE onek2 AS SELECT * FROM onek;
|
||||
CREATE TABLE tenk1 (unique1 int4);
|
||||
CREATE TABLE tenk2 AS SELECT * FROM tenk1;
|
||||
CREATE TABLE person (name text, age int4,location point);
|
||||
CREATE TABLE emp (salary int4, manager name) INHERITS (person);
|
||||
CREATE TABLE student (gpa float8) INHERITS (person);
|
||||
CREATE TABLE stud_emp ( percent int4) INHERITS (emp, student);
|
||||
CREATE TABLE road (name text,thepath path);
|
||||
CREATE TABLE ihighway () INHERITS (road);
|
||||
CREATE TABLE shighway(surface text) INHERITS (road);
|
||||
CREATE TABLE BOOLTBL3 (d text, b bool, o int);
|
||||
CREATE TABLE booltbl4(isfalse bool, istrue bool, isnul bool);
|
||||
DROP TABLE BOOLTBL3;
|
||||
DROP TABLE BOOLTBL4;
|
||||
CREATE TABLE ceil_floor_round (a numeric);
|
||||
DROP TABLE ceil_floor_round;
|
||||
CREATE TABLE width_bucket_test (operand_num numeric, operand_f8 float8);
|
||||
DROP TABLE width_bucket_test;
|
||||
CREATE TABLE num_input_test (n1 numeric);
|
||||
CREATE TABLE num_variance (a numeric);
|
||||
INSERT INTO num_variance VALUES (0);
|
||||
CREATE TABLE snapshot_test (nr integer, snap txid_snapshot);
|
||||
CREATE TABLE guid1(guid_field UUID, text_field TEXT DEFAULT(now()));
|
||||
CREATE TABLE guid2(guid_field UUID, text_field TEXT DEFAULT(now()));
|
||||
CREATE INDEX guid1_btree ON guid1 USING BTREE (guid_field);
|
||||
CREATE INDEX guid1_hash ON guid1 USING HASH (guid_field);
|
||||
TRUNCATE guid1;
|
||||
DROP TABLE guid1;
|
||||
DROP TABLE guid2 CASCADE;
|
||||
CREATE TABLE numrange_test (nr NUMRANGE);
|
||||
CREATE INDEX numrange_test_btree on numrange_test(nr);
|
||||
CREATE TABLE numrange_test2(nr numrange);
|
||||
CREATE INDEX numrange_test2_hash_idx on numrange_test2 using hash (nr);
|
||||
INSERT INTO numrange_test2 VALUES('[, 5)');
|
||||
CREATE TABLE textrange_test (tr text);
|
||||
CREATE INDEX textrange_test_btree on textrange_test(tr);
|
||||
CREATE TABLE test_range_gist(ir int4range);
|
||||
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
|
||||
DROP INDEX test_range_gist_idx;
|
||||
CREATE INDEX test_range_gist_idx on test_range_gist using gist (ir);
|
||||
CREATE TABLE test_range_spgist(ir int4range);
|
||||
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
|
||||
DROP INDEX test_range_spgist_idx;
|
||||
CREATE INDEX test_range_spgist_idx on test_range_spgist using spgist (ir);
|
||||
CREATE TABLE test_range_elem(i int4);
|
||||
CREATE INDEX test_range_elem_idx on test_range_elem (i);
|
||||
CREATE INDEX ON test_range_elem using spgist(int4range(i,i+10));
|
||||
DROP TABLE test_range_elem;
|
||||
CREATE TABLE test_range_excl(room int4range, speaker int4range, during tsrange, exclude using gist (room with =, during with &&), exclude using gist (speaker with =, during with &&));
|
||||
CREATE TABLE f_test(f text, i int);
|
||||
CREATE TABLE i8r_array (f1 int, f2 text);
|
||||
CREATE TYPE arrayrange as range (subtype=int4[]);
|
||||
CREATE TYPE two_ints as (a int, b int);
|
||||
DROP TYPE two_ints cascade;
|
||||
CREATE TABLE text_support_test (t text);
|
||||
CREATE TABLE TEMP_FLOAT (f1 FLOAT8);
|
||||
CREATE TABLE TEMP_INT4 (f1 INT4);
|
||||
CREATE TABLE TEMP_INT2 (f1 INT2);
|
||||
CREATE TABLE TEMP_GROUP (f1 INT4, f2 INT4, f3 FLOAT8);
|
||||
CREATE TABLE POLYGON_TBL(f1 polygon);
|
||||
CREATE TABLE quad_poly_tbl (id int, p polygon);
|
||||
INSERT INTO quad_poly_tbl SELECT (x - 1) * 100 + y, polygon(circle(point(x * 10, y * 10), 1 + (x + y) % 10)) FROM generate_series(1, 200) x, generate_series(1, 100) y;
|
||||
CREATE TABLE quad_poly_tbl_ord_seq2 AS SELECT 1 FROM quad_poly_tbl;
|
||||
CREATE TABLE quad_poly_tbl_ord_idx2 AS SELECT 1 FROM quad_poly_tbl;
|
||||
"""
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
env.endpoints.create_start(branch_name="main", lsn=lsn)
|
||||
log.info(f"lsn: {lsn}")
|
||||
|
||||
for line in sql.split("\n"):
|
||||
if len(line.strip()) == 0 or line.startswith("--"):
|
||||
continue
|
||||
cur.execute(line)
|
||||
|
||||
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
|
||||
env.endpoints.create_start(branch_name="main", lsn=lsn)
|
||||
log.info(f"lsn: {lsn}")
|
||||
|
||||
cur.execute("VACUUM FULL pg_class;")
|
||||
|
||||
for ep in env.endpoints.endpoints:
|
||||
log.info(f"{ep.endpoint_id} / {ep.pg_port}")
|
||||
pg_dump_command = ["pg_dumpall", "-f", f"/tmp/dump-{ep.endpoint_id}.sql"]
|
||||
env_vars = {
|
||||
"PGPORT": str(ep.pg_port),
|
||||
"PGUSER": endpoint.default_options["user"],
|
||||
"PGHOST": endpoint.default_options["host"],
|
||||
}
|
||||
pg_bin.run_capture(pg_dump_command, env=env_vars)
|
||||
|
||||
Reference in New Issue
Block a user