Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-20 20:02:56 +00:00.

Compare commits: `fix_path_f` ... `readme_add` (14 commits)
| Author | SHA1 | Date |
|---|---|---|
| | f79048f0ec | |
| | 3476727923 | |
| | 3104f0f250 | |
| | f2c08195f0 | |
| | d0cbfda15c | |
| | 1708743e78 | |
| | 0a1ca7670c | |
| | ff9f065c43 | |
| | 21eeafaaa5 | |
| | 32a0e759bd | |
| | 7c489092b7 | |
| | 06d55a3b12 | |
| | 5c68e6a172 | |
| | 2753abc0d8 | |
README.md (10)
@@ -52,6 +52,11 @@ Building Neon requires 3.15+ version of `protoc` (protobuf-compiler). If your di
```
# recommended approach from https://www.rust-lang.org/tools/install
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# bash-only: add $HOME/.cargo/bin to PATH. The script does this for zsh
# echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> ~/.bashrc

rustup show
```

#### Installing dependencies on macOS (12.3.1)
@@ -74,6 +79,11 @@ brew link --force m4
```
# recommended approach from https://www.rust-lang.org/tools/install
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

# bash-only: add $HOME/.cargo/bin to PATH. The script does this for zsh
echo 'export PATH="$HOME/.cargo/bin:$PATH"' >> ~/.bashrc

rustup show
```

3. Install PostgreSQL Client

@@ -173,40 +173,6 @@ impl Default for EvictionOrder {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetVectoredImpl {
|
||||
Sequential,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Eq,
|
||||
PartialEq,
|
||||
Debug,
|
||||
Copy,
|
||||
Clone,
|
||||
strum_macros::EnumString,
|
||||
strum_macros::Display,
|
||||
serde_with::DeserializeFromStr,
|
||||
serde_with::SerializeDisplay,
|
||||
)]
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum GetImpl {
|
||||
Legacy,
|
||||
Vectored,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct MaxVectoredReadBytes(pub NonZeroUsize);
|
||||
@@ -338,8 +304,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
|
||||
ImageCompressionAlgorithm::Zstd { level: Some(1) };
|
||||
|
||||
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
|
||||
|
||||
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
|
||||
|
||||
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
|
||||
@@ -376,7 +340,10 @@ impl Default for ConfigToml {
|
||||
|
||||
concurrent_tenant_warmup: (NonZeroUsize::new(DEFAULT_CONCURRENT_TENANT_WARMUP)
|
||||
.expect("Invalid default constant")),
|
||||
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(1).unwrap(),
|
||||
concurrent_tenant_size_logical_size_queries: NonZeroUsize::new(
|
||||
DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES,
|
||||
)
|
||||
.unwrap(),
|
||||
metric_collection_interval: (humantime::parse_duration(
|
||||
DEFAULT_METRIC_COLLECTION_INTERVAL,
|
||||
)
|
||||
@@ -467,8 +434,6 @@ pub mod tenant_conf_defaults {
|
||||
// By default ingest enough WAL for two new L0 layers before checking if new image
|
||||
// image layers should be created.
|
||||
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
}
|
||||
|
||||
impl Default for TenantConfigToml {
|
||||
|
||||
@@ -26,6 +26,7 @@ use bytes::{Buf, Bytes};
 use log::*;
 
 use serde::Serialize;
+use std::ffi::OsStr;
 use std::fs::File;
 use std::io::prelude::*;
 use std::io::ErrorKind;
@@ -78,19 +79,34 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
     )
 }
 
-pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
-    let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
-    let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
-    let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
-    (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
+pub fn XLogFromFileName(
+    fname: &OsStr,
+    wal_seg_size: usize,
+) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
+    if let Some(fname_str) = fname.to_str() {
+        let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
+        let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
+        let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
+        Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
+    } else {
+        anyhow::bail!("non-ut8 filename: {:?}", fname);
+    }
 }
 
-pub fn IsXLogFileName(fname: &str) -> bool {
-    return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
+pub fn IsXLogFileName(fname: &OsStr) -> bool {
+    if let Some(fname) = fname.to_str() {
+        fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
+    } else {
+        false
+    }
 }
 
-pub fn IsPartialXLogFileName(fname: &str) -> bool {
-    fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
+pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
+    if let Some(fname) = fname.to_str() {
+        fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
+    } else {
+        false
+    }
 }
 
 /// If LSN points to the beginning of the page, then shift it to first record,

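For orientation, the 24 hex digits of a WAL segment file name encode the timeline id, a "log" number, and a segment number, and the value returned above is `log * XLogSegmentsPerXLogId(wal_seg_size) + seg`. Below is a minimal standalone sketch of that arithmetic, assuming the usual `XLogSegmentsPerXLogId = 0x1_0000_0000 / wal_seg_size` definition and the `anyhow` crate; it is illustrative only, not the crate's code.

```rust
// Parse a WAL segment file name of the form TTTTTTTTXXXXXXXXYYYYYYYY
// (24 hex digits) into a (segment number, timeline id) pair, mirroring
// the logic of XLogFromFileName shown in the diff above.
fn parse_wal_file_name(fname: &str, wal_seg_size: u64) -> anyhow::Result<(u64, u32)> {
    if fname.len() < 24 {
        anyhow::bail!("WAL file name too short: {fname}");
    }
    let tli = u32::from_str_radix(&fname[0..8], 16)?;
    let log = u64::from_str_radix(&fname[8..16], 16)?;
    let seg = u64::from_str_radix(&fname[16..24], 16)?;
    // Assumed definition: segments per "xlog id" is 2^32 / segment size.
    let segments_per_xlog_id = 0x1_0000_0000u64 / wal_seg_size;
    Ok((log * segments_per_xlog_id + seg, tli))
}

fn main() -> anyhow::Result<()> {
    // 16 MiB segments, the PostgreSQL default (WAL_SEGMENT_SIZE in the tests).
    let (segno, tli) = parse_wal_file_name("000000010000000000000001", 16 * 1024 * 1024)?;
    assert_eq!((segno, tli), (1, 1));
    Ok(())
}
```
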
@@ -7,6 +7,7 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
|
||||
use postgres_ffi::{
|
||||
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
};
|
||||
use std::ffi::OsStr;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::Command;
|
||||
use std::time::{Duration, Instant};
|
||||
@@ -135,8 +136,8 @@ impl Conf {
|
||||
|
||||
pub fn pg_waldump(
|
||||
&self,
|
||||
first_segment_name: &str,
|
||||
last_segment_name: &str,
|
||||
first_segment_name: &OsStr,
|
||||
last_segment_name: &OsStr,
|
||||
) -> anyhow::Result<std::process::Output> {
|
||||
let first_segment_file = self.datadir.join(first_segment_name);
|
||||
let last_segment_file = self.datadir.join(last_segment_name);
|
||||
|
||||
@@ -4,6 +4,7 @@ use super::*;
|
||||
use crate::{error, info};
|
||||
use regex::Regex;
|
||||
use std::cmp::min;
|
||||
use std::ffi::OsStr;
|
||||
use std::fs::{self, File};
|
||||
use std::io::Write;
|
||||
use std::{env, str::FromStr};
|
||||
@@ -54,7 +55,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
.wal_dir()
|
||||
.read_dir()
|
||||
.unwrap()
|
||||
.map(|f| f.unwrap().file_name().into_string().unwrap())
|
||||
.map(|f| f.unwrap().file_name())
|
||||
.filter(|fname| IsXLogFileName(fname))
|
||||
.max()
|
||||
.unwrap();
|
||||
@@ -70,11 +71,11 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
start_lsn
|
||||
);
|
||||
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
|
||||
let fname = file.file_name().into_string().unwrap();
|
||||
let fname = file.file_name();
|
||||
if !IsXLogFileName(&fname) {
|
||||
continue;
|
||||
}
|
||||
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
|
||||
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
|
||||
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
|
||||
if seg_start_lsn > u64::from(*start_lsn) {
|
||||
continue;
|
||||
@@ -93,10 +94,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
|
||||
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
|
||||
// Get the actual end of WAL by pg_waldump
|
||||
let waldump_output = cfg
|
||||
.pg_waldump("000000010000000000000001", last_segment)
|
||||
.pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
|
||||
.unwrap()
|
||||
.stderr;
|
||||
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
|
||||
@@ -117,7 +118,7 @@ fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
|
||||
|
||||
fn check_end_of_wal(
|
||||
cfg: &crate::Conf,
|
||||
last_segment: &str,
|
||||
last_segment: &OsStr,
|
||||
start_lsn: Lsn,
|
||||
expected_end_of_wal: Lsn,
|
||||
) {
|
||||
@@ -132,7 +133,8 @@ fn check_end_of_wal(
|
||||
// Rename file to partial to actually find last valid lsn, then rename it back.
|
||||
fs::rename(
|
||||
cfg.wal_dir().join(last_segment),
|
||||
cfg.wal_dir().join(format!("{}.partial", last_segment)),
|
||||
cfg.wal_dir()
|
||||
.join(format!("{}.partial", last_segment.to_str().unwrap())),
|
||||
)
|
||||
.unwrap();
|
||||
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
|
||||
@@ -142,7 +144,8 @@ fn check_end_of_wal(
|
||||
);
|
||||
assert_eq!(wal_end, expected_end_of_wal);
|
||||
fs::rename(
|
||||
cfg.wal_dir().join(format!("{}.partial", last_segment)),
|
||||
cfg.wal_dir()
|
||||
.join(format!("{}.partial", last_segment.to_str().unwrap())),
|
||||
cfg.wal_dir().join(last_segment),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
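The test above relies on temporarily renaming the last segment to `<name>.partial` so that `find_end_of_wal` treats it as incomplete, then renaming it back. A self-contained sketch of that rename-and-restore pattern, using plain `std::fs` and a throwaway temp directory rather than the crate's `Conf`/`wal_dir` helpers:

```rust
use std::fs;
use std::path::Path;

// Rename <segment> to <segment>.partial, run `f`, then restore the name.
// In the real test, `f` would call find_end_of_wal on the WAL directory.
fn with_partial_segment<T>(
    wal_dir: &Path,
    segment: &str,
    f: impl FnOnce() -> T,
) -> std::io::Result<T> {
    let orig = wal_dir.join(segment);
    let partial = wal_dir.join(format!("{segment}.partial"));
    fs::rename(&orig, &partial)?;
    let out = f();
    fs::rename(&partial, &orig)?;
    Ok(out)
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("wal_rename_demo");
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("000000010000000000000001"), b"dummy")?;
    with_partial_segment(&dir, "000000010000000000000001", || ())?;
    Ok(())
}
```
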
@@ -479,11 +479,6 @@ pub struct ConfigurableSemaphore {
|
||||
}
|
||||
|
||||
impl ConfigurableSemaphore {
|
||||
pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
|
||||
Some(x) => x,
|
||||
None => panic!("const unwrap is not yet stable"),
|
||||
};
|
||||
|
||||
/// Initialize using a non-zero amount of permits.
|
||||
///
|
||||
/// Require a non-zero initial permits, because using permits == 0 is a crude way to disable a
|
||||
@@ -504,12 +499,6 @@ impl ConfigurableSemaphore {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConfigurableSemaphore {
|
||||
fn default() -> Self {
|
||||
Self::new(Self::DEFAULT_INITIAL)
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for ConfigurableSemaphore {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// the number of permits can be increased at runtime, so we cannot really fulfill the
|
||||
|
||||
@@ -140,6 +140,7 @@ pub mod metadata;
|
||||
pub mod remote_timeline_client;
|
||||
pub mod storage_layer;
|
||||
|
||||
pub mod checks;
|
||||
pub mod config;
|
||||
pub mod mgr;
|
||||
pub mod secondary;
|
||||
@@ -1573,6 +1574,9 @@ impl Tenant {
|
||||
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
use checks::check_valid_layermap;
|
||||
use itertools::Itertools;
|
||||
|
||||
let tline = self
|
||||
.create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
|
||||
.await?;
|
||||
@@ -1587,6 +1591,18 @@ impl Tenant {
|
||||
.force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
let layer_names = tline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.unwrap()
|
||||
.iter_historic_layers()
|
||||
.map(|layer| layer.layer_name())
|
||||
.collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
bail!("invalid layermap: {err}");
|
||||
}
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
@@ -3197,6 +3213,9 @@ impl Tenant {
|
||||
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
use checks::check_valid_layermap;
|
||||
use itertools::Itertools;
|
||||
|
||||
let tline = self
|
||||
.branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx)
|
||||
.await?;
|
||||
@@ -3217,6 +3236,18 @@ impl Tenant {
|
||||
.force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
let layer_names = tline
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.unwrap()
|
||||
.iter_historic_layers()
|
||||
.map(|layer| layer.layer_name())
|
||||
.collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
bail!("invalid layermap: {err}");
|
||||
}
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
@@ -4164,9 +4195,18 @@ pub(crate) mod harness {
|
||||
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
|
||||
if records_neon {
|
||||
// For Neon wal records, we can decode without spawning postgres, so do so.
|
||||
let base_img = base_img.expect("Neon WAL redo requires base image").1;
|
||||
let mut page = BytesMut::new();
|
||||
page.extend_from_slice(&base_img);
|
||||
let mut page = match (base_img, records.first()) {
|
||||
(Some((_lsn, img)), _) => {
|
||||
let mut page = BytesMut::new();
|
||||
page.extend_from_slice(&img);
|
||||
page
|
||||
}
|
||||
(_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
|
||||
_ => {
|
||||
panic!("Neon WAL redo requires base image or will init record");
|
||||
}
|
||||
};
|
||||
|
||||
for (record_lsn, record) in records {
|
||||
apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
|
||||
}
|
||||
@@ -8470,4 +8510,135 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
//      A                B
//  +----------------+       -> delta_layer
//  |                |         ^ lsn
//  |       =========|-> nested_image_layer   |
//  |       C        |                        |
//  +----------------+                        |
//   ========  -> baseline_image_layer    +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
|
||||
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let will_init_keys = [2, 6];
|
||||
fn get_key(id: u32) -> Key {
|
||||
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
let mut expected_key_values = HashMap::new();
|
||||
|
||||
let baseline_image_layer_lsn = Lsn(0x10);
|
||||
let mut baseline_img_layer = Vec::new();
|
||||
for i in 0..5 {
|
||||
let key = get_key(i);
|
||||
let value = format!("value {i}@{baseline_image_layer_lsn}");
|
||||
|
||||
let removed = expected_key_values.insert(key, value.clone());
|
||||
assert!(removed.is_none());
|
||||
|
||||
baseline_img_layer.push((key, Bytes::from(value)));
|
||||
}
|
||||
|
||||
let nested_image_layer_lsn = Lsn(0x50);
|
||||
let mut nested_img_layer = Vec::new();
|
||||
for i in 5..10 {
|
||||
let key = get_key(i);
|
||||
let value = format!("value {i}@{nested_image_layer_lsn}");
|
||||
|
||||
let removed = expected_key_values.insert(key, value.clone());
|
||||
assert!(removed.is_none());
|
||||
|
||||
nested_img_layer.push((key, Bytes::from(value)));
|
||||
}
|
||||
|
||||
let mut delta_layer_spec = Vec::default();
|
||||
let delta_layer_start_lsn = Lsn(0x20);
|
||||
let mut delta_layer_end_lsn = delta_layer_start_lsn;
|
||||
|
||||
for i in 0..10 {
|
||||
let key = get_key(i);
|
||||
let key_in_nested = nested_img_layer
|
||||
.iter()
|
||||
.any(|(key_with_img, _)| *key_with_img == key);
|
||||
let lsn = {
|
||||
if key_in_nested {
|
||||
Lsn(nested_image_layer_lsn.0 + 0x10)
|
||||
} else {
|
||||
delta_layer_start_lsn
|
||||
}
|
||||
};
|
||||
|
||||
let will_init = will_init_keys.contains(&i);
|
||||
if will_init {
|
||||
delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
|
||||
|
||||
expected_key_values.insert(key, "".to_string());
|
||||
} else {
|
||||
let delta = format!("@{lsn}");
|
||||
delta_layer_spec.push((
|
||||
key,
|
||||
lsn,
|
||||
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
|
||||
));
|
||||
|
||||
expected_key_values
|
||||
.get_mut(&key)
|
||||
.expect("An image exists for each key")
|
||||
.push_str(delta.as_str());
|
||||
}
|
||||
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
|
||||
}
|
||||
|
||||
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
|
||||
|
||||
assert!(
|
||||
nested_image_layer_lsn > delta_layer_start_lsn
|
||||
&& nested_image_layer_lsn < delta_layer_end_lsn
|
||||
);
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
baseline_image_layer_lsn,
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
delta_layer_start_lsn..delta_layer_end_lsn,
|
||||
delta_layer_spec,
|
||||
)], // delta layers
|
||||
vec![
|
||||
(baseline_image_layer_lsn, baseline_img_layer),
|
||||
(nested_image_layer_lsn, nested_img_layer),
|
||||
], // image layers
|
||||
delta_layer_end_lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let keyspace = KeySpace::single(get_key(0)..get_key(10));
|
||||
let results = tline
|
||||
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
|
||||
.await
|
||||
.expect("No vectored errors");
|
||||
for (key, res) in results {
|
||||
let value = res.expect("No key errors");
|
||||
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
|
||||
assert_eq!(value, Bytes::from(expected_value));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pageserver/src/tenant/checks.rs (new file, 55 lines)
@@ -0,0 +1,55 @@
use std::collections::BTreeSet;

use itertools::Itertools;

use super::storage_layer::LayerName;

/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// |       |                 |       |
/// |   1   |    |   2   |    |   3   |
/// |       |    |       |    |       |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// |       |    |   2   |    |       |
/// |   1   |    |-------|    |   3   |
/// |       |    |   4   |    |       |
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
pub fn check_valid_layermap(metadata: &[LayerName]) -> Option<String> {
    let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
    let mut all_delta_layers = Vec::new();
    for name in metadata {
        if let LayerName::Delta(layer) = name {
            if layer.key_range.start.next() != layer.key_range.end {
                all_delta_layers.push(layer.clone());
            }
        }
    }
    for layer in &all_delta_layers {
        let lsn_range = &layer.lsn_range;
        lsn_split_point.insert(lsn_range.start);
        lsn_split_point.insert(lsn_range.end);
    }
    for layer in &all_delta_layers {
        let lsn_range = layer.lsn_range.clone();
        let intersects = lsn_split_point.range(lsn_range).collect_vec();
        if intersects.len() > 1 {
            let err = format!(
                "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
                layer,
                intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
            );
            return Some(err);
        }
    }
    None
}

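To make the invariant concrete, here is a self-contained sketch of the same `BTreeSet`-based check with plain `Range<u64>` values standing in for the LSN ranges of multi-key delta layers; it is illustrative only and does not use the pageserver's `LayerName` types.

```rust
use std::collections::BTreeSet;
use std::ops::Range;

// Delta layers may only be cut at LSNs where some delta layer starts or ends.
// Returns a description of the first layer that violates this, if any.
fn first_violation(layers: &[Range<u64>]) -> Option<String> {
    let mut split_points = BTreeSet::new();
    for l in layers {
        split_points.insert(l.start);
        split_points.insert(l.end);
    }
    for l in layers {
        // A layer always contains its own start; any additional split point
        // inside [start, end) means another layer's boundary cuts through it.
        let inside: Vec<_> = split_points.range(l.clone()).collect();
        if inside.len() > 1 {
            return Some(format!("layer {l:?} is crossed by splits {inside:?}"));
        }
    }
    None
}

fn main() {
    // 0x10..0x30 and 0x20..0x30 overlap without sharing boundaries: invalid.
    assert!(first_violation(&[0x10..0x30, 0x20..0x30]).is_some());
    // Layers stacked back to back at the same boundaries are fine.
    assert!(first_violation(&[0x10..0x20, 0x20..0x30]).is_none());
}
```
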
@@ -1,11 +1,29 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use utils::id::TimelineId;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use super::remote_timeline_client::index::GcBlockingReason;
|
||||
use tokio::time::Instant;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
|
||||
type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
|
||||
|
||||
#[derive(Default)]
|
||||
struct Storage {
|
||||
timelines_blocked: TimelinesBlocked,
|
||||
/// The deadline before which we are blocked from GC so that
|
||||
/// leases have a chance to be renewed.
|
||||
lsn_lease_deadline: Option<Instant>,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
|
||||
self.lsn_lease_deadline
|
||||
.map(|d| Instant::now() < d)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
/// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
|
||||
/// blocking.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct GcBlock {
|
||||
/// The timelines which have current reasons to block gc.
|
||||
@@ -13,6 +31,12 @@ pub(crate) struct GcBlock {
|
||||
/// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
|
||||
/// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
|
||||
reasons: std::sync::Mutex<Storage>,
|
||||
|
||||
/// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
|
||||
///
|
||||
/// Do not add any more features taking and forbidding taking this lock. It should be
|
||||
/// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
|
||||
/// synchronizes with gc attempts by locking and unlocking this mutex.
|
||||
blocking: tokio::sync::Mutex<()>,
|
||||
}
|
||||
|
||||
@@ -42,6 +66,20 @@ impl GcBlock {
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets a deadline before which we cannot proceed to GC due to lsn lease.
|
||||
///
|
||||
/// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
|
||||
/// length, we guarantee that all the leases we granted before will have a chance to renew
|
||||
/// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
|
||||
pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
|
||||
let deadline = Instant::now() + lsn_lease_length;
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
g.lsn_lease_deadline = Some(deadline);
|
||||
}
|
||||
|
||||
/// Describe the current gc blocking reasons.
|
||||
///
|
||||
/// TODO: make this json serializable.
|
||||
pub(crate) fn summary(&self) -> Option<BlockingReasons> {
|
||||
let g = self.reasons.lock().unwrap();
|
||||
|
||||
@@ -64,7 +102,7 @@ impl GcBlock {
|
||||
) -> anyhow::Result<bool> {
|
||||
let (added, uploaded) = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
let set = g.entry(timeline.timeline_id).or_default();
|
||||
let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
|
||||
let added = set.insert(reason);
|
||||
|
||||
// LOCK ORDER: intentionally hold the lock, see self.reasons.
|
||||
@@ -95,7 +133,7 @@ impl GcBlock {
|
||||
|
||||
let (remaining_blocks, uploaded) = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
match g.entry(timeline.timeline_id) {
|
||||
match g.timelines_blocked.entry(timeline.timeline_id) {
|
||||
Entry::Occupied(mut oe) => {
|
||||
let set = oe.get_mut();
|
||||
set.remove(reason);
|
||||
@@ -109,7 +147,7 @@ impl GcBlock {
|
||||
}
|
||||
}
|
||||
|
||||
let remaining_blocks = g.len();
|
||||
let remaining_blocks = g.timelines_blocked.len();
|
||||
|
||||
// LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
|
||||
let uploaded = timeline
|
||||
@@ -134,11 +172,11 @@ impl GcBlock {
|
||||
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
|
||||
let unblocked = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
if g.is_empty() {
|
||||
if g.timelines_blocked.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
g.remove(&timeline.timeline_id);
|
||||
g.timelines_blocked.remove(&timeline.timeline_id);
|
||||
|
||||
BlockingReasons::clean_and_summarize(g).is_none()
|
||||
};
|
||||
@@ -149,10 +187,11 @@ impl GcBlock {
|
||||
}
|
||||
|
||||
/// Initialize with the non-deleted timelines of this tenant.
|
||||
pub(crate) fn set_scanned(&self, scanned: Storage) {
|
||||
pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
assert!(g.is_empty());
|
||||
g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
|
||||
assert!(g.timelines_blocked.is_empty());
|
||||
g.timelines_blocked
|
||||
.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
|
||||
|
||||
if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
|
||||
tracing::info!(summary=?reasons, "initialized with gc blocked");
|
||||
@@ -166,6 +205,7 @@ pub(super) struct Guard<'a> {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BlockingReasons {
|
||||
tenant_blocked_by_lsn_lease_deadline: bool,
|
||||
timelines: usize,
|
||||
reasons: enumset::EnumSet<GcBlockingReason>,
|
||||
}
|
||||
@@ -174,8 +214,8 @@ impl std::fmt::Display for BlockingReasons {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{} timelines block for {:?}",
|
||||
self.timelines, self.reasons
|
||||
"tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
|
||||
self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -183,13 +223,15 @@ impl std::fmt::Display for BlockingReasons {
|
||||
impl BlockingReasons {
|
||||
fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
|
||||
let mut reasons = enumset::EnumSet::empty();
|
||||
g.retain(|_key, value| {
|
||||
g.timelines_blocked.retain(|_key, value| {
|
||||
reasons = reasons.union(*value);
|
||||
!value.is_empty()
|
||||
});
|
||||
if !g.is_empty() {
|
||||
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
|
||||
if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
|
||||
Some(BlockingReasons {
|
||||
timelines: g.len(),
|
||||
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
|
||||
timelines: g.timelines_blocked.len(),
|
||||
reasons,
|
||||
})
|
||||
} else {
|
||||
@@ -198,14 +240,17 @@ impl BlockingReasons {
|
||||
}
|
||||
|
||||
fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
|
||||
if g.is_empty() {
|
||||
let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
|
||||
if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
|
||||
None
|
||||
} else {
|
||||
let reasons = g
|
||||
.timelines_blocked
|
||||
.values()
|
||||
.fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
|
||||
Some(BlockingReasons {
|
||||
timelines: g.len(),
|
||||
tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
|
||||
timelines: g.timelines_blocked.len(),
|
||||
reasons,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -949,6 +949,12 @@ impl TenantManager {
|
||||
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
|
||||
match attach_conf.generation.cmp(&tenant.generation) {
|
||||
Ordering::Equal => {
|
||||
if attach_conf.attach_mode == AttachmentMode::Single {
|
||||
tenant
|
||||
.gc_block
|
||||
.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
|
||||
}
|
||||
|
||||
// A transition from Attached to Attached in the same generation, we may
|
||||
// take our fast path and just provide the updated configuration
|
||||
// to the tenant.
|
||||
|
||||
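The `set_lsn_lease_deadline` call above (and its counterpart in `GcBlock`) implements a simple time-based gate: record `now + lease_length` once, and skip GC while the clock has not passed it, so unpersisted LSN leases get a chance to be renewed. A minimal sketch of that pattern, using `std::time::Instant` and a bare struct rather than the real `GcBlock`:

```rust
use std::time::{Duration, Instant};

#[derive(Default)]
struct GcGate {
    /// GC is skipped while `Instant::now() < deadline`.
    lsn_lease_deadline: Option<Instant>,
}

impl GcGate {
    fn set_lsn_lease_deadline(&mut self, lease_length: Duration) {
        self.lsn_lease_deadline = Some(Instant::now() + lease_length);
    }

    fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
        self.lsn_lease_deadline
            .map(|d| Instant::now() < d)
            .unwrap_or(false)
    }
}

fn main() {
    let mut gate = GcGate::default();
    assert!(!gate.is_blocked_by_lsn_lease_deadline());
    gate.set_lsn_lease_deadline(Duration::from_secs(600));
    // A GC iteration starting now would be skipped until the deadline passes.
    assert!(gate.is_blocked_by_lsn_lease_deadline());
}
```
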
@@ -276,6 +276,16 @@ pub(crate) enum LayerId {
|
||||
InMemoryLayerId(InMemoryLayerFileId),
|
||||
}
|
||||
|
||||
/// Uniquely identify a layer visit by the layer
|
||||
/// and LSN floor (or start LSN) of the reads.
|
||||
/// The layer itself is not enough since we may
|
||||
/// have different LSN lower bounds for delta layer reads.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
struct LayerToVisitId {
|
||||
layer_id: LayerId,
|
||||
lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
/// Layer wrapper for the read path. Note that it is valid
|
||||
/// to use these layers even after external operations have
|
||||
/// been performed on them (compaction, freeze, etc.).
|
||||
@@ -287,9 +297,9 @@ pub(crate) enum ReadableLayer {
|
||||
|
||||
/// A partial description of a read to be done.
|
||||
#[derive(Debug, Clone)]
|
||||
struct ReadDesc {
|
||||
struct LayerVisit {
|
||||
/// An id used to resolve the readable layer within the fringe
|
||||
layer_id: LayerId,
|
||||
layer_to_visit_id: LayerToVisitId,
|
||||
/// Lsn range for the read, used for selecting the next read
|
||||
lsn_range: Range<Lsn>,
|
||||
}
|
||||
@@ -303,12 +313,12 @@ struct ReadDesc {
|
||||
/// a two layer indexing scheme.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct LayerFringe {
|
||||
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
|
||||
layers: HashMap<LayerId, LayerKeyspace>,
|
||||
planned_visits_by_lsn: BinaryHeap<LayerVisit>,
|
||||
visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LayerKeyspace {
|
||||
struct LayerVisitReads {
|
||||
layer: ReadableLayer,
|
||||
target_keyspace: KeySpaceRandomAccum,
|
||||
}
|
||||
@@ -316,23 +326,23 @@ struct LayerKeyspace {
|
||||
impl LayerFringe {
|
||||
pub(crate) fn new() -> Self {
|
||||
LayerFringe {
|
||||
planned_reads_by_lsn: BinaryHeap::new(),
|
||||
layers: HashMap::new(),
|
||||
planned_visits_by_lsn: BinaryHeap::new(),
|
||||
visit_reads: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
|
||||
let read_desc = match self.planned_reads_by_lsn.pop() {
|
||||
let read_desc = match self.planned_visits_by_lsn.pop() {
|
||||
Some(desc) => desc,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let removed = self.layers.remove_entry(&read_desc.layer_id);
|
||||
let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
|
||||
|
||||
match removed {
|
||||
Some((
|
||||
_,
|
||||
LayerKeyspace {
|
||||
LayerVisitReads {
|
||||
layer,
|
||||
mut target_keyspace,
|
||||
},
|
||||
@@ -351,20 +361,24 @@ impl LayerFringe {
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
) {
|
||||
let layer_id = layer.id();
|
||||
let entry = self.layers.entry(layer_id.clone());
|
||||
let layer_to_visit_id = LayerToVisitId {
|
||||
layer_id: layer.id(),
|
||||
lsn_floor: lsn_range.start,
|
||||
};
|
||||
|
||||
let entry = self.visit_reads.entry(layer_to_visit_id.clone());
|
||||
match entry {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().target_keyspace.add_keyspace(keyspace);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
self.planned_reads_by_lsn.push(ReadDesc {
|
||||
self.planned_visits_by_lsn.push(LayerVisit {
|
||||
lsn_range,
|
||||
layer_id: layer_id.clone(),
|
||||
layer_to_visit_id: layer_to_visit_id.clone(),
|
||||
});
|
||||
let mut accum = KeySpaceRandomAccum::new();
|
||||
accum.add_keyspace(keyspace);
|
||||
entry.insert(LayerKeyspace {
|
||||
entry.insert(LayerVisitReads {
|
||||
layer,
|
||||
target_keyspace: accum,
|
||||
});
|
||||
@@ -379,7 +393,7 @@ impl Default for LayerFringe {
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for ReadDesc {
|
||||
impl Ord for LayerVisit {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
|
||||
if ord == std::cmp::Ordering::Equal {
|
||||
@@ -390,19 +404,19 @@ impl Ord for ReadDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for ReadDesc {
|
||||
impl PartialOrd for LayerVisit {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for ReadDesc {
|
||||
impl PartialEq for LayerVisit {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.lsn_range == other.lsn_range
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for ReadDesc {}
|
||||
impl Eq for LayerVisit {}
|
||||
|
||||
impl ReadableLayer {
|
||||
pub(crate) fn id(&self) -> LayerId {
|
||||
|
||||
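The renaming above reflects how the fringe schedules work: each visit is keyed by `LayerToVisitId` (layer id plus LSN floor), so the same layer can be visited twice with different lower bounds, and the next visit popped is the one whose LSN range ends highest. A standalone sketch of that ordering, with integers standing in for `LayerId` and `Lsn`; the start-LSN tie-break here is illustrative, not the crate's exact rule.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;

#[derive(Debug)]
struct Visit {
    layer_id: u32,          // stands in for LayerId
    lsn_range: Range<u64>,  // stands in for Range<Lsn>
}

impl PartialEq for Visit {
    fn eq(&self, other: &Self) -> bool {
        self.lsn_range == other.lsn_range
    }
}
impl Eq for Visit {}

impl Ord for Visit {
    fn cmp(&self, other: &Self) -> Ordering {
        // Primary key: end LSN, so reads proceed from newer data to older.
        self.lsn_range
            .end
            .cmp(&other.lsn_range.end)
            .then(self.lsn_range.start.cmp(&other.lsn_range.start))
    }
}

impl PartialOrd for Visit {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut fringe = BinaryHeap::new();
    // Two visits to the same layer with different LSN floors coexist,
    // which is exactly why LayerToVisitId includes the floor.
    fringe.push(Visit { layer_id: 7, lsn_range: 0x20..0x60 });
    fringe.push(Visit { layer_id: 7, lsn_range: 0x10..0x50 });
    let first = fringe.pop().unwrap();
    // The visit with the highest end LSN is processed first.
    assert_eq!((first.layer_id, first.lsn_range), (7, 0x20..0x60));
}
```
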
@@ -346,6 +346,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
|
||||
|
||||
let mut first = true;
|
||||
tenant.gc_block.set_lsn_lease_deadline(tenant.get_lsn_lease_length());
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
@@ -363,7 +364,6 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
first = false;
|
||||
|
||||
let delays = async {
|
||||
delay_by_lease_length(tenant.get_lsn_lease_length(), &cancel).await?;
|
||||
random_init_delay(period, &cancel).await?;
|
||||
Ok::<_, Cancelled>(())
|
||||
};
|
||||
@@ -538,28 +538,12 @@ pub(crate) async fn random_init_delay(
|
||||
let mut rng = rand::thread_rng();
|
||||
rng.gen_range(Duration::ZERO..=period)
|
||||
};
|
||||
|
||||
match tokio::time::timeout(d, cancel.cancelled()).await {
|
||||
Ok(_) => Err(Cancelled),
|
||||
Err(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Delays GC by default lease length at restart.
|
||||
///
|
||||
/// We do this as the leases mapping are not persisted to disk. By delaying GC by default
|
||||
/// length, we guarantee that all the leases we granted before the restart will expire
|
||||
/// when we run GC for the first time after the restart.
|
||||
pub(crate) async fn delay_by_lease_length(
|
||||
length: Duration,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), Cancelled> {
|
||||
match tokio::time::timeout(length, cancel.cancelled()).await {
|
||||
Ok(_) => Err(Cancelled),
|
||||
Err(_) => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
struct Iteration {
|
||||
started_at: Instant,
|
||||
period: Duration,
|
||||
|
||||
@@ -4015,6 +4015,7 @@ impl Timeline {
|
||||
// partition, so flush it to disk.
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!("created image layer for rel {}", image_layer.local_path());
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
next_start_key: img_range.end,
|
||||
@@ -4104,6 +4105,10 @@ impl Timeline {
|
||||
// partition, so flush it to disk.
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
info!(
|
||||
"created image layer for metadata {}",
|
||||
image_layer.local_path()
|
||||
);
|
||||
Ok(ImageLayerCreationOutcome {
|
||||
image: Some(image_layer),
|
||||
next_start_key: img_range.end,
|
||||
@@ -5373,7 +5378,8 @@ impl Timeline {
|
||||
/// Force create an image layer and place it into the layer map.
|
||||
///
|
||||
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
|
||||
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
|
||||
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
|
||||
/// placed into the layer map in one run AND be validated.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn force_create_image_layer(
|
||||
self: &Arc<Timeline>,
|
||||
@@ -5407,7 +5413,7 @@ impl Timeline {
|
||||
}
|
||||
let (desc, path) = image_layer_writer.finish(ctx).await?;
|
||||
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
|
||||
info!("force created image layer {}", image_layer.local_path());
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard.open_mut().unwrap().force_insert_layer(image_layer);
|
||||
@@ -5419,7 +5425,8 @@ impl Timeline {
|
||||
/// Force create a delta layer and place it into the layer map.
|
||||
///
|
||||
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
|
||||
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
|
||||
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
|
||||
/// placed into the layer map in one run AND be validated.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn force_create_delta_layer(
|
||||
self: &Arc<Timeline>,
|
||||
@@ -5445,33 +5452,6 @@ impl Timeline {
|
||||
if let Some(check_start_lsn) = check_start_lsn {
|
||||
assert!(deltas.lsn_range.start >= check_start_lsn);
|
||||
}
|
||||
// check if the delta layer does not violate the LSN invariant, the legacy compaction should always produce a batch of
|
||||
// layers of the same start/end LSN, and so should the force inserted layer
|
||||
{
|
||||
/// Checks if a overlaps with b, assume a/b = [start, end).
|
||||
pub fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
|
||||
!(a.end <= b.start || b.end <= a.start)
|
||||
}
|
||||
|
||||
if deltas.key_range.start.next() != deltas.key_range.end {
|
||||
let guard = self.layers.read().await;
|
||||
let mut invalid_layers =
|
||||
guard.layer_map()?.iter_historic_layers().filter(|layer| {
|
||||
layer.is_delta()
|
||||
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
|
||||
&& layer.lsn_range != deltas.lsn_range
|
||||
// skip single-key layer files
|
||||
&& layer.key_range.start.next() != layer.key_range.end
|
||||
});
|
||||
if let Some(layer) = invalid_layers.next() {
|
||||
// If a delta layer overlaps with another delta layer AND their LSN range is not the same, panic
|
||||
panic!(
|
||||
"inserted layer violates delta layer LSN invariant: current_lsn_range={}..{}, conflict_lsn_range={}..{}",
|
||||
deltas.lsn_range.start, deltas.lsn_range.end, layer.lsn_range.start, layer.lsn_range.end
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -5486,7 +5466,7 @@ impl Timeline {
|
||||
}
|
||||
let (desc, path) = delta_layer_writer.finish(deltas.key_range.end, ctx).await?;
|
||||
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
|
||||
|
||||
info!("force created delta layer {}", delta_layer.local_path());
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard.open_mut().unwrap().force_insert_layer(delta_layer);
|
||||
|
||||
@@ -29,6 +29,7 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::remote_timeline_client::WaitCompletionError;
|
||||
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::split_writer::{
|
||||
@@ -1788,20 +1789,12 @@ impl Timeline {
|
||||
stat.visit_image_layer(desc.file_size());
|
||||
}
|
||||
}
|
||||
for layer in &layer_selection {
|
||||
let desc = layer.layer_desc();
|
||||
let key_range = &desc.key_range;
|
||||
if desc.is_delta() && key_range.start.next() != key_range.end {
|
||||
let lsn_range = desc.lsn_range.clone();
|
||||
let intersects = lsn_split_point.range(lsn_range).collect_vec();
|
||||
if intersects.len() > 1 {
|
||||
bail!(
|
||||
"cannot run gc-compaction because it violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
|
||||
desc.key(),
|
||||
intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
|
||||
);
|
||||
}
|
||||
}
|
||||
let layer_names: Vec<crate::tenant::storage_layer::LayerName> = layer_selection
|
||||
.iter()
|
||||
.map(|layer| layer.layer_desc().layer_name())
|
||||
.collect_vec();
|
||||
if let Some(err) = check_valid_layermap(&layer_names) {
|
||||
bail!("cannot run gc-compaction because {}", err);
|
||||
}
|
||||
// The maximum LSN we are processing in this compaction loop
|
||||
let end_lsn = layer_selection
|
||||
|
||||
@@ -163,6 +163,7 @@ impl ComputeUserInfo {
|
||||
}
|
||||
|
||||
pub(crate) enum ComputeCredentialKeys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
Password(Vec<u8>),
|
||||
AuthKeys(AuthKeys),
|
||||
None,
|
||||
@@ -293,16 +294,10 @@ async fn auth_quirks(
|
||||
// We now expect to see a very specific payload in the place of password.
|
||||
let (info, unauthenticated_password) = match user_info.try_into() {
|
||||
Err(info) => {
|
||||
let res = hacks::password_hack_no_authentication(ctx, info, client).await?;
|
||||
|
||||
ctx.set_endpoint_id(res.info.endpoint.clone());
|
||||
let password = match res.keys {
|
||||
ComputeCredentialKeys::Password(p) => p,
|
||||
ComputeCredentialKeys::AuthKeys(_) | ComputeCredentialKeys::None => {
|
||||
unreachable!("password hack should return a password")
|
||||
}
|
||||
};
|
||||
(res.info, Some(password))
|
||||
let (info, password) =
|
||||
hacks::password_hack_no_authentication(ctx, info, client).await?;
|
||||
ctx.set_endpoint_id(info.endpoint.clone());
|
||||
(info, Some(password))
|
||||
}
|
||||
Ok(info) => (info, None),
|
||||
};
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use super::{
|
||||
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint,
|
||||
};
|
||||
use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint};
|
||||
use crate::{
|
||||
auth::{self, AuthFlow},
|
||||
config::AuthenticationConfig,
|
||||
@@ -63,7 +61,7 @@ pub(crate) async fn password_hack_no_authentication(
|
||||
ctx: &RequestMonitoring,
|
||||
info: ComputeUserInfoNoEndpoint,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
) -> auth::Result<ComputeCredentials> {
|
||||
) -> auth::Result<(ComputeUserInfo, Vec<u8>)> {
|
||||
warn!("project not specified, resorting to the password hack auth flow");
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
|
||||
@@ -79,12 +77,12 @@ pub(crate) async fn password_hack_no_authentication(
|
||||
info!(project = &*payload.endpoint, "received missing parameter");
|
||||
|
||||
// Report tentative success; compute node will check the password anyway.
|
||||
Ok(ComputeCredentials {
|
||||
info: ComputeUserInfo {
|
||||
Ok((
|
||||
ComputeUserInfo {
|
||||
user: info.user,
|
||||
options: info.options,
|
||||
endpoint: payload.endpoint,
|
||||
},
|
||||
keys: ComputeCredentialKeys::Password(payload.password),
|
||||
})
|
||||
payload.password,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@ const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
|
||||
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
|
||||
fn fetch_auth_rules(
|
||||
&self,
|
||||
ctx: &RequestMonitoring,
|
||||
endpoint: EndpointId,
|
||||
role_name: RoleName,
|
||||
) -> impl Future<Output = anyhow::Result<Vec<AuthRule>>> + Send;
|
||||
}
|
||||
@@ -101,7 +103,9 @@ impl JwkCacheEntryLock {
|
||||
async fn renew_jwks<F: FetchAuthRules>(
|
||||
&self,
|
||||
_permit: JwkRenewalPermit<'_>,
|
||||
ctx: &RequestMonitoring,
|
||||
client: &reqwest::Client,
|
||||
endpoint: EndpointId,
|
||||
role_name: RoleName,
|
||||
auth_rules: &F,
|
||||
) -> anyhow::Result<Arc<JwkCacheEntry>> {
|
||||
@@ -115,7 +119,9 @@ impl JwkCacheEntryLock {
|
||||
}
|
||||
}
|
||||
|
||||
let rules = auth_rules.fetch_auth_rules(role_name).await?;
|
||||
let rules = auth_rules
|
||||
.fetch_auth_rules(ctx, endpoint, role_name)
|
||||
.await?;
|
||||
let mut key_sets =
|
||||
ahash::HashMap::with_capacity_and_hasher(rules.len(), ahash::RandomState::new());
|
||||
// TODO(conrad): run concurrently
|
||||
@@ -166,6 +172,7 @@ impl JwkCacheEntryLock {
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestMonitoring,
|
||||
client: &reqwest::Client,
|
||||
endpoint: EndpointId,
|
||||
role_name: RoleName,
|
||||
fetch: &F,
|
||||
) -> Result<Arc<JwkCacheEntry>, anyhow::Error> {
|
||||
@@ -176,7 +183,9 @@ impl JwkCacheEntryLock {
|
||||
let Some(cached) = guard else {
|
||||
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let permit = self.acquire_permit().await;
|
||||
return self.renew_jwks(permit, client, role_name, fetch).await;
|
||||
return self
|
||||
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
|
||||
.await;
|
||||
};
|
||||
|
||||
let last_update = now.duration_since(cached.last_retrieved);
|
||||
@@ -187,7 +196,9 @@ impl JwkCacheEntryLock {
|
||||
let permit = self.acquire_permit().await;
|
||||
|
||||
// it's been too long since we checked the keys. wait for them to update.
|
||||
return self.renew_jwks(permit, client, role_name, fetch).await;
|
||||
return self
|
||||
.renew_jwks(permit, ctx, client, endpoint, role_name, fetch)
|
||||
.await;
|
||||
}
|
||||
|
||||
// every 5 minutes we should spawn a job to eagerly update the token.
|
||||
@@ -198,8 +209,12 @@ impl JwkCacheEntryLock {
|
||||
let entry = self.clone();
|
||||
let client = client.clone();
|
||||
let fetch = fetch.clone();
|
||||
let ctx = ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = entry.renew_jwks(permit, &client, role_name, &fetch).await {
|
||||
if let Err(e) = entry
|
||||
.renew_jwks(permit, &ctx, &client, endpoint, role_name, &fetch)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(error=?e, "could not fetch JWKs in background job");
|
||||
}
|
||||
});
|
||||
@@ -216,6 +231,7 @@ impl JwkCacheEntryLock {
|
||||
ctx: &RequestMonitoring,
|
||||
jwt: &str,
|
||||
client: &reqwest::Client,
|
||||
endpoint: EndpointId,
|
||||
role_name: RoleName,
|
||||
fetch: &F,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
@@ -242,7 +258,7 @@ impl JwkCacheEntryLock {
|
||||
let kid = header.key_id.context("missing key id")?;
|
||||
|
||||
let mut guard = self
|
||||
.get_or_update_jwk_cache(ctx, client, role_name.clone(), fetch)
|
||||
.get_or_update_jwk_cache(ctx, client, endpoint.clone(), role_name.clone(), fetch)
|
||||
.await?;
|
||||
|
||||
// get the key from the JWKs if possible. If not, wait for the keys to update.
|
||||
@@ -254,7 +270,14 @@ impl JwkCacheEntryLock {
|
||||
|
||||
let permit = self.acquire_permit().await;
|
||||
guard = self
|
||||
.renew_jwks(permit, client, role_name.clone(), fetch)
|
||||
.renew_jwks(
|
||||
permit,
|
||||
ctx,
|
||||
client,
|
||||
endpoint.clone(),
|
||||
role_name.clone(),
|
||||
fetch,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
_ => {
|
||||
@@ -318,7 +341,7 @@ impl JwkCache {
|
||||
jwt: &str,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// try with just a read lock first
|
||||
let key = (endpoint, role_name.clone());
|
||||
let key = (endpoint.clone(), role_name.clone());
|
||||
let entry = self.map.get(&key).as_deref().map(Arc::clone);
|
||||
let entry = entry.unwrap_or_else(|| {
|
||||
// acquire a write lock after to insert.
|
||||
@@ -327,7 +350,7 @@ impl JwkCache {
|
||||
});
|
||||
|
||||
entry
|
||||
.check_jwt(ctx, jwt, &self.client, role_name, fetch)
|
||||
.check_jwt(ctx, jwt, &self.client, endpoint, role_name, fetch)
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -688,6 +711,8 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
impl FetchAuthRules for Fetch {
|
||||
async fn fetch_auth_rules(
|
||||
&self,
|
||||
_ctx: &RequestMonitoring,
|
||||
_endpoint: EndpointId,
|
||||
_role_name: RoleName,
|
||||
) -> anyhow::Result<Vec<AuthRule>> {
|
||||
Ok(vec![
|
||||
@@ -706,6 +731,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
}
|
||||
|
||||
let role_name = RoleName::from("user");
|
||||
let endpoint = EndpointId::from("ep");
|
||||
|
||||
let jwk_cache = Arc::new(JwkCacheEntryLock::default());
|
||||
|
||||
@@ -715,6 +741,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
&RequestMonitoring::test(),
|
||||
&token,
|
||||
&client,
|
||||
endpoint.clone(),
|
||||
role_name.clone(),
|
||||
&Fetch(addr),
|
||||
)
|
||||
|
||||
@@ -9,8 +9,9 @@ use crate::{
|
||||
messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo},
|
||||
NodeInfo,
|
||||
},
|
||||
context::RequestMonitoring,
|
||||
intern::{BranchIdInt, BranchIdTag, EndpointIdTag, InternId, ProjectIdInt, ProjectIdTag},
|
||||
RoleName,
|
||||
EndpointId, RoleName,
|
||||
};
|
||||
|
||||
use super::jwt::{AuthRule, FetchAuthRules, JwkCache};
|
||||
@@ -57,7 +58,12 @@ pub struct JwksRoleSettings {
|
||||
}
|
||||
|
||||
impl FetchAuthRules for StaticAuthRules {
|
||||
async fn fetch_auth_rules(&self, role_name: RoleName) -> anyhow::Result<Vec<AuthRule>> {
|
||||
async fn fetch_auth_rules(
|
||||
&self,
|
||||
_ctx: &RequestMonitoring,
|
||||
_endpoint: EndpointId,
|
||||
role_name: RoleName,
|
||||
) -> anyhow::Result<Vec<AuthRule>> {
|
||||
let mappings = JWKS_ROLE_MAP.load();
|
||||
let role_mappings = mappings
|
||||
.as_deref()
|
||||
|
||||
@@ -303,6 +303,7 @@ impl NodeInfo {
|
||||
|
||||
pub(crate) fn set_keys(&mut self, keys: &ComputeCredentialKeys) {
|
||||
match keys {
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
ComputeCredentialKeys::Password(password) => self.config.password(password),
|
||||
ComputeCredentialKeys::AuthKeys(auth_keys) => self.config.auth_keys(*auth_keys),
|
||||
ComputeCredentialKeys::None => &mut self.config,
|
||||
|
||||
@@ -79,6 +79,40 @@ pub(crate) enum AuthMethod {
|
||||
Cleartext,
|
||||
}
|
||||
|
||||
impl Clone for RequestMonitoring {
|
||||
fn clone(&self) -> Self {
|
||||
let inner = self.0.try_lock().expect("should not deadlock");
|
||||
let new = RequestMonitoringInner {
|
||||
peer_addr: inner.peer_addr,
|
||||
session_id: inner.session_id,
|
||||
protocol: inner.protocol,
|
||||
first_packet: inner.first_packet,
|
||||
region: inner.region,
|
||||
span: info_span!("background_task"),
|
||||
|
||||
project: inner.project,
|
||||
branch: inner.branch,
|
||||
endpoint_id: inner.endpoint_id.clone(),
|
||||
dbname: inner.dbname.clone(),
|
||||
user: inner.user.clone(),
|
||||
application: inner.application.clone(),
|
||||
error_kind: inner.error_kind,
|
||||
auth_method: inner.auth_method.clone(),
|
||||
success: inner.success,
|
||||
rejected: inner.rejected,
|
||||
cold_start_info: inner.cold_start_info,
|
||||
pg_options: inner.pg_options.clone(),
|
||||
|
||||
sender: None,
|
||||
disconnect_sender: None,
|
||||
latency_timer: LatencyTimer::noop(inner.protocol),
|
||||
disconnect_timestamp: inner.disconnect_timestamp,
|
||||
};
|
||||
|
||||
Self(TryLock::new(new))
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestMonitoring {
|
||||
pub fn new(
|
||||
session_id: Uuid,
|
||||
|
||||
@@ -397,6 +397,8 @@ pub struct LatencyTimer {
|
||||
protocol: Protocol,
|
||||
cold_start_info: ColdStartInfo,
|
||||
outcome: ConnectOutcome,
|
||||
|
||||
skip_reporting: bool,
|
||||
}
|
||||
|
||||
impl LatencyTimer {
|
||||
@@ -409,6 +411,20 @@ impl LatencyTimer {
|
||||
cold_start_info: ColdStartInfo::Unknown,
|
||||
// assume failed unless otherwise specified
|
||||
outcome: ConnectOutcome::Failed,
|
||||
skip_reporting: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn noop(protocol: Protocol) -> Self {
|
||||
Self {
|
||||
start: time::Instant::now(),
|
||||
stop: None,
|
||||
accumulated: Accumulated::default(),
|
||||
protocol,
|
||||
cold_start_info: ColdStartInfo::Unknown,
|
||||
// assume failed unless otherwise specified
|
||||
outcome: ConnectOutcome::Failed,
|
||||
skip_reporting: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -443,6 +459,10 @@ pub enum ConnectOutcome {
|
||||
|
||||
impl Drop for LatencyTimer {
|
||||
fn drop(&mut self) {
|
||||
if self.skip_reporting {
|
||||
return;
|
||||
}
|
||||
|
||||
let duration = self
|
||||
.stop
|
||||
.unwrap_or_else(time::Instant::now)
|
||||
|
||||
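The new `noop` constructor and `skip_reporting` flag follow a common pattern: a context cloned for a background job gets a timer that measures nothing, so dropping it does not skew latency metrics. A reduced sketch of the idea, with a plain `println!` in place of the real metric sink:

```rust
use std::time::Instant;

struct LatencyTimer {
    start: Instant,
    skip_reporting: bool,
}

impl LatencyTimer {
    fn new() -> Self {
        Self { start: Instant::now(), skip_reporting: false }
    }

    /// Used for cloned background contexts: the clone must not
    /// double-count latency, so its timer reports nothing on drop.
    fn noop() -> Self {
        Self { start: Instant::now(), skip_reporting: true }
    }
}

impl Drop for LatencyTimer {
    fn drop(&mut self) {
        if self.skip_reporting {
            return;
        }
        // The real code records a histogram here; printing stands in for it.
        println!("observed latency: {:?}", self.start.elapsed());
    }
}

fn main() {
    let _real = LatencyTimer::new();        // reports on drop
    let _background = LatencyTimer::noop(); // dropped silently
}
```
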
@@ -27,7 +27,7 @@ use crate::{
|
||||
Host,
|
||||
};
|
||||
|
||||
use super::conn_pool::{poll_client, AuthData, Client, ConnInfo, GlobalConnPool};
|
||||
use super::conn_pool::{poll_client, Client, ConnInfo, GlobalConnPool};
|
||||
|
||||
pub(crate) struct PoolingBackend {
|
||||
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
|
||||
@@ -274,13 +274,6 @@ impl ConnectMechanism for TokioMechanism {
|
||||
.dbname(&self.conn_info.dbname)
|
||||
.connect_timeout(timeout);
|
||||
|
||||
match &self.conn_info.auth {
|
||||
AuthData::Jwt(_) => {}
|
||||
AuthData::Password(pw) => {
|
||||
config.password(pw);
|
||||
}
|
||||
}
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let res = config.connect(tokio_postgres::NoTls).await;
|
||||
drop(pause);
|
||||
|
||||
@@ -29,11 +29,16 @@ use tracing::{info, info_span, Instrument};
|
||||
|
||||
use super::backend::HttpConnError;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ConnInfoWithAuth {
|
||||
pub(crate) conn_info: ConnInfo,
|
||||
pub(crate) auth: AuthData,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ConnInfo {
|
||||
pub(crate) user_info: ComputeUserInfo,
|
||||
pub(crate) dbname: DbName,
|
||||
pub(crate) auth: AuthData,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -787,7 +792,6 @@ mod tests {
|
||||
options: NeonOptions::default(),
|
||||
},
|
||||
dbname: "dbname".into(),
|
||||
auth: AuthData::Password("password".as_bytes().into()),
|
||||
};
|
||||
let ep_pool = Arc::downgrade(
|
||||
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
|
||||
@@ -845,7 +849,6 @@ mod tests {
|
||||
options: NeonOptions::default(),
|
||||
},
|
||||
dbname: "dbname".into(),
|
||||
auth: AuthData::Password("password".as_bytes().into()),
|
||||
};
|
||||
let ep_pool = Arc::downgrade(
|
||||
&pool.get_or_create_endpoint_pool(&conn_info.endpoint_cache_key().unwrap()),
|
||||
|
||||
@@ -60,6 +60,7 @@ use super::backend::PoolingBackend;
|
||||
use super::conn_pool::AuthData;
|
||||
use super::conn_pool::Client;
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::ConnInfoWithAuth;
|
||||
use super::http_util::json_response;
|
||||
use super::json::json_to_pg_text;
|
||||
use super::json::pg_text_row_to_json;
|
||||
@@ -148,7 +149,7 @@ fn get_conn_info(
|
||||
ctx: &RequestMonitoring,
|
||||
headers: &HeaderMap,
|
||||
tls: Option<&TlsConfig>,
|
||||
) -> Result<ConnInfo, ConnInfoError> {
|
||||
) -> Result<ConnInfoWithAuth, ConnInfoError> {
|
||||
// HTTP only uses cleartext (for now and likely always)
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
|
||||
@@ -235,11 +236,8 @@ fn get_conn_info(
|
||||
options: options.unwrap_or_default(),
|
||||
};
|
||||
|
||||
Ok(ConnInfo {
|
||||
user_info,
|
||||
dbname,
|
||||
auth,
|
||||
})
|
||||
let conn_info = ConnInfo { user_info, dbname };
|
||||
Ok(ConnInfoWithAuth { conn_info, auth })
|
||||
}
|
||||
|
||||
// TODO: return different http error codes
|
||||
@@ -523,7 +521,10 @@ async fn handle_inner(
|
||||
|
||||
// TLS config should be there.
|
||||
let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref())?;
|
||||
info!(user = conn_info.user_info.user.as_str(), "credentials");
|
||||
info!(
|
||||
user = conn_info.conn_info.user_info.user.as_str(),
|
||||
"credentials"
|
||||
);
|
||||
|
||||
// Allow connection pooling only if explicitly requested
|
||||
// or if we have decided that http pool is no longer opt-in
|
||||
@@ -568,20 +569,20 @@ async fn handle_inner(
|
||||
.authenticate_with_password(
|
||||
ctx,
|
||||
&config.authentication_config,
|
||||
&conn_info.user_info,
|
||||
&conn_info.conn_info.user_info,
|
||||
pw,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
AuthData::Jwt(jwt) => {
|
||||
backend
|
||||
.authenticate_with_jwt(ctx, &conn_info.user_info, jwt)
|
||||
.authenticate_with_jwt(ctx, &conn_info.conn_info.user_info, jwt)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
let client = backend
|
||||
.connect_to_compute(ctx, conn_info, keys, !allow_pool)
|
||||
.connect_to_compute(ctx, conn_info.conn_info, keys, !allow_pool)
|
||||
.await?;
|
||||
// not strictly necessary to mark success here,
|
||||
// but it's just insurance for if we forget it somewhere else
|
||||
|
||||
@@ -17,6 +17,7 @@ use postgres_ffi::MAX_SEND_SIZE;
use serde::Deserialize;
use serde::Serialize;

use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
use sha2::{Digest, Sha256};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
@@ -51,6 +52,9 @@ pub struct Args {
    /// Dump full term history. True by default.
    pub dump_term_history: bool,

    /// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
    pub dump_wal_last_modified: bool,

    /// Filter timelines by tenant_id.
    pub tenant_id: Option<TenantId>,

@@ -128,12 +132,19 @@ async fn build_from_tli_dump(
        None
    };

    let wal_last_modified = if args.dump_wal_last_modified {
        get_wal_last_modified(timeline_dir).ok().flatten()
    } else {
        None
    };

    Timeline {
        tenant_id: timeline.ttid.tenant_id,
        timeline_id: timeline.ttid.timeline_id,
        control_file,
        memory,
        disk_content,
        wal_last_modified,
    }
}

@@ -156,6 +167,7 @@ pub struct Timeline {
    pub control_file: Option<TimelinePersistentState>,
    pub memory: Option<Memory>,
    pub disk_content: Option<DiskContent>,
    pub wal_last_modified: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize)]
@@ -302,6 +314,27 @@ fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
    })
}

/// Get highest modified time of WAL segments in the directory.
fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
    let mut res = None;
    for entry in fs::read_dir(path)? {
        if entry.is_err() {
            continue;
        }
        let entry = entry?;
        /* Ignore files that are not XLOG segments */
        let fname = entry.file_name();
        if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
            continue;
        }

        let metadata = entry.metadata()?;
        let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
        res = std::cmp::max(res, Some(modified));
    }
    Ok(res)
}

/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {
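
Conceptually, the new `wal_last_modified` field is just the maximum mtime over the WAL segment files in a timeline directory. Here is a self-contained sketch of that scan; the simplified `looks_like_wal_segment` name check is a stand-in for postgres_ffi's `IsXLogFileName`/`IsPartialXLogFileName`, so treat it as an approximation rather than the safekeeper's actual filter.

```rust
// Sketch of the "latest WAL mtime" scan added above, using only std.
use std::fs;
use std::path::Path;
use std::time::SystemTime;

// Simplified segment-name check: 24 hex chars, optionally with ".partial".
fn looks_like_wal_segment(name: &str) -> bool {
    let base = name.strip_suffix(".partial").unwrap_or(name);
    base.len() == 24 && base.chars().all(|c| c.is_ascii_hexdigit())
}

fn wal_last_modified(dir: &Path) -> std::io::Result<Option<SystemTime>> {
    let mut latest = None;
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let name = entry.file_name();
        let Some(name) = name.to_str() else { continue };
        if !looks_like_wal_segment(name) {
            continue;
        }
        // Track the maximum modification time seen so far.
        let modified = entry.metadata()?.modified()?;
        latest = std::cmp::max(latest, Some(modified));
    }
    Ok(latest)
}

fn main() -> std::io::Result<()> {
    if let Some(ts) = wal_last_modified(Path::new("."))? {
        println!("latest WAL segment mtime: {ts:?}");
    }
    Ok(())
}
```
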
@@ -1,7 +1,11 @@
openapi: "3.0.2"
info:
  title: Safekeeper control API
  description: Neon Safekeeper API
  version: "1.0"
  license:
    name: "Apache"
    url: https://github.com/neondatabase/neon/blob/main/LICENSE


servers:
@@ -386,6 +390,12 @@ components:
        msg:
          type: string

    NotFoundError:
      type: object
      properties:
        msg:
          type: string

  responses:

  #
@@ -481,6 +481,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
    let mut dump_memory: Option<bool> = None;
    let mut dump_disk_content: Option<bool> = None;
    let mut dump_term_history: Option<bool> = None;
    let mut dump_wal_last_modified: Option<bool> = None;
    let mut tenant_id: Option<TenantId> = None;
    let mut timeline_id: Option<TimelineId> = None;

@@ -494,6 +495,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
            "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
            "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
            "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
            "dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
            "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
            "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
            _ => Err(ApiError::BadRequest(anyhow::anyhow!(
@@ -508,6 +510,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
    let dump_memory = dump_memory.unwrap_or(dump_all);
    let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
    let dump_term_history = dump_term_history.unwrap_or(true);
    let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);

    let args = debug_dump::Args {
        dump_all,
@@ -515,6 +518,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
        dump_memory,
        dump_disk_content,
        dump_term_history,
        dump_wal_last_modified,
        tenant_id,
        timeline_id,
    };
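
The handler above follows the debug dump's existing pattern for optional flags: each flag can be set explicitly as a query parameter, and otherwise inherits `dump_all` (except `dump_term_history`, which defaults to true). A small self-contained sketch of just that defaulting logic; the hand-rolled map lookup and the assumed `dump_all = false` default stand in for the handler's real query-string machinery.

```rust
// Sketch of the flag-defaulting behaviour only; parameter parsing here is a
// stand-in, not the safekeeper's actual HTTP plumbing.
use std::collections::HashMap;

#[derive(Debug)]
struct DumpFlags {
    dump_all: bool,
    dump_term_history: bool,
    dump_wal_last_modified: bool,
}

fn resolve_flags(query: &HashMap<&str, bool>) -> DumpFlags {
    // Assumed default for dump_all when the parameter is absent.
    let dump_all = query.get("dump_all").copied().unwrap_or(false);
    DumpFlags {
        dump_all,
        // Term history is dumped unless explicitly disabled.
        dump_term_history: query.get("dump_term_history").copied().unwrap_or(true),
        // The new flag inherits dump_all when not given, like memory/disk_content.
        dump_wal_last_modified: query
            .get("dump_wal_last_modified")
            .copied()
            .unwrap_or(dump_all),
    }
}

fn main() {
    // e.g. ?dump_all=true with no explicit dump_wal_last_modified
    let query = HashMap::from([("dump_all", true)]);
    println!("{:?}", resolve_flags(&query));
    // e.g. ?dump_wal_last_modified=true on its own
    let query = HashMap::from([("dump_wal_last_modified", true)]);
    println!("{:?}", resolve_flags(&query));
}
```
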
@@ -539,20 +539,17 @@ async fn remove_segments_from_disk(
    while let Some(entry) = entries.next_entry().await? {
        let entry_path = entry.path();
        let fname = entry_path.file_name().unwrap();

        if let Some(fname_str) = fname.to_str() {
            /* Ignore files that are not XLOG segments */
            if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
                continue;
            }
            let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
            if remove_predicate(segno) {
                remove_file(entry_path).await?;
                n_removed += 1;
                min_removed = min(min_removed, segno);
                max_removed = max(max_removed, segno);
                REMOVED_WAL_SEGMENTS.inc();
            }
        /* Ignore files that are not XLOG segments */
        if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
            continue;
        }
        let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
        if remove_predicate(segno) {
            remove_file(entry_path).await?;
            n_removed += 1;
            min_removed = min(min_removed, segno);
            max_removed = max(max_removed, segno);
            REMOVED_WAL_SEGMENTS.inc();
        }
    }
@@ -1,7 +1,8 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::collections::{HashMap, HashSet};

use anyhow::Context;
use itertools::Itertools;
use pageserver::tenant::checks::check_valid_layermap;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver_api::shard::ShardIndex;
@@ -48,56 +49,6 @@ impl TimelineAnalysis {
    }
}

/// Checks whether a layer map is valid (i.e., is a valid result of the current compaction algorithm if nothing goes wrong).
/// The function checks if we can split the LSN range of a delta layer only at the LSNs of the delta layers. For example,
///
/// ```plain
/// |       |                 |       |
/// |   1   |    |   2   |    |   3   |
/// |       |    |       |    |       |
/// ```
///
/// This is not a valid layer map because the LSN range of layer 1 intersects with the LSN range of layer 2. 1 and 2 should have
/// the same LSN range.
///
/// The exception is that when layer 2 only contains a single key, it could be split over the LSN range. For example,
///
/// ```plain
/// |       |    |   2   |    |       |
/// |   1   |    |-------|    |   3   |
/// |       |    |   4   |    |       |
/// ```
///
/// If layer 2 and 4 contain the same single key, this is also a valid layer map.
fn check_valid_layermap(metadata: &HashMap<LayerName, LayerFileMetadata>) -> Option<String> {
    let mut lsn_split_point = BTreeSet::new(); // TODO: use a better data structure (range tree / range set?)
    let mut all_delta_layers = Vec::new();
    for (name, _) in metadata.iter() {
        if let LayerName::Delta(layer) = name {
            if layer.key_range.start.next() != layer.key_range.end {
                all_delta_layers.push(layer.clone());
            }
        }
    }
    for layer in &all_delta_layers {
        let lsn_range = &layer.lsn_range;
        lsn_split_point.insert(lsn_range.start);
        lsn_split_point.insert(lsn_range.end);
    }
    for layer in &all_delta_layers {
        let lsn_range = layer.lsn_range.clone();
        let intersects = lsn_split_point.range(lsn_range).collect_vec();
        if intersects.len() > 1 {
            let err = format!(
                "layer violates the layer map LSN split assumption: layer {} intersects with LSN [{}]",
                layer,
                intersects.into_iter().map(|lsn| lsn.to_string()).join(", ")
            );
            return Some(err);
        }
    }
    None
}

pub(crate) async fn branch_cleanup_and_check_errors(
    remote_client: &GenericRemoteStorage,
    id: &TenantShardTimelineId,
@@ -177,7 +128,8 @@ pub(crate) async fn branch_cleanup_and_check_errors(
        }
    }

    if let Some(err) = check_valid_layermap(&index_part.layer_metadata) {
    let layer_names = index_part.layer_metadata.keys().cloned().collect_vec();
    if let Some(err) = check_valid_layermap(&layer_names) {
        result.errors.push(format!(
            "index_part.json contains invalid layer map structure: {err}"
        ));
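
For readers unfamiliar with the invariant that this check (now shared from `pageserver::tenant::checks` instead of being duplicated in the scrubber) enforces, here is a stripped-down, self-contained sketch of the same idea over plain integer LSN ranges. It keeps only the BTreeSet split-point logic from the removed function above; the real check additionally exempts single-key delta layers and operates on `LayerName`s.

```rust
// Simplified model: each delta layer is just an LSN range [start, end).
// A layer map is "valid" if no layer boundary falls strictly inside another
// layer's LSN range, i.e. ranges may only be split at existing boundaries.
use std::collections::BTreeSet;
use std::ops::Range;

fn check_lsn_split(deltas: &[Range<u64>]) -> Option<String> {
    let mut split_points = BTreeSet::new();
    for r in deltas {
        split_points.insert(r.start);
        split_points.insert(r.end);
    }
    for r in deltas {
        // r.start itself is always in the set, so more than one point inside
        // [start, end) means some other boundary cuts through this layer.
        let inside: Vec<_> = split_points.range(r.start..r.end).collect();
        if inside.len() > 1 {
            return Some(format!(
                "layer {}..{} intersects split points {:?}",
                r.start, r.end, inside
            ));
        }
    }
    None
}

fn main() {
    // Layer 15..25 straddles the boundary at 20 -> invalid.
    let invalid = [0..10, 15..25, 20..30];
    assert!(check_lsn_split(&invalid).is_some());

    // All ranges share boundaries -> valid.
    let valid = [0..10, 10..20, 20..30];
    assert!(check_lsn_split(&valid).is_none());
    println!("layer map LSN split check behaves as expected");
}
```
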
@@ -3863,9 +3863,6 @@ def static_proxy(
    dbname = vanilla_pg.default_options["dbname"]
    auth_endpoint = f"postgres://proxy:password@{host}:{port}/{dbname}"

    # require password for 'http_auth' user
    vanilla_pg.edit_hba([f"host {dbname} http_auth {host} password"])

    # For simplicity, we use the same user for both `--auth-endpoint` and `safe_psql`
    vanilla_pg.start()
    vanilla_pg.safe_psql("create user proxy with login superuser password 'password'")

@@ -142,6 +142,7 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
            "image_creation_threshold": "1",
            # set PITR interval to be small, so we can do GC
            "pitr_interval": "0 s",
            "lsn_lease_length": "0s",
        }
    )

@@ -11,7 +11,9 @@ from fixtures.utils import print_gc_result, query_scalar
#
def test_branch_behind(neon_env_builder: NeonEnvBuilder):
    # Disable pitr, because here we want to test branch creation after GC
    env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
    env = neon_env_builder.init_start(
        initial_tenant_conf={"pitr_interval": "0 sec", "lsn_lease_length": "0s"}
    )

    error_regexes = [
        ".*invalid branch start lsn.*",

@@ -419,7 +419,7 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):


def test_branching_while_stuck_find_gc_cutoffs(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})

    client = env.pageserver.http_client()


@@ -240,6 +240,7 @@ def test_uploads_and_deletions(
        "image_creation_threshold": "1",
        "image_layer_creation_check_threshold": "0",
        "compaction_algorithm": json.dumps({"kind": compaction_algorithm.value}),
        "lsn_lease_length": "0s",
    }
    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)


@@ -222,7 +222,7 @@ def pgbench_accounts_initialized(ep):
# Without hs feedback enabled we'd see 'User query might have needed to see row
# versions that must be removed.' errors.
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    env = neon_env_builder.init_start()
    env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
    agressive_vacuum_conf = [
        "log_autovacuum_min_duration = 0",
        "autovacuum_naptime = 10s",

@@ -173,6 +173,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
        # "image_creation_threshold": set at runtime
        "compaction_target_size": f"{128 * (1024**2)}",  # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
        "image_layer_creation_check_threshold": "0",  # always check if a new image layer can be created
        "lsn_lease_length": "0s",
    }

    def tenant_update_config(changes):

@@ -53,6 +53,7 @@ TENANT_CONF = {
    # create image layers eagerly, so that GC can remove some layers
    "image_creation_threshold": "1",
    "image_layer_creation_check_threshold": "0",
    "lsn_lease_length": "0s",
}


@@ -244,6 +244,7 @@ def test_remote_storage_upload_queue_retries(
            # create image layers eagerly, so that GC can remove some layers
            "image_creation_threshold": "1",
            "image_layer_creation_check_threshold": "0",
            "lsn_lease_length": "0s",
        }
    )

@@ -391,6 +392,7 @@ def test_remote_timeline_client_calls_started_metric(
            # disable background compaction and GC. We invoke it manually when we want it to happen.
            "gc_period": "0s",
            "compaction_period": "0s",
            "lsn_lease_length": "0s",
        }
    )


@@ -200,6 +200,7 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
        # Disable automatic creation of image layers, as we will create them explicitly when we want them
        "image_creation_threshold": 9999,
        "image_layer_creation_check_threshold": 0,
        "lsn_lease_length": "0s",
    }

    neon_env_builder.storage_controller_config = {

@@ -485,7 +485,7 @@ def test_storage_controller_compute_hook(
    httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)

    # Start running
    env = neon_env_builder.init_start()
    env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})

    # Initial notification from tenant creation
    assert len(notifications) == 1

@@ -204,6 +204,7 @@ def test_scrubber_physical_gc_ancestors(
            # No PITR, so that as soon as child shards generate an image layer, it covers ancestor deltas
            # and makes them GC'able
            "pitr_interval": "0s",
            "lsn_lease_length": "0s",
        },
    )


@@ -266,13 +266,13 @@ def test_tenant_reattach_while_busy(


def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    env = neon_env_builder.init_start(initial_tenant_conf={"lsn_lease_length": "0s"})
    pageserver_http = env.pageserver.http_client()

    env.pageserver.allowed_errors.extend(PERMIT_PAGE_SERVICE_ERRORS)

    # create new nenant
    tenant_id, timeline_id = env.neon_cli.create_tenant()
    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline

    # assert tenant exists on disk
    assert env.pageserver.tenant_dir(tenant_id).exists()

@@ -45,7 +45,10 @@ def test_gc_blocking_by_timeline(neon_env_builder: NeonEnvBuilder, sharded: bool
    tenant_after = http.tenant_status(env.initial_tenant)
    assert tenant_before != tenant_after
    gc_blocking = tenant_after["gc_blocking"]
    assert gc_blocking == "BlockingReasons { timelines: 1, reasons: EnumSet(Manual) }"
    assert (
        gc_blocking
        == "BlockingReasons { tenant_blocked_by_lsn_lease_deadline: false, timelines: 1, reasons: EnumSet(Manual) }"
    )

    wait_for_another_gc_round()
    pss.assert_log_contains(gc_skipped_line)

@@ -892,6 +892,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
    log.info(f"debug_dump before reboot {debug_dump_0}")
    assert debug_dump_0["timelines_count"] == 1
    assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id)
    assert debug_dump_0["timelines"][0]["wal_last_modified"] != ""

    endpoint.safe_psql("create table t(i int)")
2  vendor/postgres-v14  vendored
Submodule vendor/postgres-v14 updated: a317b9b5b9...87cb68f899
2  vendor/postgres-v15  vendored
Submodule vendor/postgres-v15 updated: 6f6d77fb59...72b904c0b3
2  vendor/postgres-v16  vendored
Submodule vendor/postgres-v16 updated: 3e74e0a7df...3ec6e2496f
2  vendor/postgres-v17  vendored
Submodule vendor/postgres-v17 updated: 9156d63ce2...5bbb9bd93d
8  vendor/revisions.json  vendored
@@ -1,18 +1,18 @@
{
    "v17": [
        "17rc1",
        "9156d63ce253bed9d1f76355ceec610e444eaffa"
        "5bbb9bd93dd805e90bd8af15d00080363d18ec68"
    ],
    "v16": [
        "16.4",
        "0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
        "3ec6e2496f64c6fec35c67cb82efd6490a6a4738"
    ],
    "v15": [
        "15.8",
        "6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
        "72b904c0b3ac43bd74d1e8e6d772e2c476ae25b1"
    ],
    "v14": [
        "14.13",
        "a317b9b5b96978b49e78986697f3dd80d06f99a7"
        "87cb68f899db434cd6f1908cf0ac8fdeafdd88c1"
    ]
}