Compare commits

..

1 Commits

Author SHA1 Message Date
Anastasia Lubennikova
e0013251df TEST ONLY Use branch fix_path_for_extension_request for vendor/postgres-v16 2024-09-19 10:15:47 +01:00
44 changed files with 299 additions and 596 deletions

View File

@@ -173,6 +173,40 @@ impl Default for EvictionOrder {
}
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetVectoredImpl {
Sequential,
Vectored,
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetImpl {
Legacy,
Vectored,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
@@ -304,6 +338,8 @@ pub mod defaults {
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::Zstd { level: Some(1) };
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
@@ -340,10 +376,7 @@ impl Default for ConfigToml {
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
.expect("Invalid default constant")),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(
DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES,
)
.unwrap(),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
metric_collection_interval: (humantime::parse_duration(
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
@@ -434,6 +467,8 @@ pub mod tenant_conf_defaults {
// By default ingest enough WAL for two new L0 layers before checking if new image
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
impl Default for TenantConfigToml {

View File

@@ -26,7 +26,6 @@ use bytes::{Buf, Bytes};
use log::*;
use serde::Serialize;
use std::ffi::OsStr;
use std::fs::File;
use std::io::prelude::*;
use std::io::ErrorKind;
@@ -79,34 +78,19 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
pub fn XLogFromFileName(
fname: &OsStr,
wal_seg_size: usize,
) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
if let Some(fname_str) = fname.to_str() {
let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
} else {
anyhow::bail!("non-ut8 filename: {:?}", fname);
}
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
}
pub fn IsXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
} else {
false
}
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
}
pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
} else {
false
}
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
}
/// If LSN points to the beginning of the page, then shift it to first record,

View File

@@ -7,7 +7,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
@@ -136,8 +135,8 @@ impl Conf {
pub fn pg_waldump(
&self,
first_segment_name: &OsStr,
last_segment_name: &OsStr,
first_segment_name: &str,
last_segment_name: &str,
) -> anyhow::Result<std::process::Output> {
let first_segment_file = self.datadir.join(first_segment_name);
let last_segment_file = self.datadir.join(last_segment_name);

View File

@@ -4,7 +4,6 @@ use super::*;
use crate::{error, info};
use regex::Regex;
use std::cmp::min;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::Write;
use std::{env, str::FromStr};
@@ -55,7 +54,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name())
.map(|f| f.unwrap().file_name().into_string().unwrap())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
@@ -71,11 +70,11 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name();
let fname = file.file_name().into_string().unwrap();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(*start_lsn) {
continue;
@@ -94,10 +93,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
}
}
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
.pg_waldump("000000010000000000000001", last_segment)
.unwrap()
.stderr;
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
@@ -118,7 +117,7 @@ fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
fn check_end_of_wal(
cfg: &crate::Conf,
last_segment: &OsStr,
last_segment: &str,
start_lsn: Lsn,
expected_end_of_wal: Lsn,
) {
@@ -133,8 +132,7 @@ fn check_end_of_wal(
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
cfg.wal_dir().join(last_segment),
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
)
.unwrap();
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
@@ -144,8 +142,7 @@ fn check_end_of_wal(
);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir().join(last_segment),
)
.unwrap();

View File

@@ -479,6 +479,11 @@ pub struct ConfigurableSemaphore {
}
impl ConfigurableSemaphore {
pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
Some(x) => x,
None => panic!("const unwrap is not yet stable"),
};
/// Initializse using a non-zero amount of permits.
///
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
@@ -499,6 +504,12 @@ impl ConfigurableSemaphore {
}
}
impl Default for ConfigurableSemaphore {
fn default() -> Self {
Self::new(Self::DEFAULT_INITIAL)
}
}
impl PartialEq for ConfigurableSemaphore {
fn eq(&self, other: &Self) -> bool {
// the number of permits can be increased at runtime, so we cannot really fulfill the

View File

@@ -140,7 +140,6 @@ pub mod metadata;
pub mod remote_timeline_client;
pub mod storage_layer;
pub mod checks;
pub mod config;
pub mod mgr;
pub mod secondary;
@@ -1574,9 +1573,6 @@ impl Tenant {
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
use checks::check_valid_layermap;
use itertools::Itertools;
let tline = self
.create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
@@ -1591,18 +1587,6 @@ impl Tenant {
.force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
.await?;
}
let layer_names = tline
.layers
.read()
.await
.layer_map()
.unwrap()
.iter_historic_layers()
.map(|layer| layer.layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("invalid layermap: {err}");
}
Ok(tline)
}
@@ -3213,9 +3197,6 @@ impl Tenant {
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
use checks::check_valid_layermap;
use itertools::Itertools;
let tline = self
.branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx)
.await?;
@@ -3236,18 +3217,6 @@ impl Tenant {
.force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx)
.await?;
}
let layer_names = tline
.layers
.read()
.await
.layer_map()
.unwrap()
.iter_historic_layers()
.map(|layer| layer.layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("invalid layermap: {err}");
}
Ok(tline)
}
@@ -4195,18 +4164,9 @@ pub(crate) mod harness {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let mut page = match (base_img, records.first()) {
(Some((_lsn, img)), _) => {
let mut page = BytesMut::new();
page.extend_from_slice(&img);
page
}
(_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
_ => {
panic!("Neon WAL redo requires base image or will init record");
}
};
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
for (record_lsn, record) in records {
apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
}
@@ -8510,135 +8470,4 @@ mod tests {
Ok(())
}
// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
// A B
// +----------------+ -> delta_layer
// | | ^ lsn
// | =========|-> nested_image_layer |
// | C | |
// +----------------+ |
// ======== -> baseline_image_layer +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
let will_init_keys = [2, 6];
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let mut expected_key_values = HashMap::new();
let baseline_image_layer_lsn = Lsn(0x10);
let mut baseline_img_layer = Vec::new();
for i in 0..5 {
let key = get_key(i);
let value = format!("value {i}@{baseline_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
baseline_img_layer.push((key, Bytes::from(value)));
}
let nested_image_layer_lsn = Lsn(0x50);
let mut nested_img_layer = Vec::new();
for i in 5..10 {
let key = get_key(i);
let value = format!("value {i}@{nested_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
nested_img_layer.push((key, Bytes::from(value)));
}
let mut delta_layer_spec = Vec::default();
let delta_layer_start_lsn = Lsn(0x20);
let mut delta_layer_end_lsn = delta_layer_start_lsn;
for i in 0..10 {
let key = get_key(i);
let key_in_nested = nested_img_layer
.iter()
.any(|(key_with_img, _)| *key_with_img == key);
let lsn = {
if key_in_nested {
Lsn(nested_image_layer_lsn.0 + 0x10)
} else {
delta_layer_start_lsn
}
};
let will_init = will_init_keys.contains(&i);
if will_init {
delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
expected_key_values.insert(key, "".to_string());
} else {
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
}
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
assert!(
nested_image_layer_lsn > delta_layer_start_lsn
&& nested_image_layer_lsn < delta_layer_end_lsn
);
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
baseline_image_layer_lsn,
DEFAULT_PG_VERSION,
&ctx,
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
delta_layer_start_lsn..delta_layer_end_lsn,
delta_layer_spec,
)], // delta layers
vec![
(baseline_image_layer_lsn, baseline_img_layer),
(nested_image_layer_lsn, nested_img_layer),
], // image layers
delta_layer_end_lsn,
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
let value = res.expect("No key errors");
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
assert_eq!(value, Bytes::from(expected_value));
}
Ok(())
}
}

View File

@@ -1,55 +0,0 @@
use std::collections::BTreeSet;
use itertools::Itertools;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
/// | 1 | | 2 | | 3 |
/// | | | | | |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// | | | 2 | | |
/// | 1 | |-------| | 3 |
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for name in metadata {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
for layer in &all_delta_layers {
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
}
}
None
}

