mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-26 02:40:38 +00:00
Compare commits
16 Commits
problame/f
...
arpad/lsn_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96b3171799 | ||
|
|
2caa795139 | ||
|
|
52017eff68 | ||
|
|
090246cad7 | ||
|
|
d7715531da | ||
|
|
7a38352865 | ||
|
|
b06f36ebca | ||
|
|
06a9692956 | ||
|
|
804508ac17 | ||
|
|
3d2266c842 | ||
|
|
61783d6000 | ||
|
|
bae377ed81 | ||
|
|
3b4872eea7 | ||
|
|
57095b98c1 | ||
|
|
c205e96d89 | ||
|
|
ed923b6102 |
156
libs/utils/src/exp_counter.rs
Normal file
156
libs/utils/src/exp_counter.rs
Normal file
@@ -0,0 +1,156 @@
|
||||
/// An exhausting, injective counter with exponential search properties.
///
/// The type implements an [`Iterator`] that yields numbers from the range
/// from 0 to a pre-defined maximum.
///
/// * It is *exhausting* in that it visits all numbers inside `0..max`.
/// * It is *injective* in that all numbers are visited only once.
/// * It has *exponential search properties* in that it iterates over the
///   number range in a fractal pattern.
///
/// This iterator is well suited for finding a pivot in algorithms that
/// want centered pivots: its output is heavily biased towards starting
/// with small numbers.
///
/// If the specified maximum is close to a power of two we will quickly
/// reach that power and therefore we will quickly close in on the maximum,
/// contrary to the claim of the counter providing centered pivots. For most
/// algorithms this is not a problem, as it only happens a limited number of
/// times: the next maximum will be a power of two, for which the next
/// smaller power is precisely its half, so we will be able to perform a
/// binary search in all instances.
///
/// NOTE(review): internal sums like `base + offs` assume `max` stays well
/// below `u64::MAX / 2`; the in-crate callers pass LSN-derived values that
/// satisfy this — confirm before using with extreme maxima.
#[derive(Debug, Clone)]
pub struct ExpCounter {
    /// The (exclusive) upper limit of our search.
    max: u64,
    /// The base that increases after each round trip over the powers of two.
    base: u64,
    /// An increasing offset, always a power of two.
    offs: u64,
    /// Iteration counter; once it reaches `max` the iterator is exhausted.
    i: u64,
}

impl ExpCounter {
    /// Creates an iterator that uses `ExpCounter` for the first half of the
    /// range and a linear range counter for the second half.
    pub fn with_max_and_linear_search(max: u64) -> impl Iterator<Item = u64> {
        let linear_start = max / 2;
        ExpCounter::with_max(linear_start).chain(linear_start..max)
    }

    /// Creates a new `ExpCounter` instance that counts to the (exclusive) maximum.
    pub fn with_max(max: u64) -> Self {
        Self {
            max,
            base: 0,
            offs: 1,
            i: 0,
        }
    }
}

impl Iterator for ExpCounter {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        // `i` counts yielded items; exactly `max` numbers are produced in total.
        if self.i >= self.max {
            return None;
        }
        if self.i == 0 {
            // Special casing this is easier than adding 0 as the first in
            // some other fashion.
            self.i += 1;
            return Some(0);
        }
        let to_yield = self.base + self.offs;
        self.offs *= 2;

        if self.base + self.offs >= self.max {
            // The current round trip over the powers of two is done: move to
            // the next base and restart the offset.
            self.base += 1;
            // Smallest power of two >= base (base >= 1 here); equivalent to
            // the doubling loop `offs = 1; while base > offs { offs *= 2 }`.
            // Smaller offsets were already yielded with smaller bases.
            self.offs = self.base.next_power_of_two();
            if (self.base + self.offs).is_power_of_two() {
                // That sum was already yielded as `0 + offs'` during the very
                // first round trip; skip ahead to avoid a duplicate.
                self.offs *= 2;
            }
        }
        self.i += 1;

