Compare commits

..

14 Commits

Author SHA1 Message Date
James Broadhead
f79048f0ec gci autocommit 2024-09-20 14:46:19 +01:00
James Broadhead
3476727923 Readme: add more rustup steps 2024-09-20 14:44:15 +01:00
Alexander Bayandin
3104f0f250 Safekeeper: fix OpenAPI spec (#9066)
## Problem

Safekeeper's OpenAPI spec is incorrect:

```
Semantic error at paths./v1/tenant/{tenant_id}/timeline/{timeline_id}.get.responses.404.content.application/json.schema.$ref
$refs must reference a valid location in the document
Jump to line 126
```
Checked on https://editor.swagger.io

## Summary of changes
- Add `NotFoundError` 
- Add `description` and `license` fields to make Cloud OpenAPI spec
linter happy
2024-09-20 12:00:05 +01:00
Arseny Sher
f2c08195f0 Bump vendor/postgres.
Includes PRs:
- ERROR out instead of segfaulting when walsender slots are full.
- logical worker: respond to publisher even under dense stream.
2024-09-20 12:38:42 +03:00
Alex Chi Z.
d0cbfda15c refactor(pageserver): check layer map valid in one place (#9051)
We have 3 places where we implement layer map checks.

## Summary of changes

Now we have a single check function being called in all places.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-19 20:29:28 +00:00
Yuchen Liang
1708743e78 pageserver: wait for lsn lease duration after transition into AttachedSingle (#9024)
Part of #7497, closes https://github.com/neondatabase/neon/issues/8890.

## Problem

Since leases are in-memory objects, we need to take special care of them
after pageserver restarts and while doing a live migration. The approach
we took for pageserver restart is to wait for at least lease duration
before doing first GC. We want to do the same for live migration. Since
we do not do any GC when a tenant is in `AttachedStale` or
`AttachedMulti` mode, only the transition from `AttachedMulti` to
`AttachedSingle` requires this treatment.

## Summary of changes

- Added `lsn_lease_deadline` field in `GcBlock::reasons`: the tenant is
temporarily blocked from GC until we reach the deadline. This
information does not persist to S3.
- In `GCBlock::start`, skip the GC iteration if we are blocked by the
lsn lease deadline.
- In `TenantManager::upsert_location`, set the lsn_lease_deadline to
`Instant::now() + lsn_lease_length` so the granted leases have a chance
to be renewed before we run GC for the first time after transitioned
from AttachedMulti to AttachedSingle.

Signed-off-by: Yuchen Liang <yuchen@neon.tech>
Co-authored-by: Joonas Koivunen <joonas@neon.tech>
2024-09-19 17:27:10 +01:00
Conrad Ludgate
0a1ca7670c proxy: remove auth info from http conn info & fixup jwt api trait (#9047)
misc changes split out from #8855 

- **allow cloning the request context in a read-only fashion for
background tasks**
- **propagate endpoint and request context through the jwk cache**
- **only allow password based auth for md5 during testing**
- **remove auth info from conn info**
2024-09-19 15:09:30 +00:00
Alex Chi Z.
ff9f065c43 impr(pageserver): log image layer creation (#9050)
https://github.com/neondatabase/neon/pull/9028 changed the image layer
creation log into trace level. However, I personally find logging image
layer creation useful when reading the logs -- it makes it clear that
the image layer creation is happening and gives a clear idea of the
progress. Therefore, I propose to continue logging them for
create_image_layers set of functions.

## Summary of changes

* Add info logging for all image layers created in legacy compaction.
* Add info logging for all layers creation in testing functions.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-09-19 10:43:12 -04:00
Vlad Lazar
21eeafaaa5 pageserver: simple fix for vectored read image layer skip (#9026)
## Problem

Different keyspaces may require different floor LSNs in vectored
delta layer visits. This patch adds support for such cases.

## Summary of changes

Different keyspaces wishing to read the same layer might
require different stop lsns (or lsn floor). The start LSN
of the read (or the lsn ceil) will always be the same.

With this observation, we fix skipping of image layers by
indexing the fringe by layer id plus lsn floor.

This is very simple, but means that we can visit delta layers twice
in certain cases. Still, I think it's very unlikely for any extra
merging to have taken place in this case, so perhaps it makes sense to go
with the simpler patch.

Fixes https://github.com/neondatabase/neon/issues/9012
Alternative to https://github.com/neondatabase/neon/pull/9025
2024-09-19 14:51:00 +01:00
Arseny Sher
32a0e759bd safekeeper: add wal_last_modified to debug_dump.
Adds to debug_dump option to include highest modified time among all WAL
segments. In passing replace some str with OsStr to have less unwraps.
2024-09-19 16:17:25 +03:00
Heikki Linnakangas
7c489092b7 Remove unused duplicate DEFAULT_INGEST_BATCH_SIZE constant
This constant in 'tenant_conf_defaults' was unused, but there's
another constant with the same name in the global 'defaults'. I wish
the setting was configurable per-tenant, but it isn't, so let's remove
the confusing duplicate.
2024-09-19 15:41:35 +03:00
Heikki Linnakangas
06d55a3b12 Clean up concurrent logical size calc semaphore initialization
The DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES constant was
unused, because we had just hardcoded it to 1 where the constant
should've been used.

Remove the ConfigurableSemaphore::Default implementation, since it was
unused.
2024-09-19 15:41:35 +03:00
Heikki Linnakangas
5c68e6a172 Remove unused constant
The code that used it was removed in commit b9d2c7bdd5
2024-09-19 15:41:35 +03:00
Heikki Linnakangas
2753abc0d8 Remove leftover enums for configuring vectored get implementation
The settings were removed in commit corb9d2c7b.
2024-09-19 15:41:35 +03:00
49 changed files with 622 additions and 305 deletions

View File

@@ -52,6 +52,11 @@ Building Neon requires 3.15+ version of `protoc` (protobuf-compiler). If your di
```
# recommended approach from https://www.rust-lang.org/tools/install
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# bash-only: add $HOME/.cargo/bin to PATH. The script does this for zsh
# echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> ~/.bashrc
rustup show
```
#### Installing dependencies on macOS (12.3.1)
@@ -74,6 +79,11 @@ brew link --force m4
```
# recommended approach from https://www.rust-lang.org/tools/install
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# bash-only: add $HOME/.cargo/bin to PATH. The script does this for zsh
echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> ~/.bashrc
rustup show
```
3. Install PostgreSQL Client

View File

@@ -173,40 +173,6 @@ impl Default for EvictionOrder {
}
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetVectoredImpl {
Sequential,
Vectored,
}
#[derive(
Eq,
PartialEq,
Debug,
Copy,
Clone,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
)]
#[strum(serialize_all = "kebab-case")]
pub enum GetImpl {
Legacy,
Vectored,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
@@ -338,8 +304,6 @@ pub mod defaults {
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::Zstd { level: Some(1) };
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
@@ -376,7 +340,10 @@ impl Default for ConfigToml {
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
.expect("Invalid default constant")),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(
DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES,
)
.unwrap(),
metric_collection_interval: (humantime::parse_duration(
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
@@ -467,8 +434,6 @@ pub mod tenant_conf_defaults {
// By default ingest enough WAL for two new L0 layers before checking if new image
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
impl Default for TenantConfigToml {

View File

@@ -26,6 +26,7 @@ use bytes::{Buf, Bytes};
use log::*;
use serde::Serialize;
use std::ffi::OsStr;
use std::fs::File;
use std::io::prelude::*;
use std::io::ErrorKind;
@@ -78,19 +79,34 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}
pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
pub fn XLogFromFileName(
fname: &OsStr,
wal_seg_size: usize,
) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
if let Some(fname_str) = fname.to_str() {
let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
} else {
anyhow::bail!("non-ut8 filename: {:?}", fname);
}
}
pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
pub fn IsXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
} else {
false
}
}
pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
} else {
false
}
}
/// If LSN points to the beginning of the page, then shift it to first record,

View File

@@ -7,6 +7,7 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
@@ -135,8 +136,8 @@ impl Conf {
pub fn pg_waldump(
&self,
first_segment_name: &str,
last_segment_name: &str,
first_segment_name: &OsStr,
last_segment_name: &OsStr,
) -> anyhow::Result<std::process::Output> {
let first_segment_file = self.datadir.join(first_segment_name);
let last_segment_file = self.datadir.join(last_segment_name);

View File

@@ -4,6 +4,7 @@ use super::*;
use crate::{error, info};
use regex::Regex;
use std::cmp::min;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::Write;
use std::{env, str::FromStr};
@@ -54,7 +55,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name().into_string().unwrap())
.map(|f| f.unwrap().file_name())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
@@ -70,11 +71,11 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name().into_string().unwrap();
let fname = file.file_name();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(*start_lsn) {
continue;
@@ -93,10 +94,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
}
}
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump("000000010000000000000001", last_segment)
.pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
.unwrap()
.stderr;
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
@@ -117,7 +118,7 @@ fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
fn check_end_of_wal(
cfg: &crate::Conf,
last_segment: &str,
last_segment: &OsStr,
start_lsn: Lsn,
expected_end_of_wal: Lsn,
) {
@@ -132,7 +133,8 @@ fn check_end_of_wal(
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
cfg.wal_dir().join(last_segment),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
)
.unwrap();
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
@@ -142,7 +144,8 @@ fn check_end_of_wal(
);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
cfg.wal_dir().join(last_segment),
)
.unwrap();

View File

@@ -479,11 +479,6 @@ pub struct ConfigurableSemaphore {
}
impl ConfigurableSemaphore {
pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
Some(x) => x,
None => panic!("const unwrap is not yet stable"),
};
/// Initializse using a non-zero amount of permits.
///
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
@@ -504,12 +499,6 @@ impl ConfigurableSemaphore {
}
}
impl Default for ConfigurableSemaphore {
fn default() -> Self {
Self::new(Self::DEFAULT_INITIAL)
}
}
impl PartialEq for ConfigurableSemaphore {
fn eq(&self, other: &Self) -> bool {
// the number of permits can be increased at runtime, so we cannot really fulfill the

View File

@@ -140,6 +140,7 @@ pub mod metadata;
pub mod remote_timeline_client;
pub mod storage_layer;
pub mod checks;
pub mod config;
pub mod mgr;
pub mod secondary;
@@ -1573,6 +1574,9 @@ impl Tenant {
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
use checks::check_valid_layermap;
use itertools::Itertools;
let tline = self
.create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
@@ -1587,6 +1591,18 @@ impl Tenant {
.force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
.await?;
}
let layer_names = tline
.layers
.read()
.await
.layer_map()
.unwrap()
.iter_historic_layers()
.map(|layer| layer.layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("invalid layermap: {err}");
}
Ok(tline)
}
@@ -3197,6 +3213,9 @@ impl Tenant {
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
end_lsn: Lsn,
) -> anyhow::Result<Arc<Timeline>> {
use checks::check_valid_layermap;
use itertools::Itertools;
let tline = self
.branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx)
.await?;
@@ -3217,6 +3236,18 @@ impl Tenant {
.force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx)
.await?;
}
let layer_names = tline
.layers
.read()
.await
.layer_map()
.unwrap()
.iter_historic_layers()
.map(|layer| layer.layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("invalid layermap: {err}");
}
Ok(tline)
}
@@ -4164,9 +4195,18 @@ pub(crate) mod harness {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
let mut page = match (base_img, records.first()) {
(Some((_lsn, img)), _) => {
let mut page = BytesMut::new();
page.extend_from_slice(&img);
page
}
(_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
_ => {
panic!("Neon WAL redo requires base image or will init record");
}
};
for (record_lsn, record) in records {
apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
}
@@ -8470,4 +8510,135 @@ mod tests {
Ok(())
}
// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
// A B
// +----------------+ -> delta_layer
// | | ^ lsn
// | =========|-> nested_image_layer |
// | C | |
// +----------------+ |
// ======== -> baseline_image_layer +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
let will_init_keys = [2, 6];
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let mut expected_key_values = HashMap::new();
let baseline_image_layer_lsn = Lsn(0x10);
let mut baseline_img_layer = Vec::new();
for i in 0..5 {
let key = get_key(i);
let value = format!("value {i}@{baseline_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
baseline_img_layer.push((key, Bytes::from(value)));
}
let nested_image_layer_lsn = Lsn(0x50);
let mut nested_img_layer = Vec::new();
for i in 5..10 {
let key = get_key(i);
let value = format!("value {i}@{nested_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
nested_img_layer.push((key, Bytes::from(value)));
}
let mut delta_layer_spec = Vec::default();
let delta_layer_start_lsn = Lsn(0x20);
let mut delta_layer_end_lsn = delta_layer_start_lsn;
for i in 0..10 {
let key = get_key(i);
let key_in_nested = nested_img_layer
.iter()
.any(|(key_with_img, _)| *key_with_img == key);
let lsn = {
if key_in_nested {
Lsn(nested_image_layer_lsn.0 + 0x10)
} else {
delta_layer_start_lsn
}
};
let will_init = will_init_keys.contains(&i);
if will_init {
delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
expected_key_values.insert(key, "".to_string());
} else {
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
}
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
assert!(
nested_image_layer_lsn > delta_layer_start_lsn
&& nested_image_layer_lsn < delta_layer_end_lsn
);
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
baseline_image_layer_lsn,
DEFAULT_PG_VERSION,
&ctx,
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
delta_layer_start_lsn..delta_layer_end_lsn,
delta_layer_spec,
)], // delta layers
vec![
(baseline_image_layer_lsn, baseline_img_layer),
(nested_image_layer_lsn, nested_img_layer),
], // image layers
delta_layer_end_lsn,
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
let value = res.expect("No key errors");
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
assert_eq!(value, Bytes::from(expected_value));
}
Ok(())
}
}

View File

@@ -0,0 +1,55 @@
use std::collections::BTreeSet;
use itertools::Itertools;
use super::storage_layer::LayerName;
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
/// | 1 | | 2 | | 3 |
/// | | | | | |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// | | | 2 | | |
/// | 1 | |-------| | 3 |
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for name in metadata {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
for layer in &all_delta_layers {
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
}
}
None
}

View File

@@ -1,11 +1,29 @@
use std::collections::HashMap;
use utils::id::TimelineId;
use std::{collections::HashMap, time::Duration};
use super::remote_timeline_client::index::GcBlockingReason;
use tokio::time::Instant;
use utils::id::TimelineId;
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
#[derive(Default)]
struct Storage {
timelines_blocked: TimelinesBlocked,
/// The deadline before which we are blocked from GC so that
/// leases have a chance to be renewed.
lsn_lease_deadline: Option<Instant>,
}
impl Storage {
fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
self.lsn_lease_deadline
.map(|d| Instant::now() < d)
.unwrap_or(false)
}
}
/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
/// blocking.
#[derive(Default)]
pub(crate) struct GcBlock {
/// The timelines which have current reasons to block gc.
@@ -13,6 +31,12 @@ pub(crate) struct GcBlock {
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
/// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
reasons: std::sync::Mutex<Storage>,
/// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
///
/// Do not add any more features taking and forbidding taking this lock. It should be
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
/// synchronizes with gc attempts by locking and unlocking this mutex.
blocking: tokio::sync::Mutex<()>,
}
@@ -42,6 +66,20 @@ impl GcBlock {
}
}
/// Sets a deadline before which we cannot proceed to GC due to lsn lease.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
/// length, we guarantee that all the leases we granted before will have a chance to renew
/// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
let deadline = Instant::now() + lsn_lease_length;
let mut g = self.reasons.lock().unwrap();
g.lsn_lease_deadline = Some(deadline);
}
/// Describe the current gc blocking reasons.
///
/// TODO: make this json serializable.
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
let g = self.reasons.lock().unwrap();
@@ -64,7 +102,7 @@ impl GcBlock {
) -> anyhow::Result<bool> {
let (added, uploaded) = {
let mut g = self.reasons.lock().unwrap();
let set = g.entry(timeline.timeline_id).or_default();
let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
let added = set.insert(reason);
// LOCK ORDER: intentionally hold the lock, see self.reasons.
@@ -95,7 +133,7 @@ impl GcBlock {
let (remaining_blocks, uploaded) = {
let mut g = self.reasons.lock().unwrap();
match g.entry(timeline.timeline_id) {
match g.timelines_blocked.entry(timeline.timeline_id) {
Entry::Occupied(mut oe) => {
let set = oe.get_mut();
set.remove(reason);
@@ -109,7 +147,7 @@ impl GcBlock {
}
}
let remaining_blocks = g.len();
let remaining_blocks = g.timelines_blocked.len();
// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
let uploaded = timeline
@@ -134,11 +172,11 @@ impl GcBlock {
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
let unblocked = {
let mut g = self.reasons.lock().unwrap();
if g.is_empty() {
if g.timelines_blocked.is_empty() {
return;
}
g.remove(&timeline.timeline_id);
g.timelines_blocked.remove(&timeline.timeline_id);
BlockingReasons::clean_and_summarize(g).is_none()
};
@@ -149,10 +187,11 @@ impl GcBlock {
}
/// Initialize with the non-deleted timelines of this tenant.
pub(crate) fn set_scanned(&self, scanned: Storage) {
pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
let mut g = self.reasons.lock().unwrap();
assert!(g.is_empty());
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
assert!(g.timelines_blocked.is_empty());
g.timelines_blocked
.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
tracing::info!(summary=?reasons, "initialized with gc blocked");
@@ -166,6 +205,7 @@ pub(super) struct Guard<'a> {
#[derive(Debug)]
pub(crate) struct BlockingReasons {
tenant_blocked_by_lsn_lease_deadline: bool,
timelines: usize,
reasons: enumset::EnumSet<GcBlockingReason>,
}
@@ -174,8 +214,8 @@ impl std::fmt::Display for BlockingReasons {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} timelines block for {:?}",
self.timelines, self.reasons
"tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
)
}
}
@@ -183,13 +223,15 @@ impl std::fmt::Display for BlockingReasons {
impl BlockingReasons {
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
let mut reasons = enumset::EnumSet::empty();
g.retain(|_key, value| {
g.timelines_blocked.retain(|_key, value| {
reasons = reasons.union(*value);
!value.is_empty()
});
if !g.is_empty() {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
Some(BlockingReasons {
timelines: g.len(),
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
reasons,
})
} else {
@@ -198,14 +240,17 @@ impl BlockingReasons {
}
fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
if g.is_empty() {
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
None
} else {
let reasons = g
.timelines_blocked
.values()
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
Some(BlockingReasons {
timelines: g.len(),
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
timelines: g.timelines_blocked.len(),
reasons,
})
}

View File

@@ -949,6 +949,12 @@ impl TenantManager {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
match attach_conf.generation.cmp(&tenant.generation) {
Ordering::Equal => {
if attach_conf.attach_mode == AttachmentMode::Single {
tenant
.gc_block
.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
}
// A transition from Attached to Attached in the same generation, we may
// take our fast path and just provide the updated configuration
// to the tenant.

View File

@@ -276,6 +276,16 @@ pub(crate) enum LayerId {
InMemoryLayerId(InMemoryLayerFileId),
}
/// Uniquely identify a layer visit by the layer
/// and LSN floor (or start LSN) of the reads.
/// The layer itself is not enough since we may
/// have different LSN lower bounds for delta layer reads.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct LayerToVisitId {
layer_id: LayerId,
lsn_floor: Lsn,
}
/// Layer wrapper for the read path. Note that it is valid
/// to use these layers even after external operations have
/// been performed on them (compaction, freeze, etc.).
@@ -287,9 +297,9 @@ pub(crate) enum ReadableLayer {
/// A partial description of a read to be done.
#[derive(Debug, Clone)]
struct ReadDesc {
struct LayerVisit {
/// An id used to resolve the readable layer within the fringe
layer_id: LayerId,
layer_to_visit_id: LayerToVisitId,
/// Lsn range for the read, used for selecting the next read
lsn_range: Range<Lsn>,
}
@@ -303,12 +313,12 @@ struct ReadDesc {
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
layers: HashMap<LayerId, LayerKeyspace>,
planned_visits_by_lsn: BinaryHeap<LayerVisit>,
visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
}
#[derive(Debug)]
struct LayerKeyspace {
struct LayerVisitReads {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
}
@@ -316,23 +326,23 @@ struct LayerKeyspace {
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
planned_reads_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
planned_visits_by_lsn: BinaryHeap::new(),
visit_reads: HashMap::new(),
}
}
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
let read_desc = match self.planned_reads_by_lsn.pop() {
let read_desc = match self.planned_visits_by_lsn.pop() {
Some(desc) => desc,
None => return None,
};
let removed = self.layers.remove_entry(&read_desc.layer_id);
let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
match removed {
Some((
_,
LayerKeyspace {
LayerVisitReads {
layer,
mut target_keyspace,
},
@@ -351,20 +361,24 @@ impl LayerFringe {
keyspace: KeySpace,
lsn_range: Range<Lsn>,
) {
let layer_id = layer.id();
let entry = self.layers.entry(layer_id.clone());
let layer_to_visit_id = LayerToVisitId {
layer_id: layer.id(),
lsn_floor: lsn_range.start,
};
let entry = self.visit_reads.entry(layer_to_visit_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
self.planned_visits_by_lsn.push(LayerVisit {
lsn_range,
layer_id: layer_id.clone(),
layer_to_visit_id: layer_to_visit_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
entry.insert(LayerVisitReads {
layer,
target_keyspace: accum,
});
@@ -379,7 +393,7 @@ impl Default for LayerFringe {
}
}
impl Ord for ReadDesc {
impl Ord for LayerVisit {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
if ord == std::cmp::Ordering::Equal {
@@ -390,19 +404,19 @@ impl Ord for ReadDesc {
}
}
impl PartialOrd for ReadDesc {
impl PartialOrd for LayerVisit {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ReadDesc {
impl PartialEq for LayerVisit {
fn eq(&self, other: &Self) -> bool {
self.lsn_range == other.lsn_range
}
}
impl Eq for ReadDesc {}
impl Eq for LayerVisit {}
impl ReadableLayer {
pub(crate) fn id(&self) -> LayerId {

View File

@@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
loop {
tokio::select! {
_ = cancel.cancelled() => {
@@ -363,7 +364,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
first = false;
let delays = async {
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
@@ -538,28 +538,12 @@ pub(crate) async fn random_init_delay(
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
/// Delays GC by defaul lease length at restart.
///
/// We do this as the leases mapping are not persisted to disk. By delaying GC by default
/// length, we gurantees that all the leases we granted before the restart will expire
/// when we run GC for the first time after the restart.
pub(crate) async fn delay_by_lease_length(
length: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
match tokio::time::timeout(length, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
}
}
struct Iteration {
started_at: Instant,
period: Duration,

View File

@@ -4015,6 +4015,7 @@ impl Timeline {
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("created image layer for rel {}", image_layer.local_path());
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
@@ -4104,6 +4105,10 @@ impl Timeline {
// partition, so flush it to disk.
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!(
"created image layer for metadata {}",
image_layer.local_path()
);
Ok(ImageLayerCreationOutcome {
image: Some(image_layer),
next_start_key: img_range.end,
@@ -5373,7 +5378,8 @@ impl Timeline {
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
#[cfg(test)]
pub(super) async fn force_create_image_layer(
self: &Arc<Timeline>,
@@ -5407,7 +5413,7 @@ impl Timeline {
}
let (desc, path) = image_layer_writer.finish(ctx).await?;
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
let mut guard = self.layers.write().await;
guard.open_mut().unwrap().force_insert_layer(image_layer);
@@ -5419,7 +5425,8 @@ impl Timeline {
/// Force create a delta layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
#[cfg(test)]
pub(super) async fn force_create_delta_layer(
self: &Arc<Timeline>,
@@ -5445,33 +5452,6 @@ impl Timeline {
if let Some(check_start_lsn) = check_start_lsn {
assert!(deltas.lsn_range.start >= check_start_lsn);
}
// check if the delta layer does not violate the LSN invariant, the legacy compaction should always produce a batch of
// layers of the same start/end LSN, and so should the force inserted layer
{
/// Checks if a overlaps with b, assume a/b = [start, end).
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
if deltas.key_range.start.next() != deltas.key_range.end {
let guard = self.layers.read().await;
let mut invalid_layers =
guard.layer_map()?.iter_historic_layers().filter(|layer| {
layer.is_delta()
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
&& layer.lsn_range != deltas.lsn_range
// skip single-key layer files
&& layer.key_range.start.next() != layer.key_range.end
});
if let Some(layer) = invalid_layers.next() {
// If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic
panic!(
"inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}",
deltas.lsn_range.start, deltas.lsn_range.end, layer.lsn_range.start, layer.lsn_range.end
);
}
}
}
let mut delta_layer_writer = DeltaLayerWriter::new(
self.conf,
self.timeline_id,
@@ -5486,7 +5466,7 @@ impl Timeline {
}
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created delta layer {}", delta_layer.local_path());
{
let mut guard = self.layers.write().await;
guard.open_mut().unwrap().force_insert_layer(delta_layer);

View File

@@ -29,6 +29,7 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::split_writer::{
@@ -1788,20 +1789,12 @@ impl Timeline {
stat.visit_image_layer(desc.file_size());
}
}
for layer in &layer_selection {
let desc = layer.layer_desc();
let key_range = &desc.key_range;
if desc.is_delta() && key_range.start.next() != key_range.end {
let lsn_range = desc.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
bail!(
"cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
desc.key(),
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
}
}
let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
bail!("cannot run gc-compaction because {}", err);
}
// The maximum LSN we are processing in this compaction loop
let end_lsn = layer_selection

View File

@@ -163,6 +163,7 @@ impl ComputeUserInfo {
}
pub(crate) enum ComputeCredentialKeys {
#[cfg(any(test, feature = "testing"))]
Password(Vec<u8>),
AuthKeys(AuthKeys),
None,
@@ -293,16 +294,10 @@ async fn auth_quirks(
// We now expect to see a very specific payload in the place of password.
let (info, unauthenticated_password) = match user_info.try_into() {
Err(info) => {
let res = hacks::password_hack_no_authentication(ctx, info, client).await?;
ctx.set_endpoint_id(res.info.endpoint.clone());
let password = match res.keys {
ComputeCredentialKeys::Password(p) => p,
ComputeCredentialKeys::AuthKeys(_) | ComputeCredentialKeys::None => {
unreachable!("password hack should return a password")
}
};
(res.info, Some(password))
let (info, password) =
hacks::password_hack_no_authentication(ctx, info, client).await?;
ctx.set_endpoint_id(info.endpoint.clone());
(info, Some(password))
}
Ok(info) => (info, None),
};

View File

@@ -1,6 +1,4 @@
use super::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint,
};
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
use crate::{
auth::{self, AuthFlow},
config::AuthenticationConfig,
@@ -63,7 +61,7 @@ pub(crate) async fn password_hack_no_authentication(
ctx: &RequestMonitoring,
info: ComputeUserInfoNoEndpoint,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
) -> auth::Result<ComputeCredentials> {
) -> auth::Result<(ComputeUserInfo, Vec<u8>)> {
warn!("project not specified, resorting to the password hack auth flow");
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
@@ -79,12 +77,12 @@ pub(crate) async fn password_hack_no_authentication(
info!(project = &*payload.endpoint, "received missing parameter");
// Report tentative success; compute node will check the password anyway.
Ok(ComputeCredentials {
info: ComputeUserInfo {
Ok((
ComputeUserInfo {
user: info.user,
options: info.options,
endpoint: payload.endpoint,
},
keys: ComputeCredentialKeys::Password(payload.password),
})
payload.password,
))
}

View File

@@ -25,6 +25,8 @@ const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
fn fetch_auth_rules(
&self,
ctx: &RequestMonitoring,
endpoint: EndpointId,
role_name: RoleName,
) -> impl Future<Output = anyhow::Result<Vec<AuthRule>>> + Send;
}
@@ -101,7 +103,9 @@ impl JwkCacheEntryLock {
async fn renew_jwks<F: FetchAuthRules>(
&self,
_permit: JwkRenewalPermit<'_>,
ctx: &RequestMonitoring,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
auth_rules: &F,
) -> anyhow::Result<Arc<JwkCacheEntry>> {
@@ -115,7 +119,9 @@ impl JwkCacheEntryLock {
}
}
let rules = auth_rules.fetch_auth_rules(role_name).await?;
let rules = auth_rules
.fetch_auth_rules(ctx, endpoint, role_name)
.await?;
let mut key_sets =
ahash::HashMap::with_capacity_and_hasher(rules.len(), ahash::RandomState::new());
// TODO(conrad): run concurrently
@@ -166,6 +172,7 @@ impl JwkCacheEntryLock {
self: &Arc<Self>,
ctx: &RequestMonitoring,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
fetch: &F,
) -> Result<Arc<JwkCacheEntry>, anyhow::Error> {
@@ -176,7 +183,9 @@ impl JwkCacheEntryLock {
let Some(cached) = guard else {
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let permit = self.acquire_permit().await;
return self.renew_jwks(permit, client, role_name, fetch).await;
return self
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
.await;
};
let last_update = now.duration_since(cached.last_retrieved);
@@ -187,7 +196,9 @@ impl JwkCacheEntryLock {
let permit = self.acquire_permit().await;
// it's been too long since we checked the keys. wait for them to update.
return self.renew_jwks(permit, client, role_name, fetch).await;
return self
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
.await;
}
// every 5 minutes we should spawn a job to eagerly update the token.
@@ -198,8 +209,12 @@ impl JwkCacheEntryLock {
let entry = self.clone();
let client = client.clone();
let fetch = fetch.clone();
let ctx = ctx.clone();
tokio::spawn(async move {
if let Err(e) = entry.renew_jwks(permit, &client, role_name, &fetch).await {
if let Err(e) = entry
.renew_jwks(permit, &ctx, &client, endpoint, role_name, &fetch)
.await
{
tracing::warn!(error=?e, "could not fetch JWKs in background job");
}
});
@@ -216,6 +231,7 @@ impl JwkCacheEntryLock {
ctx: &RequestMonitoring,
jwt: &str,
client: &reqwest::Client,
endpoint: EndpointId,
role_name: RoleName,
fetch: &F,
) -> Result<(), anyhow::Error> {
@@ -242,7 +258,7 @@ impl JwkCacheEntryLock {
let kid = header.key_id.context("missing key id")?;
let mut guard = self
.get_or_update_jwk_cache(ctx, client, role_name.clone(), fetch)
.get_or_update_jwk_cache(ctx, client, endpoint.clone(), role_name.clone(), fetch)
.await?;
// get the key from the JWKs if possible. If not, wait for the keys to update.
@@ -254,7 +270,14 @@ impl JwkCacheEntryLock {
let permit = self.acquire_permit().await;
guard = self
.renew_jwks(permit, client, role_name.clone(), fetch)
.renew_jwks(
permit,
ctx,
client,
endpoint.clone(),
role_name.clone(),
fetch,
)
.await?;
}
_ => {
@@ -318,7 +341,7 @@ impl JwkCache {
jwt: &str,
) -> Result<(), anyhow::Error> {
// try with just a read lock first
let key = (endpoint, role_name.clone());
let key = (endpoint.clone(), role_name.clone());
let entry = self.map.get(&key).as_deref().map(Arc::clone);
let entry = entry.unwrap_or_else(|| {
// acquire a write lock after to insert.
@@ -327,7 +350,7 @@ impl JwkCache {
});
entry
.check_jwt(ctx, jwt, &self.client, role_name, fetch)
.check_jwt(ctx, jwt, &self.client, endpoint, role_name, fetch)
.await
}
}
@@ -688,6 +711,8 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
impl FetchAuthRules for Fetch {
async fn fetch_auth_rules(
&self,
_ctx: &RequestMonitoring,
_endpoint: EndpointId,
_role_name: RoleName,
) -> anyhow::Result<Vec<AuthRule>> {
Ok(vec![
@@ -706,6 +731,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
}
let role_name = RoleName::from("user");
let endpoint = EndpointId::from("ep");
let jwk_cache = Arc::new(JwkCacheEntryLock::default());
@@ -715,6 +741,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
&RequestMonitoring::test(),
&token,
&client,
endpoint.clone(),
role_name.clone(),
&Fetch(addr),
)

View File

@@ -9,8 +9,9 @@ use crate::{
messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo},
NodeInfo,
},
context::RequestMonitoring,
intern::{BranchIdInt, BranchIdTag, EndpointIdTag, InternId, ProjectIdInt, ProjectIdTag},
RoleName,
EndpointId, RoleName,
};
use super::jwt::{AuthRule, FetchAuthRules, JwkCache};
@@ -57,7 +58,12 @@ pub struct JwksRoleSettings {
}
impl FetchAuthRules for StaticAuthRules {
async fn fetch_auth_rules(&self, role_name: RoleName) -> anyhow::Result<Vec<AuthRule>> {
async fn fetch_auth_rules(
&self,
_ctx: &RequestMonitoring,
_endpoint: EndpointId,
role_name: RoleName,
) -> anyhow::Result<Vec<AuthRule>> {
let mappings = JWKS_ROLE_MAP.load();
let role_mappings = mappings
.as_deref()

View File

@@ -303,6 +303,7 @@ impl NodeInfo {
pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) {
match keys {
#[cfg(any(test, feature = "testing"))]
ComputeCredentialKeys::Password(password) => self.config.password(password),
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
ComputeCredentialKeys::None => &mut self.config,

View File

@@ -79,6 +79,40 @@ pub(crate) enum AuthMethod {
Cleartext,
}
impl Clone for RequestMonitoring {
fn clone(&self) -> Self {
let inner = self.0.try_lock().expect("should not deadlock");
let new = RequestMonitoringInner {
peer_addr: inner.peer_addr,
session_id: inner.session_id,
protocol: inner.protocol,
first_packet: inner.first_packet,
region: inner.region,
span: info_span!("background_task"),
project: inner.project,
branch: inner.branch,
endpoint_id: inner.endpoint_id.clone(),
dbname: inner.dbname.clone(),
user: inner.user.clone(),
application: inner.application.clone(),
error_kind: inner.error_kind,
auth_method: inner.auth_method.clone(),
success: inner.success,
rejected: inner.rejected,
cold_start_info: inner.cold_start_info,
pg_options: inner.pg_options.clone(),
sender: None,
disconnect_sender: None,
latency_timer: LatencyTimer::noop(inner.protocol),
disconnect_timestamp: inner.disconnect_timestamp,
};
Self(TryLock::new(new))
}
}
impl RequestMonitoring {
pub fn new(
session_id: Uuid,

View File

@@ -397,6 +397,8 @@ pub struct LatencyTimer {
protocol: Protocol,
cold_start_info: ColdStartInfo,
outcome: ConnectOutcome,
skip_reporting: bool,
}
impl LatencyTimer {
@@ -409,6 +411,20 @@ impl LatencyTimer {
cold_start_info: ColdStartInfo::Unknown,
// assume failed unless otherwise specified
outcome: ConnectOutcome::Failed,
skip_reporting: false,
}
}
pub(crate) fn noop(protocol: Protocol) -> Self {
Self {
start: time::Instant::now(),
stop: None,
accumulated: Accumulated::default(),
protocol,
cold_start_info: ColdStartInfo::Unknown,
// assume failed unless otherwise specified
outcome: ConnectOutcome::Failed,
skip_reporting: true,
}
}
@@ -443,6 +459,10 @@ pub enum ConnectOutcome {
impl Drop for LatencyTimer {
fn drop(&mut self) {
if self.skip_reporting {
return;
}
let duration = self
.stop
.unwrap_or_else(time::Instant::now)

View File

@@ -27,7 +27,7 @@ use crate::{
Host,
};
use super::conn_pool::{poll_client, AuthData, Client, ConnInfo, GlobalConnPool};
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
pub(crate) struct PoolingBackend {
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
@@ -274,13 +274,6 @@ impl ConnectMechanism for TokioMechanism {
.dbname(&self.conn_info.dbname)
.connect_timeout(timeout);
match &self.conn_info.auth {
AuthData::Jwt(_) => {}
AuthData::Password(pw) => {
config.password(pw);
}
}
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
drop(pause);

View File

@@ -29,11 +29,16 @@ use tracing::{info, info_span, Instrument};
use super::backend::HttpConnError;
#[derive(Debug, Clone)]
pub(crate) struct ConnInfoWithAuth {
pub(crate) conn_info: ConnInfo,
pub(crate) auth: AuthData,
}
#[derive(Debug, Clone)]
pub(crate) struct ConnInfo {
pub(crate) user_info: ComputeUserInfo,
pub(crate) dbname: DbName,
pub(crate) auth: AuthData,
}
#[derive(Debug, Clone)]
@@ -787,7 +792,6 @@ mod tests {
options: NeonOptions::default(),
},
dbname: "dbname".into(),
auth: AuthData::Password("password".as_bytes().into()),
};
let ep_pool = Arc::downgrade(
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
@@ -845,7 +849,6 @@ mod tests {
options: NeonOptions::default(),
},
dbname: "dbname".into(),
auth: AuthData::Password("password".as_bytes().into()),
};
let ep_pool = Arc::downgrade(
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),

View File

@@ -60,6 +60,7 @@ use super::backend::PoolingBackend;
use super::conn_pool::AuthData;
use super::conn_pool::Client;
use super::conn_pool::ConnInfo;
use super::conn_pool::ConnInfoWithAuth;
use super::http_util::json_response;
use super::json::json_to_pg_text;
use super::json::pg_text_row_to_json;
@@ -148,7 +149,7 @@ fn get_conn_info(
ctx: &RequestMonitoring,
headers: &HeaderMap,
tls: Option<&TlsConfig>,
) -> Result<ConnInfo, ConnInfoError> {
) -> Result<ConnInfoWithAuth, ConnInfoError> {
// HTTP only uses cleartext (for now and likely always)
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
@@ -235,11 +236,8 @@ fn get_conn_info(
options: options.unwrap_or_default(),
};
Ok(ConnInfo {
user_info,
dbname,
auth,
})
let conn_info = ConnInfo { user_info, dbname };
Ok(ConnInfoWithAuth { conn_info, auth })
}
// TODO: return different http error codes
@@ -523,7 +521,10 @@ async fn handle_inner(
// TLS config should be there.
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref())?;
info!(user = conn_info.user_info.user.as_str(), "credentials");
info!(
user = conn_info.conn_info.user_info.user.as_str(),
"credentials"
);
// Allow connection pooling only if explicitly requested
// or if we have decided that http pool is no longer opt-in
@@ -568,20 +569,20 @@ async fn handle_inner(
.authenticate_with_password(
ctx,
&config.authentication_config,
&conn_info.user_info,
&conn_info.conn_info.user_info,
pw,
)
.await?
}
AuthData::Jwt(jwt) => {
backend
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
.authenticate_with_jwt(ctx, &conn_info.conn_info.user_info, jwt)
.await?
}
};
let client = backend
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
.connect_to_compute(ctx, conn_info.conn_info, keys, !allow_pool)
.await?;
// not strictly necessary to mark success here,
// but it's just insurance for if we forget it somewhere else

View File

@@ -17,6 +17,7 @@ use postgres_ffi::MAX_SEND_SIZE;
use serde::Deserialize;
use serde::Serialize;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
use sha2::{Digest, Sha256};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
@@ -51,6 +52,9 @@ pub struct Args {
/// Dump full term history. True by default.
pub dump_term_history: bool,
/// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
pub dump_wal_last_modified: bool,
/// Filter timelines by tenant_id.
pub tenant_id: Option<TenantId>,
@@ -128,12 +132,19 @@ async fn build_from_tli_dump(
None
};
let wal_last_modified = if args.dump_wal_last_modified {
get_wal_last_modified(timeline_dir).ok().flatten()
} else {
None
};
Timeline {
tenant_id: timeline.ttid.tenant_id,
timeline_id: timeline.ttid.timeline_id,
control_file,
memory,
disk_content,
wal_last_modified,
}
}
@@ -156,6 +167,7 @@ pub struct Timeline {
pub control_file: Option<TimelinePersistentState>,
pub memory: Option<Memory>,
pub disk_content: Option<DiskContent>,
pub wal_last_modified: Option<DateTime<Utc>>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -302,6 +314,27 @@ fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
})
}
/// Get highest modified time of WAL segments in the directory.
fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for entry in fs::read_dir(path)? {
if entry.is_err() {
continue;
}
let entry = entry?;
/* Ignore files that are not XLOG segments */
let fname = entry.file_name();
if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
continue;
}
let metadata = entry.metadata()?;
let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
res = std::cmp::max(res, Some(modified));
}
Ok(res)
}
/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {

View File

@@ -1,7 +1,11 @@
openapi: "3.0.2"
info:
title: Safekeeper control API
description: Neon Safekeeper API
version: "1.0"
license:
name: "Apache"
url: https://github.com/neondatabase/neon/blob/main/LICENSE
servers:
@@ -386,6 +390,12 @@ components:
msg:
type: string
NotFoundError:
type: object
properties:
msg:
type: string
responses:
#

View File

@@ -481,6 +481,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let mut dump_memory: Option<bool> = None;
let mut dump_disk_content: Option<bool> = None;
let mut dump_term_history: Option<bool> = None;
let mut dump_wal_last_modified: Option<bool> = None;
let mut tenant_id: Option<TenantId> = None;
let mut timeline_id: Option<TimelineId> = None;
@@ -494,6 +495,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
"dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
"dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
"dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
"dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
"tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
"timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
_ => Err(ApiError::BadRequest(anyhow::anyhow!(
@@ -508,6 +510,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let dump_memory = dump_memory.unwrap_or(dump_all);
let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
let dump_term_history = dump_term_history.unwrap_or(true);
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
let args = debug_dump::Args {
dump_all,
@@ -515,6 +518,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
dump_memory,
dump_disk_content,
dump_term_history,
dump_wal_last_modified,
tenant_id,
timeline_id,
};

View File

@@ -539,20 +539,17 @@ async fn remove_segments_from_disk(
while let Some(entry) = entries.next_entry().await? {
let entry_path = entry.path();
let fname = entry_path.file_name().unwrap();
if let Some(fname_str) = fname.to_str() {
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if remove_predicate(segno) {
remove_file(entry_path).await?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
REMOVED_WAL_SEGMENTS.inc();
}
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
continue;
}
let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
if remove_predicate(segno) {
remove_file(entry_path).await?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
REMOVED_WAL_SEGMENTS.inc();
}
}

View File

@@ -1,7 +1,8 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use anyhow::Context;
use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
@@ -48,56 +49,6 @@ impl TimelineAnalysis {
}
}
/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// | | | |
/// | 1 | | 2 | | 3 |
/// | | | | | |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// | | | 2 | | |
/// | 1 | |-------| | 3 |
/// | | | 4 | | |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
fn check_valid_layermap(metadata: &HashMap<LayerName, LayerFileMetadata>) -> Option<String> {
let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
let mut all_delta_layers = Vec::new();
for (name, _) in metadata.iter() {
if let LayerName::Delta(layer) = name {
if layer.key_range.start.next() != layer.key_range.end {
all_delta_layers.push(layer.clone());
}
}
}
for layer in &all_delta_layers {
let lsn_range = &layer.lsn_range;
lsn_split_point.insert(lsn_range.start);
lsn_split_point.insert(lsn_range.end);
}
for layer in &all_delta_layers {
let lsn_range = layer.lsn_range.clone();
let intersects = lsn_split_point.range(lsn_range).collect_vec();
if intersects.len() > 1 {
let err = format!(
"layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
layer,
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
);
return Some(err);
}
}
None
}
pub(crate) async fn branch_cleanup_and_check_errors(
remote_client: &GenericRemoteStorage,
id: &TenantShardTimelineId,
@@ -177,7 +128,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
}
}
if let Some(err) = check_valid_layermap(&index_part.layer_metadata) {
let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
result.errors.push(format!(
"index_part.json contains invalid layer map structure: {err}"
));

View File

@@ -3863,9 +3863,6 @@ def static_proxy(
dbname = vanilla_pg.default_options["dbname"]
auth_endpoint = f"postgres://proxy:password@{host}:{port}/{dbname}"
# require password for 'http_auth' user
vanilla_pg.edit_hba([f"host {dbname} http_auth {host} password"])
# For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
vanilla_pg.start()
vanilla_pg.safe_psql("create user proxy with login superuser password 'password'")

View File

@@ -142,6 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
"image_creation_threshold": "1",
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
"lsn_lease_length": "0s",
}
)

View File

@@ -11,7 +11,9 @@ from fixtures.utils import print_gc_result, query_scalar
#
def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
env = neon_env_builder.init_start(
initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"}
)
error_regexes = [
".*invalid branch start lsn.*",

View File

@@ -419,7 +419,7 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
client = env.pageserver.http_client()

View File

@@ -240,6 +240,7 @@ def test_uploads_and_deletions(
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

View File

@@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep):
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
agressive_vacuum_conf = [
"log_autovacuum_min_duration = 0",
"autovacuum_naptime = 10s",

View File

@@ -173,6 +173,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
"image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
"lsn_lease_length": "0s",
}
def tenant_update_config(changes):

View File

@@ -53,6 +53,7 @@ TENANT_CONF = {
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}

View File

@@ -244,6 +244,7 @@ def test_remote_storage_upload_queue_retries(
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
"image_layer_creation_check_threshold": "0",
"lsn_lease_length": "0s",
}
)
@@ -391,6 +392,7 @@ def test_remote_timeline_client_calls_started_metric(
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
"lsn_lease_length": "0s",
}
)

View File

@@ -200,6 +200,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
"image_layer_creation_check_threshold": 0,
"lsn_lease_length": "0s",
}
neon_env_builder.storage_controller_config = {

View File

@@ -485,7 +485,7 @@ def test_storage_controller_compute_hook(
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
# Start running
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
# Initial notification from tenant creation
assert len(notifications) == 1

View File

@@ -204,6 +204,7 @@ def test_scrubber_physical_gc_ancestors(
# No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
# and makes them GC'able
"pitr_interval": "0s",
"lsn_lease_length": "0s",
},
)

View File

@@ -266,13 +266,13 @@ def test_tenant_reattach_while_busy(
def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
pageserver_http = env.pageserver.http_client()
env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)
# create new nenant
tenant_id, timeline_id = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
# assert tenant exists on disk
assert env.pageserver.tenant_dir(tenant_id).exists()

View File

@@ -45,7 +45,10 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
tenant_after = http.tenant_status(env.initial_tenant)
assert tenant_before != tenant_after
gc_blocking = tenant_after["gc_blocking"]
assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
assert (
gc_blocking
== "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
)
wait_for_another_gc_round()
pss.assert_log_contains(gc_skipped_line)

View File

@@ -892,6 +892,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
log.info(f"debug_dump before reboot {debug_dump_0}")
assert debug_dump_0["timelines_count"] == 1
assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id)
assert debug_dump_0["timelines"][0]["wal_last_modified"] != ""
endpoint.safe_psql("create table t(i int)")

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17rc1",
"9156d63ce253bed9d1f76355ceec610e444eaffa"
"5bbb9bd93dd805e90bd8af15d00080363d18ec68"
],
"v16": [
"16.4",
"0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
"3ec6e2496f64c6fec35c67cb82efd6490a6a4738"
],
"v15": [
"15.8",
"6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
"72b904c0b3ac43bd74d1e8e6d772e2c476ae25b1"
],
"v14": [
"14.13",
"a317b9b5b96978b49e78986697f3dd80d06f99a7"
"87cb68f899db434cd6f1908cf0ac8fdeafdd88c1"
]
}