View File

@@ -1,29 +1,11 @@
use std::{collections::HashMap, time::Duration};
use std::collections::HashMap;
use super::remote_timeline_client::index::GcBlockingReason;
use tokio::time::Instant;
use utils::id::TimelineId;
type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
use super::remote_timeline_client::index::GcBlockingReason;
#[derive(Default)]
struct Storage {
timelines_blocked: TimelinesBlocked,
/// The deadline before which we are blocked from GC so that
/// leases have a chance to be renewed.
lsn_lease_deadline: Option<Instant>,
}
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
impl Storage {
fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
self.lsn_lease_deadline
.map(|d| Instant::now() < d)
.unwrap_or(false)
}
}
/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
/// blocking.
#[derive(Default)]
pub(crate) struct GcBlock {
/// The timelines which have current reasons to block gc.
@@ -31,12 +13,6 @@ pub(crate) struct GcBlock {
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
/// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
reasons: std::sync::Mutex<Storage>,
/// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
///
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
}
@@ -66,20 +42,6 @@ impl GcBlock {
}
}
/// Sets a deadline before which we cannot proceed to GC due to lsn lease.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
/// length, we guarantee that all the leases we granted before will have a chance to renew
/// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
let deadline = Instant::now() + lsn_lease_length;
let mut g = self.reasons.lock().unwrap();
g.lsn_lease_deadline = Some(deadline);
}
/// Describe the current gc blocking reasons.
///
/// TODO: make this json serializable.
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
let g = self.reasons.lock().unwrap();
@@ -102,7 +64,7 @@ impl GcBlock {
) -> anyhow::Result<bool> {
let (added, uploaded) = {
let mut g = self.reasons.lock().unwrap();
let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
let set = g.entry(timeline.timeline_id).or_default();
let added = set.insert(reason);
// LOCK ORDER: intentionally hold the lock, see self.reasons.
@@ -133,7 +95,7 @@ impl GcBlock {
let (remaining_blocks, uploaded) = {
let mut g = self.reasons.lock().unwrap();
match g.timelines_blocked.entry(timeline.timeline_id) {
match g.entry(timeline.timeline_id) {
Entry::Occupied(mut oe) => {
let set = oe.get_mut();
set.remove(reason);
@@ -147,7 +109,7 @@ impl GcBlock {
}
}
let remaining_blocks = g.timelines_blocked.len();
let remaining_blocks = g.len();
// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
let uploaded = timeline
@@ -172,11 +134,11 @@ impl GcBlock {
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
let unblocked = {
let mut g = self.reasons.lock().unwrap();
if g.timelines_blocked.is_empty() {
if g.is_empty() {
return;
}
g.timelines_blocked.remove(&timeline.timeline_id);
g.remove(&timeline.timeline_id);
BlockingReasons::clean_and_summarize(g).is_none()
};
@@ -187,11 +149,10 @@ impl GcBlock {
}
/// Initialize with the non-deleted timelines of this tenant.
pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
pub(crate) fn set_scanned(&self, scanned: Storage) {
let mut g = self.reasons.lock().unwrap();
assert!(g.timelines_blocked.is_empty());
g.timelines_blocked
.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
assert!(g.is_empty());
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
tracing::info!(summary=?reasons, "initialized with gc blocked");
@@ -205,7 +166,6 @@ pub(super) struct Guard<'a> {
#[derive(Debug)]
pub(crate) struct BlockingReasons {
tenant_blocked_by_lsn_lease_deadline: bool,
timelines: usize,
reasons: enumset::EnumSet<GcBlockingReason>,
}
@@ -214,8 +174,8 @@ impl std::fmt::Display for BlockingReasons {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
"{} timelines block for {:?}",
self.timelines, self.reasons
)
}
}
@@ -223,15 +183,13 @@ impl std::fmt::Display for BlockingReasons {
impl BlockingReasons {
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
let mut reasons = enumset::EnumSet::empty();
g.timelines_blocked.retain(|_key, value| {
g.retain(|_key, value| {
reasons = reasons.union(*value);
!value.is_empty()
});
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
if !g.is_empty() {
Some(BlockingReasons {
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
timelines: g.len(),
reasons,
})
} else {
@@ -240,17 +198,14 @@ impl BlockingReasons {
}
fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
if g.is_empty() {
None
} else {
let reasons = g
.timelines_blocked
.values()
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
Some(BlockingReasons {
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
timelines: g.len(),
reasons,
})
}

View File

@@ -949,12 +949,6 @@ impl TenantManager {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
match attach_conf.generation.cmp(&tenant.generation) {
Ordering::Equal => {
if attach_conf.attach_mode == AttachmentMode::Single {
tenant
.gc_block
.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
}
// A transition from Attached to Attached in the same generation, we may
// take our fast path and just provide the updated configuration
// to the tenant.

View File

@@ -276,16 +276,6 @@ pub(crate) enum LayerId {
InMemoryLayerId(InMemoryLayerFileId),
}
/// Uniquely identify a layer visit by the layer
/// and LSN floor (or start LSN) of the reads.
/// The layer itself is not enough since we may
/// have different LSN lower bounds for delta layer reads.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct LayerToVisitId {
layer_id: LayerId,
lsn_floor: Lsn,
}
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
@@ -297,9 +287,9 @@ pub(crate) enum ReadableLayer {
/// A partial description of a read to be done.
#[derive(Debug, Clone)]
struct LayerVisit {
struct ReadDesc {
/// An id used to resolve the readable layer within the fringe
layer_to_visit_id: LayerToVisitId,
layer_id: LayerId,
/// Lsn range for the read, used for selecting the next read
lsn_range: Range<Lsn>,
}
@@ -313,12 +303,12 @@ struct LayerVisit {
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
planned_visits_by_lsn: BinaryHeap<LayerVisit>,
visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
layers: HashMap<LayerId, LayerKeyspace>,
}
#[derive(Debug)]
struct LayerVisitReads {
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
}
@@ -326,23 +316,23 @@ struct LayerVisitReads {
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
planned_visits_by_lsn: BinaryHeap::new(),
visit_reads: HashMap::new(),
planned_reads_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
}
}
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
let read_desc = match self.planned_visits_by_lsn.pop() {
let read_desc = match self.planned_reads_by_lsn.pop() {
Some(desc) => desc,
None => return None,
};
let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
let removed = self.layers.remove_entry(&read_desc.layer_id);
match removed {
Some((
_,
LayerVisitReads {
LayerKeyspace {
layer,
mut target_keyspace,
},
@@ -361,24 +351,20 @@ impl LayerFringe {
keyspace: KeySpace,
lsn_range: Range<Lsn>,
) {
let layer_to_visit_id = LayerToVisitId {
layer_id: layer.id(),
lsn_floor: lsn_range.start,
};
let entry = self.visit_reads.entry(layer_to_visit_id.clone());
let layer_id = layer.id();
let entry = self.layers.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
}
Entry::Vacant(entry) => {
self.planned_visits_by_lsn.push(LayerVisit {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
layer_to_visit_id: layer_to_visit_id.clone(),
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerVisitReads {
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
});
@@ -393,7 +379,7 @@ impl Default for LayerFringe {
}
}
impl Ord for LayerVisit {
impl Ord for ReadDesc {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
if ord == std::cmp::Ordering::Equal {
@@ -404,19 +390,19 @@ impl Ord for LayerVisit {
}
}
impl PartialOrd for LayerVisit {
impl PartialOrd for ReadDesc {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for LayerVisit {
impl PartialEq for ReadDesc {
fn eq(&self, other: &Self) -> bool {
self.lsn_range == other.lsn_range
}
}
impl Eq for LayerVisit {}
impl Eq for ReadDesc {}
impl ReadableLayer {
pub(crate) fn id(&self) -> LayerId {

View File

@@ -346,7 +346,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
loop {
tokio::select! {
_ = cancel.cancelled() => {
@@ -364,6 +363,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
first = false;
let delays = async {
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
@@ -538,12 +538,28 @@ pub(crate) async fn random_init_delay(
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
/// Delays GC by defaul lease length at restart.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by default
/// length, we gurantees that all the leases we granted before the restart will expire
/// when we run GC for the first time after the restart.
pub(crate) async fn delay_by_lease_length(
length: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
match tokio::time::timeout(length, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
struct Iteration {
started_at: Instant,
period: Duration,

View File

@@ -4015,7 +4015,6 @@ impl Timeline {
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("created image layer for rel {}", image_layer.local_path());
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
@@ -4105,10 +4104,6 @@ impl Timeline {
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!(
"created image layer for metadata {}",
image_layer.local_path()
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
@@ -5378,8 +5373,7 @@ impl Timeline {
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
#[cfg(test)]
pub(super) async fn force_create_image_layer(
self: &Arc<Timeline>,
@@ -5413,7 +5407,7 @@ impl Timeline {
}
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
let mut guard = self.layers.write().await;
guard.open_mut().unwrap().force_insert_layer(image_layer);
@@ -5425,8 +5419,7 @@ impl Timeline {
/// Force create a delta layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
#[cfg(test)]
pub(super) async fn force_create_delta_layer(
self: &Arc<Timeline>,
@@ -5452,6 +5445,33 @@ impl Timeline {
if let Some(check_start_lsn) = check_start_lsn {
assert!(deltas.lsn_range.start >= check_start_lsn);
}
// check if the delta layer does not violate the LSN invariant, the legacy compaction should always produce a batch of
// layers of the same start/end LSN, and so should the force inserted layer
{
/// Checks if a overlaps with b, assume a/b = [start, end).
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
if deltas.key_range.start.next() != deltas.key_range.end {
let guard = self.layers.read().await;
let mut invalid_layers =
guard.layer_map()?.iter_historic_layers().filter(|layer| {
layer.is_delta()
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
&& layer.lsn_range != deltas.lsn_range
// skip single-key layer files
&& layer.key_range.start.next() != layer.key_range.end
});
if let Some(layer) = invalid_layers.next() {
// If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic
panic!(
"inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}",
deltas.lsn_range.start, deltas.lsn_range.end, layer.lsn_range.start, layer.lsn_range.end
);
}
}
}
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
@@ -5466,7 +5486,7 @@ impl Timeline {
}
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created delta layer {}", delta_layer.local_path());
{
let mut guard = self.layers.write().await;
guard.open_mut().unwrap().force_insert_layer(delta_layer);

View File

@@ -29,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
@@ -1789,12 +1788,20 @@ impl Timeline {
stat.visit_image_layer(desc.file_size());
}
}
let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("cannot run gc-compaction because {}", err);
for layer in &layer_selection {
let desc = layer.layer_desc();
let key_range = &desc.key_range;
if desc.is_delta() && key_range.start.next() != key_range.end {
let lsn_range = desc.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
bail!(
"cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
desc.key(),
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
}
}
}
// The maximum LSN we are processing in this compaction loop
let end_lsn = layer_selection

View File

@@ -163,7 +163,6 @@ impl ComputeUserInfo {
}
pub(crate) enum ComputeCredentialKeys {
#[cfg(any(test, feature = "testing"))]
Password(Vec<u8>),
AuthKeys(AuthKeys),
None,
@@ -294,10 +293,16 @@ async fn auth_quirks(
// We now expect to see a very specific payload in the place of password.
let (info, unauthenticated_password) = match user_info.try_into() {
Err(info) => {
let (info, password) =
hacks::password_hack_no_authentication(ctx, info, client).await?;
ctx.set_endpoint_id(info.endpoint.clone());
(info, Some(password))
let res = hacks::password_hack_no_authentication(ctx, info, client).await?;
ctx.set_endpoint_id(res.info.endpoint.clone());
let password = match res.keys {
ComputeCredentialKeys::Password(p) => p,
ComputeCredentialKeys::AuthKeys(_) | ComputeCredentialKeys::None => {
unreachable!("password hack should return a password")
}
};
(res.info, Some(password))
}
Ok(info) => (info, None),
};

View File

@@ -1,4 +1,6 @@
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use super::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint,
};
use crate::{
auth::{self, AuthFlow},
config::AuthenticationConfig,
@@ -61,7 +63,7 @@ pub(crate) async fn password_hack_no_authentication(
ctx: &RequestMonitoring,
info: ComputeUserInfoNoEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
) -> auth::Result<(ComputeUserInfo, Vec<u8>)> {
) -> auth::Result<ComputeCredentials> {
warn!("project not specified, resorting to the password hack auth flow");
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
@@ -77,12 +79,12 @@ pub(crate) async fn password_hack_no_authentication(
info!(project = &*payload.endpoint, "received missing parameter");
// Report tentative success; compute node will check the password anyway.
Ok((
ComputeUserInfo {
Ok(ComputeCredentials {
info: ComputeUserInfo {
user: info.user,
options: info.options,
endpoint: payload.endpoint,
},
payload.password,
))
keys: ComputeCredentialKeys::Password(payload.password),
})
}

View File

@@ -25,8 +25,6 @@ const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
fn fetch_auth_rules(
&self,
ctx: &RequestMonitoring,
endpoint: EndpointId,
role_name: RoleName,
) -> impl Future<Output = anyhow::Result<Vec<AuthRule>>> + Send;
}
@@ -103,9 +101,7 @@ impl JwkCacheEntryLock {
async fn renew_jwks<F: FetchAuthRules>(
&self,
_permit: JwkRenewalPermit<'_>,
ctx: &RequestMonitoring,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
auth_rules: &F,
) -> anyhow::Result<Arc<JwkCacheEntry>> {
@@ -119,9 +115,7 @@ impl JwkCacheEntryLock {
}
}
let rules = auth_rules
.fetch_auth_rules(ctx, endpoint, role_name)
.await?;
let rules = auth_rules.fetch_auth_rules(role_name).await?;
let mut key_sets =
ahash::HashMap::with_capacity_and_hasher(rules.len(), ahash::RandomState::new());
// TODO(conrad): run concurrently
@@ -172,7 +166,6 @@ impl JwkCacheEntryLock {
self: &Arc<Self>,
ctx: &RequestMonitoring,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
fetch: &F,
) -> Result<Arc<JwkCacheEntry>, anyhow::Error> {
@@ -183,9 +176,7 @@ impl JwkCacheEntryLock {
let Some(cached) = guard else {
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let permit = self.acquire_permit().await;
return self
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
.await;
return self.renew_jwks(permit, client, role_name, fetch).await;
};
let last_update = now.duration_since(cached.last_retrieved);
@@ -196,9 +187,7 @@ impl JwkCacheEntryLock {
let permit = self.acquire_permit().await;
// it's been too long since we checked the keys. wait for them to update.
return self
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
.await;
return self.renew_jwks(permit, client, role_name, fetch).await;
}
// every 5 minutes we should spawn a job to eagerly update the token.
@@ -209,12 +198,8 @@ impl JwkCacheEntryLock {
let entry = self.clone();
let client = client.clone();
let fetch = fetch.clone();
let ctx = ctx.clone();
tokio::spawn(async move {
if let Err(e) = entry
.renew_jwks(permit, &ctx, &client, endpoint, role_name, &fetch)
.await
{
if let Err(e) = entry.renew_jwks(permit, &client, role_name, &fetch).await {
tracing::warn!(error=?e, "could not fetch JWKs in background job");
}
});
@@ -231,7 +216,6 @@ impl JwkCacheEntryLock {
ctx: &RequestMonitoring,
jwt: &str,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
fetch: &F,
) -> Result<(), anyhow::Error> {
@@ -258,7 +242,7 @@ impl JwkCacheEntryLock {
let kid = header.key_id.context("missing key id")?;
let mut guard = self
.get_or_update_jwk_cache(ctx, client, endpoint.clone(), role_name.clone(), fetch)
.get_or_update_jwk_cache(ctx, client, role_name.clone(), fetch)
.await?;
// get the key from the JWKs if possible. If not, wait for the keys to update.
@@ -270,14 +254,7 @@ impl JwkCacheEntryLock {
let permit = self.acquire_permit().await;
guard = self
.renew_jwks(
permit,
ctx,
client,
endpoint.clone(),
role_name.clone(),
fetch,
)
.renew_jwks(permit, client, role_name.clone(), fetch)
.await?;
}
_ => {
@@ -341,7 +318,7 @@ impl JwkCache {
jwt: &str,
) -> Result<(), anyhow::Error> {
// try with just a read lock first
let key = (endpoint.clone(), role_name.clone());
let key = (endpoint, role_name.clone());
let entry = self.map.get(&key).as_deref().map(Arc::clone);
let entry = entry.unwrap_or_else(|| {
// acquire a write lock after to insert.
@@ -350,7 +327,7 @@ impl JwkCache {
});
entry
.check_jwt(ctx, jwt, &self.client, endpoint, role_name, fetch)
.check_jwt(ctx, jwt, &self.client, role_name, fetch)
.await
}
}
@@ -711,8 +688,6 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
impl FetchAuthRules for Fetch {
async fn fetch_auth_rules(
&self,
_ctx: &RequestMonitoring,
_endpoint: EndpointId,
_role_name: RoleName,
) -> anyhow::Result<Vec<AuthRule>> {
Ok(vec![
@@ -731,7 +706,6 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
}
let role_name = RoleName::from("user");
let endpoint = EndpointId::from("ep");
let jwk_cache = Arc::new(JwkCacheEntryLock::default());
@@ -741,7 +715,6 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
&RequestMonitoring::test(),
&token,
&client,
endpoint.clone(),
role_name.clone(),
&Fetch(addr),
)

View File

@@ -9,9 +9,8 @@ use crate::{
messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo},
NodeInfo,
},
context::RequestMonitoring,
intern::{BranchIdInt, BranchIdTag, EndpointIdTag, InternId, ProjectIdInt, ProjectIdTag},
EndpointId, RoleName,
RoleName,
};
use super::jwt::{AuthRule, FetchAuthRules, JwkCache};
@@ -58,12 +57,7 @@ pub struct JwksRoleSettings {
}
impl FetchAuthRules for StaticAuthRules {
async fn fetch_auth_rules(
&self,
_ctx: &RequestMonitoring,
_endpoint: EndpointId,
role_name: RoleName,
) -> anyhow::Result<Vec<AuthRule>> {
async fn fetch_auth_rules(&self, role_name: RoleName) -> anyhow::Result<Vec<AuthRule>> {
let mappings = JWKS_ROLE_MAP.load();
let role_mappings = mappings
.as_deref()

View File

@@ -303,7 +303,6 @@ impl NodeInfo {
pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) {
match keys {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => self.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
ComputeCredentialKeys::None => &mut self.config,

View File

@@ -79,40 +79,6 @@ pub(crate) enum AuthMethod {
Cleartext,
}
impl Clone for RequestMonitoring {
fn clone(&self) -> Self {
let inner = self.0.try_lock().expect("should not deadlock");
let new = RequestMonitoringInner {
peer_addr: inner.peer_addr,
session_id: inner.session_id,
protocol: inner.protocol,
first_packet: inner.first_packet,
region: inner.region,
span: info_span!("background_task"),
project: inner.project,
branch: inner.branch,
endpoint_id: inner.endpoint_id.clone(),
dbname: inner.dbname.clone(),
user: inner.user.clone(),
application: inner.application.clone(),
error_kind: inner.error_kind,
auth_method: inner.auth_method.clone(),
success: inner.success,
rejected: inner.rejected,
cold_start_info: inner.cold_start_info,
pg_options: inner.pg_options.clone(),
sender: None,
disconnect_sender: None,
latency_timer: LatencyTimer::noop(inner.protocol),
disconnect_timestamp: inner.disconnect_timestamp,
};
Self(TryLock::new(new))
}
}
impl RequestMonitoring {
pub fn new(
session_id: Uuid,

View File

@@ -397,8 +397,6 @@ pub struct LatencyTimer {
protocol: Protocol,
cold_start_info: ColdStartInfo,
outcome: ConnectOutcome,
skip_reporting: bool,
}
impl LatencyTimer {
@@ -411,20 +409,6 @@ impl LatencyTimer {
cold_start_info: ColdStartInfo::Unknown,
// assume failed unless otherwise specified
outcome: ConnectOutcome::Failed,
skip_reporting: false,
}
}
pub(crate) fn noop(protocol: Protocol) -> Self {
Self {
start: time::Instant::now(),
stop: None,
accumulated: Accumulated::default(),
protocol,
cold_start_info: ColdStartInfo::Unknown,
// assume failed unless otherwise specified
outcome: ConnectOutcome::Failed,
skip_reporting: true,
}
}
@@ -459,10 +443,6 @@ pub enum ConnectOutcome {
impl Drop for LatencyTimer {
fn drop(&mut self) {
if self.skip_reporting {
return;
}
let duration = self
.stop
.unwrap_or_else(time::Instant::now)

View File

@@ -27,7 +27,7 @@ use crate::{
Host,
};
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
use super::conn_pool::{poll_client, AuthData, Client, ConnInfo, GlobalConnPool};
pub(crate) struct PoolingBackend {
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
@@ -274,6 +274,13 @@ impl ConnectMechanism for TokioMechanism {
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
match &self.conn_info.auth {
AuthData::Jwt(_) => {}
AuthData::Password(pw) => {
config.password(pw);
}
}
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);

View File

@@ -29,16 +29,11 @@ use tracing::{info, info_span, Instrument};
use super::backend::HttpConnError;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
pub(crate) conn_info: ConnInfo,
pub(crate) auth: AuthData,
}
#[derive(Debug, Clone)]
pub(crate) struct ConnInfo {
pub(crate) user_info: ComputeUserInfo,
pub(crate) dbname: DbName,
pub(crate) auth: AuthData,
}
#[derive(Debug, Clone)]
@@ -792,6 +787,7 @@ mod tests {
options: NeonOptions::default(),
},
dbname: "dbname".into(),
auth: AuthData::Password("password".as_bytes().into()),
};
let ep_pool = Arc::downgrade(
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
@@ -849,6 +845,7 @@ mod tests {
options: NeonOptions::default(),
},
dbname: "dbname".into(),
auth: AuthData::Password("password".as_bytes().into()),
};
let ep_pool = Arc::downgrade(
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),

View File

@@ -60,7 +60,6 @@ use super::backend::PoolingBackend;
use super::conn_pool::AuthData;
use super::conn_pool::Client;
use super::conn_pool::ConnInfo;
use super::conn_pool::ConnInfoWithAuth;
use super::http_util::json_response;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
@@ -149,7 +148,7 @@ fn get_conn_info(
ctx: &RequestMonitoring,
headers: &HeaderMap,
tls: Option<&TlsConfig>,
) -> Result<ConnInfoWithAuth, ConnInfoError> {
) -> Result<ConnInfo, ConnInfoError> {
// HTTP only uses cleartext (for now and likely always)
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
@@ -236,8 +235,11 @@ fn get_conn_info(
options: options.unwrap_or_default(),
};
let conn_info = ConnInfo { user_info, dbname };
Ok(ConnInfoWithAuth { conn_info, auth })
Ok(ConnInfo {
user_info,
dbname,
auth,
})
}
// TODO: return different http error codes
@@ -521,10 +523,7 @@ async fn handle_inner(
// TLS config should be there.
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref())?;
info!(
user = conn_info.conn_info.user_info.user.as_str(),
"credentials"
);
info!(user = conn_info.user_info.user.as_str(), "credentials");
// Allow connection pooling only if explicitly requested
// or if we have decided that http pool is no longer opt-in
@@ -569,20 +568,20 @@ async fn handle_inner(
.authenticate_with_password(
ctx,
&config.authentication_config,
&conn_info.conn_info.user_info,
&conn_info.user_info,
pw,
)
.await?
}
AuthData::Jwt(jwt) => {
backend
.authenticate_with_jwt(ctx, &conn_info.conn_info.user_info, jwt)
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.await?
}
};
let client = backend
.connect_to_compute(ctx, conn_info.conn_info, keys, !allow_pool)
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else

View File

@@ -17,7 +17,6 @@ use postgres_ffi::MAX_SEND_SIZE;
use serde::Deserialize;
use serde::Serialize;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
use sha2::{Digest, Sha256};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
@@ -52,9 +51,6 @@ pub struct Args {
/// Dump full term history. True by default.
pub dump_term_history: bool,
/// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
pub dump_wal_last_modified: bool,
/// Filter timelines by tenant_id.
pub tenant_id: Option<TenantId>,
@@ -132,19 +128,12 @@ async fn build_from_tli_dump(
None
};
let wal_last_modified = if args.dump_wal_last_modified {
get_wal_last_modified(timeline_dir).ok().flatten()
} else {
None
};
Timeline {
tenant_id: timeline.ttid.tenant_id,
timeline_id: timeline.ttid.timeline_id,
control_file,
memory,
disk_content,
wal_last_modified,
}
}
@@ -167,7 +156,6 @@ pub struct Timeline {
pub control_file: Option<TimelinePersistentState>,
pub memory: Option<Memory>,
pub disk_content: Option<DiskContent>,
pub wal_last_modified: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -314,27 +302,6 @@ fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
})
}
/// Get highest modified time of WAL segments in the directory.
fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for entry in fs::read_dir(path)? {
if entry.is_err() {
continue;
}
let entry = entry?;
/* Ignore files that are not XLOG segments */
let fname = entry.file_name();
if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
continue;
}
let metadata = entry.metadata()?;
let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
res = std::cmp::max(res, Some(modified));
}
Ok(res)
}
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {

View File

@@ -481,7 +481,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let mut dump_memory: Option<bool> = None;
let mut dump_disk_content: Option<bool> = None;
let mut dump_term_history: Option<bool> = None;
let mut dump_wal_last_modified: Option<bool> = None;
let mut tenant_id: Option<TenantId> = None;
let mut timeline_id: Option<TimelineId> = None;
@@ -495,7 +494,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
"dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
"dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
"dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
"dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
"tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
"timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
_ => Err(ApiError::BadRequest(anyhow::anyhow!(
@@ -510,7 +508,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let dump_memory = dump_memory.unwrap_or(dump_all);
let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
let dump_term_history = dump_term_history.unwrap_or(true);
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
let args = debug_dump::Args {
dump_all,
@@ -518,7 +515,6 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
dump_memory,
dump_disk_content,
dump_term_history,
dump_wal_last_modified,
tenant_id,
timeline_id,
};

View File

@@ -539,17 +539,20 @@ async fn remove_segments_from_disk(
while let Some(entry) = entries.next_entry().await? {
let entry_path = entry.path();
let fname = entry_path.file_name().unwrap();
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
continue;
}
let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
if remove_predicate(segno) {
remove_file(entry_path).await?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
REMOVED_WAL_SEGMENTS.inc();
if let Some(fname_str) = fname.to_str() {
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if remove_predicate(segno) {
remove_file(entry_path).await?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
REMOVED_WAL_SEGMENTS.inc();
}
}
}

View File

@@ -1,8 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use anyhow::Context;
use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
@@ -49,6 +48,56 @@ impl TimelineAnalysis {
}
}
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
/// | 1 | | 2 | | 3 |
/// | | | | | |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// | | | 2 | | |
/// | 1 | |-------| | 3 |
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
fn check_valid_layermap(metadata: &HashMap<LayerName, LayerFileMetadata>) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for (name, _) in metadata.iter() {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
for layer in &all_delta_layers {
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
}
}
None
}
pub(crate) async fn branch_cleanup_and_check_errors(
remote_client: &GenericRemoteStorage,
id: &TenantShardTimelineId,
@@ -128,8 +177,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
}
}
let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
if let Some(err) = check_valid_layermap(&index_part.layer_metadata) {
result.errors.push(format!(
"index_part.json contains invalid layer map structure: {err}"
));

View File

@@ -3863,6 +3863,9 @@ def static_proxy(
dbname = vanilla_pg.default_options["dbname"]
auth_endpoint = f"postgres://proxy:password@{host}:{port}/{dbname}"
# require password for 'http_auth' user
vanilla_pg.edit_hba([f"host {dbname} http_auth {host} password"])
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
vanilla_pg.start()
vanilla_pg.safe_psql("create user proxy with login superuser password 'password'")

View File

@@ -142,7 +142,6 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
"image_creation_threshold": "1",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
"lsn_lease_length": "0s",
}
)

View File

@@ -11,9 +11,7 @@ from fixtures.utils import print_gc_result, query_scalar
#
def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
env = neon_env_builder.init_start(
initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"}
)
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
error_regexes = [
".*invalid branch start lsn.*",

View File

@@ -419,7 +419,7 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
env = neon_env_builder.init_start()
client = env.pageserver.http_client()

View File

@@ -240,7 +240,6 @@ def test_uploads_and_deletions(
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

View File

@@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep):
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
env = neon_env_builder.init_start()
agressive_vacuum_conf = [
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",

View File

@@ -173,7 +173,6 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
"lsn_lease_length": "0s",
}
def tenant_update_config(changes):

View File

@@ -53,7 +53,6 @@ TENANT_CONF = {
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}

View File

@@ -139,7 +139,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
# Short lease length to fit test.
"lsn_lease_length": "8s",
"lsn_lease_length": "3s",
},
initial_tenant_shard_count=2,
)
@@ -205,7 +205,7 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
generate_updates_on_main(env, ep_main, i, end=100)
# Now trigger GC again, layers should be removed.
time.sleep(8)
time.sleep(4)
for shard, ps in tenant_get_shards(env, env.initial_tenant):
client = ps.http_client()
gc_result = client.timeline_gc(shard, env.initial_timeline, 0)

View File

@@ -244,7 +244,6 @@ def test_remote_storage_upload_queue_retries(
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}
)
@@ -392,7 +391,6 @@ def test_remote_timeline_client_calls_started_metric(
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
"lsn_lease_length": "0s",
}
)

View File

@@ -200,7 +200,6 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
"image_layer_creation_check_threshold": 0,
"lsn_lease_length": "0s",
}
neon_env_builder.storage_controller_config = {

View File

@@ -485,7 +485,7 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
env = neon_env_builder.init_start()
# Initial notification from tenant creation
assert len(notifications) == 1

View File

@@ -204,7 +204,6 @@ def test_scrubber_physical_gc_ancestors(
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
"lsn_lease_length": "0s",
},
)

View File

@@ -266,13 +266,13 @@ def test_tenant_reattach_while_busy(
def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
# create new nenant
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
tenant_id, timeline_id = env.neon_cli.create_tenant()
# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()

View File

@@ -45,10 +45,7 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
tenant_after = http.tenant_status(env.initial_tenant)
assert tenant_before != tenant_after
gc_blocking = tenant_after["gc_blocking"]
assert (
gc_blocking
== "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
)
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)

View File

@@ -892,7 +892,6 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
log.info(f"debug_dump before reboot {debug_dump_0}")
assert debug_dump_0["timelines_count"] == 1
assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id)
assert debug_dump_0["timelines"][0]["wal_last_modified"] != ""
endpoint.safe_psql("create table t(i int)")