        Some(to_yield)
    }
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use std::collections::HashSet;

    use super::*;

    /// Counts duplicated and missing values in `list` relative to the range
    /// `0..max`, printing diagnostics along the way (visible with
    /// `cargo test -- --nocapture`). Sorts `list` in place.
    /// Returns `(dupes, missing)`.
    fn dupes_and_missing(list: &mut [u64], max: u64) -> (usize, usize) {
        let mut contained_in_list = HashSet::<u64>::new();
        let mut dupes = 0;
        let mut missing = 0;

        for v in list.iter() {
            // NOTE: the original spec `{v:4 }` had a stray space inside the
            // format spec, which std::fmt rejects; fixed to plain width.
            println!("{v:4} = {v:010b}");
        }
        println!("Yielded {} items", list.len());
        println!("The duplicates:");
        // Unstable sort is fine (and non-allocating) for plain u64 values.
        list.sort_unstable();
        let mut prev = None;
        for v in list.iter() {
            contained_in_list.insert(*v);
            if Some(*v) == prev {
                dupes += 1;
                println!("{v:3} = {v:08b}");
            }
            prev = Some(*v);
        }
        println!("The missing numbers:");
        for v in 0..max {
            if !contained_in_list.contains(&v) {
                println!("{v:3} = {v:010b}");
                missing += 1;
            }
        }
        (dupes, missing)
    }

    /// Asserts that `ExpCounter::with_max(max)` yields every value in
    /// `0..max` exactly once (no duplicates, nothing missing).
    fn assert_exhausting_and_injective(max: u64) {
        let mut list = ExpCounter::with_max(max).collect::<Vec<_>>();
        assert_eq!(dupes_and_missing(&mut list, max), (0, 0));
    }

    #[test]
    fn to_40() {
        let max = 40;
        let list = ExpCounter::with_max(max).collect::<Vec<_>>();
        let expected = [
            0, 1, 2, 4, 8, 16, 32, 3, 5, 9, 17, 33, 6, 10, 18, 34, 7, 11, 19, 35, 12, 20, 36, 13,
            21, 37, 14, 22, 38, 15, 23, 39, 24, 25, 26, 27, 28, 29, 30, 31,
        ];
        assert_eq!(list, expected);
    }

    #[test]
    fn to_64() {
        // Power-of-two maximum.
        assert_exhausting_and_injective(64);
    }

    #[test]
    fn to_100() {
        assert_exhausting_and_injective(100);
    }

    #[test]
    fn to_127() {
        // One below a power of two.
        assert_exhausting_and_injective(127);
    }

    #[test]
    fn to_12345() {
        // A larger, arbitrary maximum.
        assert_exhausting_and_injective(12345);
    }
}
|
||||
@@ -78,6 +78,7 @@ pub mod completion;
|
||||
/// Reporting utilities
|
||||
pub mod error;
|
||||
|
||||
pub mod exp_counter;
|
||||
/// async timeout helper
|
||||
pub mod timeout;
|
||||
|
||||
|
||||
@@ -604,7 +604,7 @@ async fn get_timestamp_of_lsn_handler(
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
|
||||
let result = timeline.get_max_timestamp_for_lsn(lsn, &ctx).await?;
|
||||
|
||||
match result {
|
||||
Some(time) => {
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::exp_counter::ExpCounter;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
|
||||
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
|
||||
@@ -382,24 +383,57 @@ impl Timeline {
|
||||
|
||||
let mut found_smaller = false;
|
||||
let mut found_larger = false;
|
||||
|
||||
// This is a search budget to not make the search too expensive
|
||||
let mut budget = 1_000_000u32;
|
||||
while low < high {
|
||||
// cannot overflow, high and low are both smaller than u64::MAX / 2
|
||||
let mid = (high + low) / 2;
|
||||
// this always holds: low <= mid_start < high
|
||||
let mid_start = (high + low) / 2;
|
||||
let mut mid = mid_start;
|
||||
|
||||
let cmp = self
|
||||
.is_latest_commit_timestamp_ge_than(
|
||||
search_timestamp,
|
||||
Lsn(mid * 8),
|
||||
&mut found_smaller,
|
||||
&mut found_larger,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let mut max = None;
|
||||
|
||||
if cmp {
|
||||
high = mid;
|
||||
// Search for an lsn that has a commit timestamp.
|
||||
// The search with `ExpCounter` will eventually try all LSNs
|
||||
// in the range between mid_start and high, which is important
|
||||
// when we set high to mid if we have exhausted all possibilities.
|
||||
// We don't do an explosive search as we want to prove that
|
||||
// every single lsn up to high is giving inconclusive results.
|
||||
// This can only be done by trying all lsns.
|
||||
for offs in ExpCounter::with_max_and_linear_search(high - mid_start) {
|
||||
mid = offs + mid_start;
|
||||
|
||||
// Do the query for mid + 1 instead of mid so that we can make definite statements
|
||||
// about low (low's invariant: always points to commit before or at search_timestamp).
|
||||
max = self
|
||||
.get_max_timestamp_for_lsn(Lsn((mid + 1) * 8), ctx)
|
||||
.await?;
|
||||
if max.is_some() {
|
||||
break;
|
||||
}
|
||||
// Do some limiting to make sure the query does not become too expensive.
|
||||
if let Some(new_budget) = budget.checked_sub(1) {
|
||||
budget = new_budget;
|
||||
} else {
|
||||
return Ok(LsnForTimestamp::NoData(min_lsn));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(max) = max {
|
||||
if max <= search_timestamp {
|
||||
// We found a commit with timestamp before (or at) `search_timestamp`.
|
||||
found_smaller = true;
|
||||
low = mid + 1;
|
||||
} else {
|
||||
// We found a commit with timestamp after `search_timestamp`.
|
||||
found_larger = true;
|
||||
high = mid;
|
||||
}
|
||||
} else {
|
||||
low = mid + 1;
|
||||
// max is None so we have proof of a chain of None's from mid_start all up to high.
|
||||
// Thus we can safely set high to mid_start
|
||||
high = mid_start;
|
||||
}
|
||||
}
|
||||
// If `found_smaller == true`, `low` is the LSN of the last commit record
|
||||
@@ -409,7 +443,7 @@ impl Timeline {
|
||||
// Otherwise, if you restore to the returned LSN, the database will
|
||||
// include physical changes from later commits that will be marked
|
||||
// as aborted, and will need to be vacuumed away.
|
||||
let commit_lsn = Lsn((low - 1) * 8);
|
||||
let commit_lsn = Lsn(low * 8);
|
||||
match (found_smaller, found_larger) {
|
||||
(false, false) => {
|
||||
// This can happen if no commit records have been processed yet, e.g.
|
||||
@@ -431,36 +465,10 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
|
||||
/// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
|
||||
/// Obtain the timestamp for the given lsn.
|
||||
///
|
||||
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
|
||||
/// with a smaller/larger timestamp.
|
||||
///
|
||||
pub async fn is_latest_commit_timestamp_ge_than(
|
||||
&self,
|
||||
search_timestamp: TimestampTz,
|
||||
probe_lsn: Lsn,
|
||||
found_smaller: &mut bool,
|
||||
found_larger: &mut bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
self.map_all_timestamps(probe_lsn, ctx, |timestamp| {
|
||||
if timestamp >= search_timestamp {
|
||||
*found_larger = true;
|
||||
return ControlFlow::Break(true);
|
||||
} else {
|
||||
*found_smaller = true;
|
||||
}
|
||||
ControlFlow::Continue(())
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Obtain the possible timestamp range for the given lsn.
|
||||
///
|
||||
/// If the lsn has no timestamps, returns None. returns `(min, max, median)` if it has timestamps.
|
||||
pub async fn get_timestamp_for_lsn(
|
||||
/// If the lsn has no timestamps, returns None. Returns the maximum such timestamp otherwise.
|
||||
pub(crate) async fn get_max_timestamp_for_lsn(
|
||||
&self,
|
||||
probe_lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -101,7 +101,6 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
start_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_lsn()"))
|
||||
|
||||
# Create table, and insert rows, each in a separate transaction
|
||||
# Disable synchronous_commit to make this initialization go faster.
|
||||
# Disable `synchronous_commit` to make this initialization go faster.
|
||||
# XXX: on my laptop this test takes 7s, and setting `synchronous_commit=off`
|
||||
# doesn't change anything.
|
||||
@@ -132,7 +131,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
|
||||
result = client.timeline_get_lsn_by_timestamp(
|
||||
tenant_id, timeline_id, f"{probe_timestamp.isoformat()}Z", 2
|
||||
)
|
||||
assert result["kind"] == "future"
|
||||
assert result["kind"] in ["present", "future"]
|
||||
# make sure that we return a well advanced lsn here
|
||||
assert Lsn(result["lsn"]) > start_lsn
|
||||
|
||||
|
||||
Reference in New Issue
Block a user