mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-05 15:20:39 +00:00
Compare commits
10 Commits
release-co
...
conrad/par
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0e952cee8e | ||
|
|
6d7ab67401 | ||
|
|
3ff8afa32c | ||
|
|
682a54fa9e | ||
|
|
d5c17559ce | ||
|
|
e8ccb4a4d1 | ||
|
|
bdd68bb069 | ||
|
|
783260b88a | ||
|
|
15f633922a | ||
|
|
c34d36d8a2 |
119
Cargo.lock
generated
119
Cargo.lock
generated
@@ -1009,6 +1009,12 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "boxcar"
|
||||
version = "0.2.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "26c4925bc979b677330a8c7fe7a8c94af2dbb4a2d37b4a20a80d884400f46baa"
|
||||
|
||||
[[package]]
|
||||
name = "bstr"
|
||||
version = "1.5.0"
|
||||
@@ -1242,14 +1248,14 @@ checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7"
|
||||
|
||||
[[package]]
|
||||
name = "clashmap"
|
||||
version = "1.0.0"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93bd59c81e2bd87a775ae2de75f070f7e2bfe97363a6ad652f46824564c23e4d"
|
||||
checksum = "e8a055b1f1bf558eae4959f6dd77cf2d7d50ae1483928e60ef21ca5a24fd4321"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"hashbrown 0.15.2",
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.8",
|
||||
"parking_lot_core 0.9.10",
|
||||
"polonius-the-crab",
|
||||
"replace_with",
|
||||
]
|
||||
@@ -1753,19 +1759,6 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "5.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dashmap"
|
||||
version = "6.1.0"
|
||||
@@ -1777,7 +1770,7 @@ dependencies = [
|
||||
"hashbrown 0.14.5",
|
||||
"lock_api",
|
||||
"once_cell",
|
||||
"parking_lot_core 0.9.8",
|
||||
"parking_lot_core 0.9.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2343,6 +2336,12 @@ version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
||||
|
||||
[[package]]
|
||||
name = "foldhash"
|
||||
version = "0.1.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
|
||||
|
||||
[[package]]
|
||||
name = "form_urlencoded"
|
||||
version = "1.2.1"
|
||||
@@ -2582,7 +2581,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"dashmap 6.1.0",
|
||||
"dashmap",
|
||||
"futures-sink",
|
||||
"futures-timer",
|
||||
"futures-util",
|
||||
@@ -3288,7 +3287,7 @@ dependencies = [
|
||||
"clap",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-utils",
|
||||
"dashmap 6.1.0",
|
||||
"dashmap",
|
||||
"env_logger",
|
||||
"indexmap 2.9.0",
|
||||
"itoa",
|
||||
@@ -3545,16 +3544,6 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lasso"
|
||||
version = "0.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4644821e1c3d7a560fe13d842d13f587c07348a1a05d3a797152d41c90c56df2"
|
||||
dependencies = [
|
||||
"dashmap 5.5.0",
|
||||
"hashbrown 0.13.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.5.0"
|
||||
@@ -3674,17 +3663,17 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
|
||||
|
||||
[[package]]
|
||||
name = "measured"
|
||||
version = "0.0.22"
|
||||
version = "0.0.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
|
||||
checksum = "d22ae866c28b9c59afaeb488ad6e3bd148570cf5a2bacf6c4845def5b9a03470"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"crossbeam-utils",
|
||||
"hashbrown 0.14.5",
|
||||
"itoa",
|
||||
"lasso",
|
||||
"measured-derive",
|
||||
"memchr",
|
||||
"paracord",
|
||||
"parking_lot 0.12.1",
|
||||
"rustc-hash 1.1.0",
|
||||
"ryu",
|
||||
@@ -3692,9 +3681,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "measured-derive"
|
||||
version = "0.0.22"
|
||||
version = "0.0.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
|
||||
checksum = "16d734ed9dbca22d87d56b54d220f254ce921cb5cce97d4a960075af0131d076"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
@@ -3704,15 +3693,26 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "measured-process"
|
||||
version = "0.0.22"
|
||||
version = "0.0.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
|
||||
checksum = "f71a318d2b9edcded1e5e0ccf3c9e1e1614217d7f07933631c771daa717743aa"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"measured",
|
||||
"procfs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "measured-tokio"
|
||||
version = "0.0.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e79a936051c484268e2d71a5dd01219096c62c8ff09afad95f07c14a3b0c580"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"measured",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.6.4"
|
||||
@@ -4571,6 +4571,21 @@ dependencies = [
|
||||
"seize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "paracord"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e41e924113f9a05eecd561c6e695648f243a03b3ad8d9b3eff689342b95023b"
|
||||
dependencies = [
|
||||
"boxcar",
|
||||
"clashmap",
|
||||
"foldhash",
|
||||
"hashbrown 0.15.2",
|
||||
"serde",
|
||||
"sync_wrapper 1.0.1",
|
||||
"typed-arena",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.1.1"
|
||||
@@ -4595,7 +4610,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.8",
|
||||
"parking_lot_core 0.9.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4614,15 +4629,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot_core"
|
||||
version = "0.9.8"
|
||||
version = "0.9.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
|
||||
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"redox_syscall 0.3.5",
|
||||
"redox_syscall 0.5.10",
|
||||
"smallvec",
|
||||
"windows-targets 0.48.0",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5328,13 +5343,14 @@ dependencies = [
|
||||
"itoa",
|
||||
"jose-jwa",
|
||||
"jose-jwk",
|
||||
"lasso",
|
||||
"measured",
|
||||
"measured-tokio",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"opentelemetry",
|
||||
"p256 0.13.2",
|
||||
"papaya",
|
||||
"paracord",
|
||||
"parking_lot 0.12.1",
|
||||
"parquet",
|
||||
"parquet_derive",
|
||||
@@ -5637,6 +5653,15 @@ dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
|
||||
dependencies = [
|
||||
"bitflags 2.8.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.10.2"
|
||||
@@ -6897,12 +6922,12 @@ dependencies = [
|
||||
"hyper 0.14.30",
|
||||
"itertools 0.10.5",
|
||||
"json-structural-diff",
|
||||
"lasso",
|
||||
"measured",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"paracord",
|
||||
"postgres_connection",
|
||||
"posthog_client_lite",
|
||||
"rand 0.8.5",
|
||||
@@ -6991,6 +7016,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"reqwest",
|
||||
"safekeeper_api",
|
||||
"serde_json",
|
||||
"storage_controller_client",
|
||||
"tokio",
|
||||
@@ -8016,6 +8042,12 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "typed-arena"
|
||||
version = "2.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a"
|
||||
|
||||
[[package]]
|
||||
name = "typed-json"
|
||||
version = "0.1.1"
|
||||
@@ -8760,6 +8792,7 @@ dependencies = [
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"p256 0.13.2",
|
||||
"paracord",
|
||||
"parquet",
|
||||
"prettyplease",
|
||||
"proc-macro2",
|
||||
|
||||
@@ -128,11 +128,11 @@ itertools = "0.10"
|
||||
itoa = "1.0.11"
|
||||
jemalloc_pprof = { version = "0.7", features = ["symbolize", "flamegraph"] }
|
||||
jsonwebtoken = "9"
|
||||
lasso = "0.7"
|
||||
libc = "0.2"
|
||||
md5 = "0.7.0"
|
||||
measured = { version = "0.0.22", features=["lasso"] }
|
||||
measured-process = { version = "0.0.22" }
|
||||
measured = { version = "0.0.23", features = ["paracord"] }
|
||||
measured-process = { version = "0.0.23" }
|
||||
measured-tokio = { version = "0.0.23" }
|
||||
memoffset = "0.9"
|
||||
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
|
||||
# Do not update to >= 7.0.0, at least. The update will have a significant impact
|
||||
@@ -145,6 +145,7 @@ opentelemetry = "0.27"
|
||||
opentelemetry_sdk = "0.27"
|
||||
opentelemetry-otlp = { version = "0.27", default-features = false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.27"
|
||||
paracord = { version = "0.1.0", features = ["serde"] }
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
|
||||
@@ -36,7 +36,7 @@ impl StorageBroker {
|
||||
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
|
||||
let broker = &self.env.broker;
|
||||
|
||||
print!("Starting neon broker at {}", broker.client_url());
|
||||
println!("Starting neon broker at {}", broker.client_url());
|
||||
|
||||
let mut args = Vec::new();
|
||||
|
||||
|
||||
@@ -303,7 +303,7 @@ impl PageServerNode {
|
||||
async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
|
||||
// TODO: using a thread here because start_process() is not async but we need to call check_status()
|
||||
let datadir = self.repo_path();
|
||||
print!(
|
||||
println!(
|
||||
"Starting pageserver node {} at '{}' in {:?}, retrying for {:?}",
|
||||
self.conf.id,
|
||||
self.pg_connection_config.raw_address(),
|
||||
|
||||
@@ -127,7 +127,7 @@ impl SafekeeperNode {
|
||||
extra_opts: &[String],
|
||||
retry_timeout: &Duration,
|
||||
) -> anyhow::Result<()> {
|
||||
print!(
|
||||
println!(
|
||||
"Starting safekeeper at '{}' in '{}', retrying for {:?}",
|
||||
self.pg_connection_config.raw_address(),
|
||||
self.datadir_path().display(),
|
||||
|
||||
@@ -660,7 +660,7 @@ impl StorageController {
|
||||
));
|
||||
}
|
||||
|
||||
println!("Starting storage controller");
|
||||
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
|
||||
|
||||
background_process::start_process(
|
||||
COMMAND,
|
||||
|
||||
@@ -14,6 +14,7 @@ humantime.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
reqwest.workspace = true
|
||||
safekeeper_api.workspace=true
|
||||
serde_json = { workspace = true, features = ["raw_value"] }
|
||||
storage_controller_client.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -11,7 +11,7 @@ use pageserver_api::controller_api::{
|
||||
PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
|
||||
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
|
||||
SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
|
||||
@@ -21,6 +21,7 @@ use pageserver_api::models::{
|
||||
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
|
||||
use pageserver_client::mgmt_api::{self};
|
||||
use reqwest::{Certificate, Method, StatusCode, Url};
|
||||
use safekeeper_api::models::TimelineLocateResponse;
|
||||
use storage_controller_client::control_api::Client;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
|
||||
@@ -279,6 +280,23 @@ enum Command {
|
||||
#[arg(long)]
|
||||
concurrency: Option<usize>,
|
||||
},
|
||||
/// Locate safekeepers for a timeline from the storcon DB.
|
||||
TimelineLocate {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long)]
|
||||
timeline_id: TimelineId,
|
||||
},
|
||||
/// Migrate a timeline to a new set of safekeepers
|
||||
TimelineSafekeeperMigrate {
|
||||
#[arg(long)]
|
||||
tenant_id: TenantId,
|
||||
#[arg(long)]
|
||||
timeline_id: TimelineId,
|
||||
/// Example: --new-sk-set 1,2,3
|
||||
#[arg(long, required = true, value_delimiter = ',')]
|
||||
new_sk_set: Vec<NodeId>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
@@ -1324,7 +1342,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
concurrency,
|
||||
} => {
|
||||
let mut path = format!(
|
||||
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
|
||||
"v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
|
||||
);
|
||||
|
||||
if let Some(c) = concurrency {
|
||||
@@ -1335,6 +1353,41 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<(), ()>(Method::POST, path, None)
|
||||
.await?;
|
||||
}
|
||||
Command::TimelineLocate {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
} => {
|
||||
let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
|
||||
|
||||
let resp = storcon_client
|
||||
.dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
|
||||
.await?;
|
||||
|
||||
let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
|
||||
let new_sk_set = resp
|
||||
.new_sk_set
|
||||
.as_ref()
|
||||
.map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
|
||||
|
||||
println!("generation = {}", resp.generation);
|
||||
println!("sk_set = {sk_set:?}");
|
||||
println!("new_sk_set = {new_sk_set:?}");
|
||||
}
|
||||
Command::TimelineSafekeeperMigrate {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
new_sk_set,
|
||||
} => {
|
||||
let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
|
||||
|
||||
storcon_client
|
||||
.dispatch::<_, ()>(
|
||||
Method::POST,
|
||||
path,
|
||||
Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -52,6 +52,7 @@ exceptions = [
|
||||
# Zlib license has some restrictions if we decide to change sth
|
||||
{ allow = ["Zlib"], name = "const_format_proc_macros", version = "*" },
|
||||
{ allow = ["Zlib"], name = "const_format", version = "*" },
|
||||
{ allow = ["Zlib"], name = "foldhash", version = "*" },
|
||||
]
|
||||
|
||||
[licenses.private]
|
||||
|
||||
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
|
||||
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo};
|
||||
use crate::shard::{ShardStripeSize, TenantShardId};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
@@ -126,6 +126,13 @@ pub struct TenantDescribeResponse {
|
||||
pub config: TenantConfig,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantTimelineDescribeResponse {
|
||||
pub shards: Vec<TimelineInfo>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_consistent_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct NodeShardResponse {
|
||||
pub node_id: NodeId,
|
||||
|
||||
@@ -1622,6 +1622,9 @@ pub struct TimelineInfo {
|
||||
|
||||
/// Whether the timeline is invisible in synthetic size calculations.
|
||||
pub is_invisible: Option<bool>,
|
||||
// HADRON: the largest LSN below which all page updates have been included in the image layers.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_consistent_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
||||
@@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
|
||||
use crate::membership::Configuration;
|
||||
use crate::membership::{Configuration, SafekeeperGeneration};
|
||||
use crate::{ServerInfo, Term};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -311,3 +311,12 @@ pub struct PullTimelineResponse {
|
||||
pub safekeeper_host: Option<String>,
|
||||
// TODO: add more fields?
|
||||
}
|
||||
|
||||
/// Response to a timeline locate request.
|
||||
/// Storcon-only API.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct TimelineLocateResponse {
|
||||
pub generation: SafekeeperGeneration,
|
||||
pub sk_set: Vec<NodeId>,
|
||||
pub new_sk_set: Option<Vec<NodeId>>,
|
||||
}
|
||||
|
||||
@@ -397,6 +397,7 @@ async fn build_timeline_info(
|
||||
timeline: &Arc<Timeline>,
|
||||
include_non_incremental_logical_size: bool,
|
||||
force_await_initial_logical_size: bool,
|
||||
include_image_consistent_lsn: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -421,6 +422,10 @@ async fn build_timeline_info(
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
// HADRON
|
||||
if include_image_consistent_lsn {
|
||||
info.image_consistent_lsn = Some(timeline.compute_image_consistent_lsn().await?);
|
||||
}
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
@@ -510,6 +515,8 @@ async fn build_timeline_info_common(
|
||||
is_invisible: Some(is_invisible),
|
||||
|
||||
walreceiver_status,
|
||||
// HADRON
|
||||
image_consistent_lsn: None,
|
||||
};
|
||||
Ok(info)
|
||||
}
|
||||
@@ -712,6 +719,8 @@ async fn timeline_list_handler(
|
||||
parse_query_param(&request, "include-non-incremental-logical-size")?;
|
||||
let force_await_initial_logical_size: Option<bool> =
|
||||
parse_query_param(&request, "force-await-initial-logical-size")?;
|
||||
let include_image_consistent_lsn: Option<bool> =
|
||||
parse_query_param(&request, "include-image-consistent-lsn")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
@@ -732,6 +741,7 @@ async fn timeline_list_handler(
|
||||
&timeline,
|
||||
include_non_incremental_logical_size.unwrap_or(false),
|
||||
force_await_initial_logical_size.unwrap_or(false),
|
||||
include_image_consistent_lsn.unwrap_or(false),
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
|
||||
@@ -760,6 +770,9 @@ async fn timeline_and_offloaded_list_handler(
|
||||
parse_query_param(&request, "include-non-incremental-logical-size")?;
|
||||
let force_await_initial_logical_size: Option<bool> =
|
||||
parse_query_param(&request, "force-await-initial-logical-size")?;
|
||||
let include_image_consistent_lsn: Option<bool> =
|
||||
parse_query_param(&request, "include-image-consistent-lsn")?;
|
||||
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let state = get_state(&request);
|
||||
@@ -780,6 +793,7 @@ async fn timeline_and_offloaded_list_handler(
|
||||
&timeline,
|
||||
include_non_incremental_logical_size.unwrap_or(false),
|
||||
force_await_initial_logical_size.unwrap_or(false),
|
||||
include_image_consistent_lsn.unwrap_or(false),
|
||||
&ctx,
|
||||
)
|
||||
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
|
||||
@@ -964,6 +978,9 @@ async fn timeline_detail_handler(
|
||||
parse_query_param(&request, "include-non-incremental-logical-size")?;
|
||||
let force_await_initial_logical_size: Option<bool> =
|
||||
parse_query_param(&request, "force-await-initial-logical-size")?;
|
||||
// HADRON
|
||||
let include_image_consistent_lsn: Option<bool> =
|
||||
parse_query_param(&request, "include-image-consistent-lsn")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
// Logical size calculation needs downloading.
|
||||
@@ -984,6 +1001,7 @@ async fn timeline_detail_handler(
|
||||
&timeline,
|
||||
include_non_incremental_logical_size.unwrap_or(false),
|
||||
force_await_initial_logical_size.unwrap_or(false),
|
||||
include_image_consistent_lsn.unwrap_or(false),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
@@ -3643,6 +3661,7 @@ async fn activate_post_import_handler(
|
||||
let timeline_info = build_timeline_info(
|
||||
&timeline, false, // include_non_incremental_logical_size,
|
||||
false, // force_await_initial_logical_size
|
||||
false, // include_image_consistent_lsn
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
|
||||
@@ -12816,6 +12816,40 @@ mod tests {
|
||||
},
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_force_image_creation_lsn() -> anyhow::Result<()> {
|
||||
let tenant_conf = pageserver_api::models::TenantConfig {
|
||||
pitr_interval: Some(Duration::from_secs(7 * 3600)),
|
||||
image_layer_force_creation_period: Some(Duration::from_secs(3600)),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let tenant_id = TenantId::generate();
|
||||
|
||||
let harness = TenantHarness::create_custom(
|
||||
"test_get_force_image_creation_lsn",
|
||||
tenant_conf,
|
||||
tenant_id,
|
||||
ShardIdentity::unsharded(),
|
||||
Generation::new(1),
|
||||
)
|
||||
.await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
timeline.gc_info.write().unwrap().cutoffs.time = Some(Lsn(100));
|
||||
{
|
||||
let writer = timeline.writer().await;
|
||||
writer.finish_write(Lsn(5000));
|
||||
}
|
||||
|
||||
let image_creation_lsn = timeline.get_force_image_creation_lsn().unwrap();
|
||||
assert_eq!(image_creation_lsn, Lsn(4300));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,10 +46,11 @@
|
||||
mod historic_layer_coverage;
|
||||
mod layer_coverage;
|
||||
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::collections::{BTreeMap, HashMap, VecDeque};
|
||||
use std::iter::Peekable;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Result;
|
||||
use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
@@ -904,6 +905,103 @@ impl LayerMap {
|
||||
max_stacked_deltas
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
/**
|
||||
* Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully.
|
||||
* It works by first finding the latest image layers and store them into a map. Then for each delta layer,
|
||||
* find all overlapping image layers in order to potentially increase the image LSN in case there are gaps
|
||||
* (e.g., if an image is created at LSN 100 but the delta layer spans LSN [150, 200], then we can increase
|
||||
* image LSN to 150 because there is no WAL record in between).
|
||||
* Finally, the image consistent LSN is computed by taking the minimum of all image layers.
|
||||
*/
|
||||
pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn {
|
||||
struct ImageLayerInfo {
|
||||
// creation LSN of the image layer
|
||||
image_lsn: Lsn,
|
||||
// the current minimum LSN of newer delta layers with overlapping key ranges
|
||||
min_delta_lsn: Lsn,
|
||||
}
|
||||
let started_at = Instant::now();
|
||||
|
||||
let min_l0_deltas_lsn = {
|
||||
let l0_deltas = self.level0_deltas();
|
||||
l0_deltas
|
||||
.iter()
|
||||
.map(|layer| layer.get_lsn_range().start)
|
||||
.min()
|
||||
.unwrap_or(disk_consistent_lsn)
|
||||
};
|
||||
let global_key_range = Key::MIN..Key::MAX;
|
||||
|
||||
// step 1: collect all most recent image layers into a map
|
||||
// map: end key to image_layer_info
|
||||
let mut image_map: BTreeMap<Key, ImageLayerInfo> = BTreeMap::new();
|
||||
for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) {
|
||||
let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0));
|
||||
image_map.insert(
|
||||
img_range.end,
|
||||
ImageLayerInfo {
|
||||
image_lsn: img_lsn,
|
||||
min_delta_lsn: min_l0_deltas_lsn,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// step 2: go through all delta layers, and update the image layer info with overlapping
|
||||
// key ranges
|
||||
for layer in self.historic.iter() {
|
||||
if !layer.is_delta {
|
||||
continue;
|
||||
}
|
||||
let delta_key_range = layer.get_key_range();
|
||||
let delta_lsn_range = layer.get_lsn_range();
|
||||
for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) {
|
||||
debug_assert!(img_end_key >= &delta_key_range.start);
|
||||
if delta_lsn_range.end > img_info.image_lsn {
|
||||
// the delta layer includes WAL records after the image
|
||||
// it's possibel that the delta layer's start LSN < image LSN, which will be simply ignored by step 3
|
||||
img_info.min_delta_lsn =
|
||||
std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start);
|
||||
}
|
||||
if img_end_key >= &delta_key_range.end {
|
||||
// we have fully processed all overlapping image layers
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// step 3, go through all image layers and find the image consistent LSN
|
||||
let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap();
|
||||
let mut prev_key = Key::MIN;
|
||||
for (img_key, img_info) in image_map {
|
||||
tracing::debug!(
|
||||
"Image layer {:?}:{} has min delta lsn {}",
|
||||
Range {
|
||||
start: prev_key,
|
||||
end: img_key,
|
||||
},
|
||||
img_info.image_lsn,
|
||||
img_info.min_delta_lsn,
|
||||
);
|
||||
let image_lsn = std::cmp::max(
|
||||
img_info.image_lsn,
|
||||
img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)),
|
||||
);
|
||||
img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn);
|
||||
prev_key = img_key;
|
||||
}
|
||||
tracing::info!(
|
||||
"computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. Processed {} layrs in total.",
|
||||
img_consistent_lsn,
|
||||
disk_consistent_lsn,
|
||||
started_at.elapsed().as_millis(),
|
||||
self.historic.len()
|
||||
);
|
||||
img_consistent_lsn
|
||||
}
|
||||
|
||||
/* END_HADRON */
|
||||
|
||||
/// Return all L0 delta layers
|
||||
pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
|
||||
&self.l0_delta_layers
|
||||
@@ -1579,6 +1677,138 @@ mod tests {
|
||||
LayerVisibilityHint::Visible
|
||||
));
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
#[test]
|
||||
fn test_compute_image_consistent_lsn() {
|
||||
let mut layer_map = LayerMap::default();
|
||||
|
||||
let disk_consistent_lsn = Lsn(1000);
|
||||
// case 1: empty layer map
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(
|
||||
disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(),
|
||||
image_consistent_lsn
|
||||
);
|
||||
|
||||
// case 2: only L0 delta layer
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(100),
|
||||
Lsn(900)..Lsn(990),
|
||||
true,
|
||||
));
|
||||
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(100),
|
||||
Lsn(850)..Lsn(899),
|
||||
true,
|
||||
));
|
||||
}
|
||||
|
||||
// should use min L0 delta LSN - 1 as image consistent LSN
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(849), image_consistent_lsn);
|
||||
|
||||
// case 3: 3 images, no L1 delta
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(40),
|
||||
Lsn(100)..Lsn(100),
|
||||
false,
|
||||
));
|
||||
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(40)..Key::from_i128(70),
|
||||
Lsn(200)..Lsn(200),
|
||||
false,
|
||||
));
|
||||
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(70)..Key::from_i128(100),
|
||||
Lsn(150)..Lsn(150),
|
||||
false,
|
||||
));
|
||||
}
|
||||
// should use min L0 delta LSN - 1 as image consistent LSN
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(849), image_consistent_lsn);
|
||||
|
||||
// case 4: 3 images with 1 L1 delta
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(50),
|
||||
Lsn(300)..Lsn(350),
|
||||
true,
|
||||
));
|
||||
}
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(299), image_consistent_lsn);
|
||||
|
||||
// case 5: 3 images with 1 more L1 delta with smaller LSN
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(50)..Key::from_i128(72),
|
||||
Lsn(200)..Lsn(300),
|
||||
true,
|
||||
));
|
||||
}
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(199), image_consistent_lsn);
|
||||
|
||||
// case 6: 3 images with more newer L1 deltas (no impact on final results)
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(30),
|
||||
Lsn(400)..Lsn(500),
|
||||
true,
|
||||
));
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(35)..Key::from_i128(100),
|
||||
Lsn(450)..Lsn(600),
|
||||
true,
|
||||
));
|
||||
}
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(199), image_consistent_lsn);
|
||||
|
||||
// case 7: 3 images with more older L1 deltas (no impact on final results)
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(40),
|
||||
Lsn(0)..Lsn(50),
|
||||
true,
|
||||
));
|
||||
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(50)..Key::from_i128(100),
|
||||
Lsn(10)..Lsn(60),
|
||||
true,
|
||||
));
|
||||
}
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(199), image_consistent_lsn);
|
||||
|
||||
// case 8: 3 images with one more L1 delta with overlapping LSN range
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_test(
|
||||
Key::from_i128(0)..Key::from_i128(50),
|
||||
Lsn(50)..Lsn(250),
|
||||
true,
|
||||
));
|
||||
}
|
||||
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
|
||||
assert_eq!(Lsn(100), image_consistent_lsn);
|
||||
}
|
||||
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -351,13 +351,6 @@ pub struct Timeline {
|
||||
last_image_layer_creation_check_at: AtomicLsn,
|
||||
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
|
||||
|
||||
// HADRON
|
||||
/// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation
|
||||
/// on this key range.
|
||||
force_image_creation_lsn: AtomicLsn,
|
||||
/// The last time instant when force_image_creation_lsn is computed.
|
||||
force_image_creation_lsn_computed_at: std::sync::Mutex<Option<Instant>>,
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: LogicalSize,
|
||||
|
||||
@@ -2854,7 +2847,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// HADRON
|
||||
fn get_image_creation_timeout(&self) -> Option<Duration> {
|
||||
fn get_image_layer_force_creation_period(&self) -> Option<Duration> {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
@@ -3134,9 +3127,6 @@ impl Timeline {
|
||||
repartition_threshold: 0,
|
||||
last_image_layer_creation_check_at: AtomicLsn::new(0),
|
||||
last_image_layer_creation_check_instant: Mutex::new(None),
|
||||
// HADRON
|
||||
force_image_creation_lsn: AtomicLsn::new(0),
|
||||
force_image_creation_lsn_computed_at: std::sync::Mutex::new(None),
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_latest_cache: RwLock::new(HashMap::new()),
|
||||
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
|
||||
@@ -5381,13 +5371,16 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// HADRON
|
||||
// for child timelines, we consider all pages up to ancestor_LSN are redone successfully by the parent timeline
|
||||
min_image_lsn = min_image_lsn.max(self.get_ancestor_lsn());
|
||||
if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 {
|
||||
info!(
|
||||
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}",
|
||||
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}, num deltas: {}",
|
||||
partition.ranges[0].start,
|
||||
partition.ranges[0].end,
|
||||
min_image_lsn,
|
||||
force_image_creation_lsn.unwrap()
|
||||
force_image_creation_lsn.unwrap(),
|
||||
max_deltas
|
||||
);
|
||||
return true;
|
||||
}
|
||||
@@ -7153,6 +7146,19 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.clone()
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
pub(crate) async fn compute_image_consistent_lsn(&self) -> anyhow::Result<Lsn> {
|
||||
let guard = self
|
||||
.layers
|
||||
.read(LayerManagerLockHolder::ComputeImageConsistentLsn)
|
||||
.await;
|
||||
let layer_map = guard.layer_map()?;
|
||||
let disk_consistent_lsn = self.get_disk_consistent_lsn();
|
||||
|
||||
Ok(layer_map.compute_image_consistent_lsn(disk_consistent_lsn))
|
||||
}
|
||||
/* END_HADRON */
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::cmp::min;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::layer_manager::LayerManagerLockHolder;
|
||||
use super::{
|
||||
@@ -34,7 +34,6 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
|
||||
use pageserver_compaction::interface::*;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -47,7 +46,6 @@ use wal_decoder::models::value::Value;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
@@ -1271,10 +1269,7 @@ impl Timeline {
|
||||
// Define partitioning schema if needed
|
||||
|
||||
// HADRON
|
||||
let force_image_creation_lsn = self
|
||||
.get_or_compute_force_image_creation_lsn(cancel, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
let force_image_creation_lsn = self.get_force_image_creation_lsn();
|
||||
|
||||
// 1. L0 Compact
|
||||
let l0_outcome = {
|
||||
@@ -1484,59 +1479,37 @@ impl Timeline {
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
// Get the force image creation LSN. Compute it if the last computed LSN is too old.
|
||||
async fn get_or_compute_force_image_creation_lsn(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<Lsn>> {
|
||||
const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
|
||||
let image_layer_force_creation_period = self.get_image_creation_timeout();
|
||||
if image_layer_force_creation_period.is_none() {
|
||||
return Ok(None);
|
||||
// Get the force image creation LSN based on gc_cutoff_lsn.
|
||||
// Note that this is an estimation and the workload rate may suddenly change. When that happens,
|
||||
// the force image creation may be too early or too late, but eventually it should be able to catch up.
|
||||
pub(crate) fn get_force_image_creation_lsn(self: &Arc<Self>) -> Option<Lsn> {
|
||||
let image_creation_period = self.get_image_layer_force_creation_period()?;
|
||||
let current_lsn = self.get_last_record_lsn();
|
||||
let pitr_lsn = self.gc_info.read().unwrap().cutoffs.time?;
|
||||
let pitr_interval = self.get_pitr_interval();
|
||||
if pitr_lsn == Lsn::INVALID || pitr_interval.is_zero() {
|
||||
tracing::warn!(
|
||||
"pitr LSN/interval not found, skipping force image creation LSN calculation"
|
||||
);
|
||||
return None;
|
||||
}
|
||||
|
||||
let image_layer_force_creation_period = image_layer_force_creation_period.unwrap();
|
||||
let force_image_creation_lsn_computed_at =
|
||||
*self.force_image_creation_lsn_computed_at.lock().unwrap();
|
||||
if force_image_creation_lsn_computed_at.is_none()
|
||||
|| force_image_creation_lsn_computed_at.unwrap().elapsed()
|
||||
> FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL
|
||||
{
|
||||
let now: SystemTime = SystemTime::now();
|
||||
let timestamp = now
|
||||
.checked_sub(image_layer_force_creation_period)
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"image creation timeout is too large: {image_layer_force_creation_period:?}"
|
||||
)
|
||||
})?;
|
||||
let timestamp = to_pg_timestamp(timestamp);
|
||||
let force_image_creation_lsn = match self
|
||||
.find_lsn_for_timestamp(timestamp, cancel, ctx)
|
||||
.await?
|
||||
{
|
||||
LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn,
|
||||
_ => {
|
||||
let gc_lsn = *self.get_applied_gc_cutoff_lsn();
|
||||
tracing::info!(
|
||||
"no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}",
|
||||
gc_lsn
|
||||
);
|
||||
gc_lsn
|
||||
}
|
||||
};
|
||||
self.force_image_creation_lsn
|
||||
.store(force_image_creation_lsn);
|
||||
*self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now());
|
||||
tracing::info!(
|
||||
"computed force image creation LSN: {}",
|
||||
force_image_creation_lsn
|
||||
);
|
||||
Ok(Some(force_image_creation_lsn))
|
||||
} else {
|
||||
Ok(Some(self.force_image_creation_lsn.load()))
|
||||
}
|
||||
let delta_lsn = current_lsn.checked_sub(pitr_lsn).unwrap().0
|
||||
* image_creation_period.as_secs()
|
||||
/ pitr_interval.as_secs();
|
||||
let force_image_creation_lsn = current_lsn.checked_sub(delta_lsn).unwrap_or(Lsn(0));
|
||||
|
||||
tracing::info!(
|
||||
"Tenant shard {} computed force_image_creation_lsn: {}. Current lsn: {}, image_layer_force_creation_period: {:?}, GC cutoff: {}, PITR interval: {:?}",
|
||||
self.tenant_shard_id,
|
||||
force_image_creation_lsn,
|
||||
current_lsn,
|
||||
image_creation_period,
|
||||
pitr_lsn,
|
||||
pitr_interval
|
||||
);
|
||||
|
||||
Some(force_image_creation_lsn)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@ pub(crate) enum LayerManagerLockHolder {
|
||||
ImportPgData,
|
||||
DetachAncestor,
|
||||
Eviction,
|
||||
ComputeImageConsistentLsn,
|
||||
#[cfg(test)]
|
||||
Testing,
|
||||
}
|
||||
|
||||
@@ -48,12 +48,13 @@ indexmap = { workspace = true, features = ["serde"] }
|
||||
ipnet.workspace = true
|
||||
itertools.workspace = true
|
||||
itoa.workspace = true
|
||||
lasso = { workspace = true, features = ["multi-threaded"] }
|
||||
measured = { workspace = true, features = ["lasso"] }
|
||||
measured = { workspace = true, features = ["paracord"] }
|
||||
measured-tokio.workspace = true
|
||||
metrics.workspace = true
|
||||
once_cell.workspace = true
|
||||
opentelemetry = { workspace = true, features = ["trace"] }
|
||||
papaya = "0.2.0"
|
||||
paracord.workspace = true
|
||||
parking_lot.workspace = true
|
||||
parquet.workspace = true
|
||||
parquet_derive.workspace = true
|
||||
@@ -127,4 +128,4 @@ rstest.workspace = true
|
||||
walkdir.workspace = true
|
||||
rand_distr = "0.4"
|
||||
tokio-postgres.workspace = true
|
||||
tracing-test = "0.2"
|
||||
tracing-test = "0.2"
|
||||
|
||||
@@ -87,7 +87,12 @@ impl JwkCacheEntry {
|
||||
self.key_sets
|
||||
.values()
|
||||
// make sure our requested role has access to the key set
|
||||
.filter(|key_set| key_set.role_names.iter().any(|role| **role == **role_name))
|
||||
.filter(|key_set| {
|
||||
key_set
|
||||
.role_names
|
||||
.iter()
|
||||
.any(|role| *role.as_str() == **role_name)
|
||||
})
|
||||
// try and find the requested key-id in the key set
|
||||
.find_map(|key_set| {
|
||||
key_set
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::context::RequestContext;
|
||||
use crate::control_plane::NodeInfo;
|
||||
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo};
|
||||
use crate::http;
|
||||
use crate::intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag};
|
||||
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
|
||||
use crate::types::EndpointId;
|
||||
use crate::url::ApiUrl;
|
||||
|
||||
@@ -38,9 +38,9 @@ impl LocalBackend {
|
||||
},
|
||||
// TODO(conrad): make this better reflect compute info rather than endpoint info.
|
||||
aux: MetricsAuxInfo {
|
||||
endpoint_id: EndpointIdTag::get_interner().get_or_intern("local"),
|
||||
project_id: ProjectIdTag::get_interner().get_or_intern("local"),
|
||||
branch_id: BranchIdTag::get_interner().get_or_intern("local"),
|
||||
endpoint_id: EndpointIdInt::new("local"),
|
||||
project_id: ProjectIdInt::new("local"),
|
||||
branch_id: BranchIdInt::new("local"),
|
||||
compute_id: "local".into(),
|
||||
cold_start_info: ColdStartInfo::WarmCached,
|
||||
},
|
||||
|
||||
@@ -192,6 +192,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
jemalloc,
|
||||
neon_metrics,
|
||||
proxy: crate::metrics::Metrics::get(),
|
||||
tokio: measured_tokio::RuntimeCollector::current(),
|
||||
},
|
||||
));
|
||||
|
||||
|
||||
@@ -513,6 +513,7 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
jemalloc,
|
||||
neon_metrics,
|
||||
proxy: crate::metrics::Metrics::get(),
|
||||
tokio: measured_tokio::RuntimeCollector::current(),
|
||||
},
|
||||
));
|
||||
maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
|
||||
|
||||
8
proxy/src/cache/project_info.rs
vendored
8
proxy/src/cache/project_info.rs
vendored
@@ -194,7 +194,7 @@ impl ProjectInfoCacheImpl {
|
||||
&self,
|
||||
endpoint_id: &EndpointId,
|
||||
) -> Option<Ref<'_, EndpointIdInt, EndpointInfo>> {
|
||||
let endpoint_id = EndpointIdInt::get(endpoint_id)?;
|
||||
let endpoint_id = EndpointIdInt::try_new_existing(endpoint_id)?;
|
||||
self.cache.get(&endpoint_id)
|
||||
}
|
||||
|
||||
@@ -204,7 +204,7 @@ impl ProjectInfoCacheImpl {
|
||||
role_name: &RoleName,
|
||||
) -> Option<RoleAccessControl> {
|
||||
let valid_since = self.get_cache_times();
|
||||
let role_name = RoleNameInt::get(role_name)?;
|
||||
let role_name = RoleNameInt::try_new_existing(role_name)?;
|
||||
let endpoint_info = self.get_endpoint_cache(endpoint_id)?;
|
||||
endpoint_info.get_role_secret(role_name, valid_since)
|
||||
}
|
||||
@@ -297,10 +297,10 @@ impl ProjectInfoCacheImpl {
|
||||
}
|
||||
|
||||
pub fn maybe_invalidate_role_secret(&self, endpoint_id: &EndpointId, role_name: &RoleName) {
|
||||
let Some(endpoint_id) = EndpointIdInt::get(endpoint_id) else {
|
||||
let Some(endpoint_id) = EndpointIdInt::try_new_existing(endpoint_id) else {
|
||||
return;
|
||||
};
|
||||
let Some(role_name) = RoleNameInt::get(role_name) else {
|
||||
let Some(role_name) = RoleNameInt::try_new_existing(role_name) else {
|
||||
return;
|
||||
};
|
||||
|
||||
|
||||
@@ -71,6 +71,8 @@ pub struct AppMetrics {
|
||||
pub neon_metrics: NeonMetrics,
|
||||
#[metric(flatten)]
|
||||
pub proxy: &'static crate::metrics::Metrics,
|
||||
#[metric(namespace = "tokio")]
|
||||
pub tokio: measured_tokio::RuntimeCollector,
|
||||
}
|
||||
|
||||
async fn prometheus_metrics_handler(
|
||||
|
||||
@@ -1,272 +1,59 @@
|
||||
use std::hash::BuildHasherDefault;
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Index;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use lasso::{Capacity, MemoryLimits, Spur, ThreadedRodeo};
|
||||
use rustc_hash::FxHasher;
|
||||
use paracord::custom_key;
|
||||
|
||||
use crate::types::{AccountId, BranchId, EndpointId, ProjectId, RoleName};
|
||||
|
||||
pub trait InternId: Sized + 'static {
|
||||
fn get_interner() -> &'static StringInterner<Self>;
|
||||
}
|
||||
custom_key!(pub struct RoleNameInt);
|
||||
custom_key!(pub struct EndpointIdInt);
|
||||
custom_key!(pub struct BranchIdInt);
|
||||
custom_key!(pub struct ProjectIdInt);
|
||||
custom_key!(pub struct AccountIdInt);
|
||||
|
||||
pub struct StringInterner<Id> {
|
||||
inner: ThreadedRodeo<Spur, BuildHasherDefault<FxHasher>>,
|
||||
_id: PhantomData<Id>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Clone, Copy, Eq, Hash)]
|
||||
pub struct InternedString<Id> {
|
||||
inner: Spur,
|
||||
_id: PhantomData<Id>,
|
||||
}
|
||||
|
||||
impl<Id: InternId> std::fmt::Display for InternedString<Id> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.as_str().fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> InternedString<Id> {
|
||||
pub(crate) fn as_str(&self) -> &'static str {
|
||||
Id::get_interner().inner.resolve(&self.inner)
|
||||
}
|
||||
pub(crate) fn get(s: &str) -> Option<Self> {
|
||||
Id::get_interner().get(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> AsRef<str> for InternedString<Id> {
|
||||
fn as_ref(&self) -> &str {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> std::ops::Deref for InternedString<Id> {
|
||||
type Target = str;
|
||||
fn deref(&self) -> &str {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de, Id: InternId> serde::de::Deserialize<'de> for InternedString<Id> {
|
||||
fn deserialize<D: serde::de::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
|
||||
struct Visitor<Id>(PhantomData<Id>);
|
||||
impl<Id: InternId> serde::de::Visitor<'_> for Visitor<Id> {
|
||||
type Value = InternedString<Id>;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
formatter.write_str("a string")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
Ok(Id::get_interner().get_or_intern(v))
|
||||
}
|
||||
}
|
||||
d.deserialize_str(Visitor::<Id>(PhantomData))
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> serde::Serialize for InternedString<Id> {
|
||||
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
|
||||
self.as_str().serialize(s)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> StringInterner<Id> {
|
||||
pub(crate) fn new() -> Self {
|
||||
StringInterner {
|
||||
inner: ThreadedRodeo::with_capacity_memory_limits_and_hasher(
|
||||
Capacity::new(2500, NonZeroUsize::new(1 << 16).expect("value is nonzero")),
|
||||
// unbounded
|
||||
MemoryLimits::for_memory_usage(usize::MAX),
|
||||
BuildHasherDefault::<FxHasher>::default(),
|
||||
),
|
||||
_id: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn len(&self) -> usize {
|
||||
self.inner.len()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn current_memory_usage(&self) -> usize {
|
||||
self.inner.current_memory_usage()
|
||||
}
|
||||
|
||||
pub(crate) fn get_or_intern(&self, s: &str) -> InternedString<Id> {
|
||||
InternedString {
|
||||
inner: self.inner.get_or_intern(s),
|
||||
_id: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get(&self, s: &str) -> Option<InternedString<Id>> {
|
||||
Some(InternedString {
|
||||
inner: self.inner.get(s)?,
|
||||
_id: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> Index<InternedString<Id>> for StringInterner<Id> {
|
||||
type Output = str;
|
||||
|
||||
fn index(&self, index: InternedString<Id>) -> &Self::Output {
|
||||
self.inner.resolve(&index.inner)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id: InternId> Default for StringInterner<Id> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct RoleNameTag;
|
||||
impl InternId for RoleNameTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
static ROLE_NAMES: OnceLock<StringInterner<RoleNameTag>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
pub type RoleNameInt = InternedString<RoleNameTag>;
|
||||
impl From<&RoleName> for RoleNameInt {
|
||||
fn from(value: &RoleName) -> Self {
|
||||
RoleNameTag::get_interner().get_or_intern(value)
|
||||
RoleNameInt::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct EndpointIdTag;
|
||||
impl InternId for EndpointIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
static ROLE_NAMES: OnceLock<StringInterner<EndpointIdTag>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
pub type EndpointIdInt = InternedString<EndpointIdTag>;
|
||||
impl From<&EndpointId> for EndpointIdInt {
|
||||
fn from(value: &EndpointId) -> Self {
|
||||
EndpointIdTag::get_interner().get_or_intern(value)
|
||||
EndpointIdInt::new(value)
|
||||
}
|
||||
}
|
||||
impl From<EndpointId> for EndpointIdInt {
|
||||
fn from(value: EndpointId) -> Self {
|
||||
EndpointIdTag::get_interner().get_or_intern(&value)
|
||||
EndpointIdInt::new(&value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct BranchIdTag;
|
||||
impl InternId for BranchIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
static ROLE_NAMES: OnceLock<StringInterner<BranchIdTag>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
pub type BranchIdInt = InternedString<BranchIdTag>;
|
||||
impl From<&BranchId> for BranchIdInt {
|
||||
fn from(value: &BranchId) -> Self {
|
||||
BranchIdTag::get_interner().get_or_intern(value)
|
||||
BranchIdInt::new(value)
|
||||
}
|
||||
}
|
||||
impl From<BranchId> for BranchIdInt {
|
||||
fn from(value: BranchId) -> Self {
|
||||
BranchIdTag::get_interner().get_or_intern(&value)
|
||||
BranchIdInt::new(&value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct ProjectIdTag;
|
||||
impl InternId for ProjectIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
static ROLE_NAMES: OnceLock<StringInterner<ProjectIdTag>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
pub type ProjectIdInt = InternedString<ProjectIdTag>;
|
||||
impl From<&ProjectId> for ProjectIdInt {
|
||||
fn from(value: &ProjectId) -> Self {
|
||||
ProjectIdTag::get_interner().get_or_intern(value)
|
||||
ProjectIdInt::new(value)
|
||||
}
|
||||
}
|
||||
impl From<ProjectId> for ProjectIdInt {
|
||||
fn from(value: ProjectId) -> Self {
|
||||
ProjectIdTag::get_interner().get_or_intern(&value)
|
||||
ProjectIdInt::new(&value)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
|
||||
pub struct AccountIdTag;
|
||||
impl InternId for AccountIdTag {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
static ROLE_NAMES: OnceLock<StringInterner<AccountIdTag>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
pub type AccountIdInt = InternedString<AccountIdTag>;
|
||||
impl From<&AccountId> for AccountIdInt {
|
||||
fn from(value: &AccountId) -> Self {
|
||||
AccountIdTag::get_interner().get_or_intern(value)
|
||||
AccountIdInt::new(value)
|
||||
}
|
||||
}
|
||||
impl From<AccountId> for AccountIdInt {
|
||||
fn from(value: AccountId) -> Self {
|
||||
AccountIdTag::get_interner().get_or_intern(&value)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::OnceLock;
|
||||
|
||||
use super::InternId;
|
||||
use crate::intern::StringInterner;
|
||||
|
||||
struct MyId;
|
||||
impl InternId for MyId {
|
||||
fn get_interner() -> &'static StringInterner<Self> {
|
||||
pub(crate) static ROLE_NAMES: OnceLock<StringInterner<MyId>> = OnceLock::new();
|
||||
ROLE_NAMES.get_or_init(Default::default)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn push_many_strings() {
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use rand_distr::Zipf;
|
||||
|
||||
let endpoint_dist = Zipf::new(500000, 0.8).unwrap();
|
||||
let endpoints = StdRng::seed_from_u64(272488357).sample_iter(endpoint_dist);
|
||||
|
||||
let interner = MyId::get_interner();
|
||||
|
||||
const N: usize = 100_000;
|
||||
let mut verify = Vec::with_capacity(N);
|
||||
for endpoint in endpoints.take(N) {
|
||||
let endpoint = format!("ep-string-interning-{endpoint}");
|
||||
let key = interner.get_or_intern(&endpoint);
|
||||
verify.push((endpoint, key));
|
||||
}
|
||||
|
||||
for (s, key) in verify {
|
||||
assert_eq!(interner[key], s);
|
||||
}
|
||||
|
||||
// 2031616/59861 = 34 bytes per string
|
||||
assert_eq!(interner.len(), 59_861);
|
||||
// will have other overhead for the internal hashmaps that are not accounted for.
|
||||
assert_eq!(interner.current_memory_usage(), 2_031_616);
|
||||
AccountIdInt::new(&value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use lasso::ThreadedRodeo;
|
||||
use measured::label::{
|
||||
FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet,
|
||||
};
|
||||
@@ -11,6 +10,7 @@ use measured::{
|
||||
MetricGroup,
|
||||
};
|
||||
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
|
||||
use paracord::ParaCord;
|
||||
use tokio::time::{self, Instant};
|
||||
|
||||
use crate::control_plane::messages::ColdStartInfo;
|
||||
@@ -222,7 +222,7 @@ pub enum CacheOutcome {
|
||||
#[derive(LabelGroup)]
|
||||
#[label(set = ConsoleRequestSet)]
|
||||
pub struct ConsoleRequest<'a> {
|
||||
#[label(dynamic_with = ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = ParaCord, default)]
|
||||
pub request: &'a str,
|
||||
}
|
||||
|
||||
@@ -345,7 +345,7 @@ pub struct ConnectionFailuresBreakdownGroup {
|
||||
#[derive(LabelGroup, Copy, Clone)]
|
||||
#[label(set = RedisErrorsSet)]
|
||||
pub struct RedisErrors<'a> {
|
||||
#[label(dynamic_with = ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = ParaCord, default)]
|
||||
pub channel: &'a str,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::intern::{EndpointIdInt, EndpointIdTag, InternId};
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
macro_rules! smol_str_wrapper {
|
||||
($name:ident) => {
|
||||
@@ -85,7 +85,7 @@ impl EndpointId {
|
||||
|
||||
#[must_use]
|
||||
pub fn normalize_intern(&self) -> EndpointIdInt {
|
||||
EndpointIdTag::get_interner().get_or_intern(self.normalize_str())
|
||||
EndpointIdInt::new(self.normalize_str())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,10 +31,10 @@ humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
json-structural-diff.workspace = true
|
||||
lasso.workspace = true
|
||||
once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
paracord.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
posthog_client_lite.workspace = true
|
||||
rand.workspace = true
|
||||
@@ -72,4 +72,4 @@ http-utils = { path = "../libs/http-utils/" }
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -850,6 +850,31 @@ async fn handle_tenant_describe(
|
||||
json_response(StatusCode::OK, service.tenant_describe(tenant_id)?)
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
async fn handle_tenant_timeline_describe(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Scrubber)?;
|
||||
|
||||
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(_req) => {}
|
||||
};
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
service
|
||||
.tenant_timeline_describe(tenant_id, timeline_id)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
async fn handle_tenant_list(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -2480,6 +2505,13 @@ pub fn make_router(
|
||||
)
|
||||
})
|
||||
// Timeline operations
|
||||
.get("/control/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_timeline_describe,
|
||||
RequestName("v1_tenant_timeline_describe"),
|
||||
)
|
||||
})
|
||||
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
|
||||
@@ -183,9 +183,9 @@ impl Default for StorageControllerMetrics {
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = NodeLabelGroupSet)]
|
||||
pub(crate) struct NodeLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) az: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) node_id: &'a str,
|
||||
}
|
||||
|
||||
@@ -198,7 +198,7 @@ pub(crate) struct ReconcileCompleteLabelGroup {
|
||||
#[derive(measured::LabelGroup)]
|
||||
#[label(set = HttpRequestStatusLabelGroupSet)]
|
||||
pub(crate) struct HttpRequestStatusLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) path: &'a str,
|
||||
pub(crate) method: Method,
|
||||
pub(crate) status: StatusCode,
|
||||
@@ -207,7 +207,7 @@ pub(crate) struct HttpRequestStatusLabelGroup<'a> {
|
||||
#[derive(measured::LabelGroup)]
|
||||
#[label(set = HttpRequestLatencyLabelGroupSet)]
|
||||
pub(crate) struct HttpRequestLatencyLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) path: &'a str,
|
||||
pub(crate) method: Method,
|
||||
}
|
||||
@@ -215,9 +215,9 @@ pub(crate) struct HttpRequestLatencyLabelGroup<'a> {
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = PageserverRequestLabelGroupSet)]
|
||||
pub(crate) struct PageserverRequestLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) pageserver_id: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) path: &'a str,
|
||||
pub(crate) method: Method,
|
||||
}
|
||||
@@ -225,9 +225,9 @@ pub(crate) struct PageserverRequestLabelGroup<'a> {
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = SafekeeperRequestLabelGroupSet)]
|
||||
pub(crate) struct SafekeeperRequestLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) safekeeper_id: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) path: &'a str,
|
||||
pub(crate) method: Method,
|
||||
}
|
||||
@@ -254,11 +254,11 @@ pub(crate) struct LeadershipStatusGroup {
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = ReconcileLongRunningLabelGroupSet)]
|
||||
pub(crate) struct ReconcileLongRunningLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) tenant_id: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) shard_number: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) sequence: &'a str,
|
||||
}
|
||||
|
||||
@@ -282,11 +282,11 @@ pub(crate) enum Method {
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = SafekeeperReconcilerLabelGroupSet)]
|
||||
pub(crate) struct SafekeeperReconcilerLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) sk_az: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) sk_node_id: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
#[label(dynamic_with = paracord::ParaCord, default)]
|
||||
pub(crate) sk_hostname: &'a str,
|
||||
}
|
||||
|
||||
|
||||
@@ -86,6 +86,23 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
pub(crate) async fn tenant_timeline_describe(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Result<TimelineInfo> {
|
||||
measured_request!(
|
||||
"tenant_timeline_describe",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner
|
||||
.tenant_timeline_describe(tenant_shard_id, timeline_id,)
|
||||
.await
|
||||
)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
pub(crate) async fn tenant_scan_remote_storage(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -32,7 +32,7 @@ use pageserver_api::controller_api::{
|
||||
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
|
||||
SkSchedulingPolicy, TenantCreateRequest, TenantCreateResponse, TenantCreateResponseShard,
|
||||
TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse,
|
||||
TenantShardMigrateRequest, TenantShardMigrateResponse, TenantTimelineDescribeResponse,
|
||||
};
|
||||
use pageserver_api::models::{
|
||||
self, DetachBehavior, LocationConfig, LocationConfigListResponse, LocationConfigMode, LsnLease,
|
||||
@@ -5486,6 +5486,92 @@ impl Service {
|
||||
.ok_or_else(|| ApiError::NotFound(anyhow::anyhow!("Tenant {tenant_id} not found").into()))
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
pub(crate) async fn tenant_timeline_describe(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<TenantTimelineDescribeResponse, ApiError> {
|
||||
self.tenant_remote_mutation(tenant_id, |locations| async move {
|
||||
if locations.0.is_empty() {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
let locations: Vec<(TenantShardId, Node)> = locations
|
||||
.0
|
||||
.iter()
|
||||
.map(|t| (*t.0, t.1.latest.node.clone()))
|
||||
.collect();
|
||||
let mut futs = FuturesUnordered::new();
|
||||
|
||||
for (shard_id, node) in locations {
|
||||
futs.push({
|
||||
async move {
|
||||
let result = node
|
||||
.with_client_retries(
|
||||
|client| async move {
|
||||
client
|
||||
.tenant_timeline_describe(&shard_id, &timeline_id)
|
||||
.await
|
||||
},
|
||||
&self.http_client,
|
||||
&self.config.pageserver_jwt_token,
|
||||
3,
|
||||
3,
|
||||
Duration::from_secs(30),
|
||||
&self.cancel,
|
||||
)
|
||||
.await;
|
||||
(result, shard_id, node.get_id())
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let mut results: Vec<TimelineInfo> = Vec::new();
|
||||
while let Some((result, tenant_shard_id, node_id)) = futs.next().await {
|
||||
match result {
|
||||
Some(Ok(timeline_info)) => results.push(timeline_info),
|
||||
Some(Err(e)) => {
|
||||
tracing::warn!(
|
||||
"Failed to describe tenant {} timeline {} for pageserver {}: {e}",
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
node_id,
|
||||
);
|
||||
return Err(ApiError::ResourceUnavailable(format!("{e}").into()));
|
||||
}
|
||||
None => return Err(ApiError::Cancelled),
|
||||
}
|
||||
}
|
||||
let mut image_consistent_lsn: Option<Lsn> = Some(Lsn::MAX);
|
||||
for timeline_info in &results {
|
||||
if let Some(tline_image_consistent_lsn) = timeline_info.image_consistent_lsn {
|
||||
image_consistent_lsn = Some(std::cmp::min(
|
||||
image_consistent_lsn.unwrap(),
|
||||
tline_image_consistent_lsn,
|
||||
));
|
||||
} else {
|
||||
tracing::warn!(
|
||||
"Timeline {} on shard {} does not have image consistent lsn",
|
||||
timeline_info.timeline_id,
|
||||
timeline_info.tenant_id
|
||||
);
|
||||
image_consistent_lsn = None;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(TenantTimelineDescribeResponse {
|
||||
shards: results,
|
||||
image_consistent_lsn,
|
||||
})
|
||||
})
|
||||
.await?
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/// limit & offset are pagination parameters. Since we are walking an in-memory HashMap, `offset` does not
|
||||
/// avoid traversing data, it just avoid returning it. This is suitable for our purposes, since our in memory
|
||||
/// maps are small enough to traverse fast, our pagination is just to avoid serializing huge JSON responses
|
||||
|
||||
@@ -25,7 +25,8 @@ use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
|
||||
use safekeeper_api::PgVersionId;
|
||||
use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
|
||||
use safekeeper_api::models::{
|
||||
PullTimelineRequest, TimelineMembershipSwitchRequest, TimelineMembershipSwitchResponse,
|
||||
PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
|
||||
TimelineMembershipSwitchResponse,
|
||||
};
|
||||
use safekeeper_api::{INITIAL_TERM, Term};
|
||||
use safekeeper_client::mgmt_api;
|
||||
@@ -37,13 +38,6 @@ use utils::lsn::Lsn;
|
||||
|
||||
use super::Service;
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Clone)]
|
||||
pub struct TimelineLocateResponse {
|
||||
pub generation: SafekeeperGeneration,
|
||||
pub sk_set: Vec<NodeId>,
|
||||
pub new_sk_set: Option<Vec<NodeId>>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, ApiError> {
|
||||
let members = safekeepers
|
||||
|
||||
@@ -2342,6 +2342,20 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
# HADRON
|
||||
def tenant_timeline_describe(
|
||||
self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.api}/control/v1/tenant/{tenant_id}/timeline/{timeline_id}",
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def nodes(self):
|
||||
"""
|
||||
:return: list of {"id": ""}
|
||||
|
||||
@@ -960,9 +960,9 @@ def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
|
||||
return image_layer_count, delta_layer_count
|
||||
|
||||
|
||||
def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
|
||||
def test_image_layer_force_creation_period(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Tests that page server can force creating new images if image creation timeout is enabled
|
||||
Tests that page server can force creating new images if image_layer_force_creation_period is enabled
|
||||
"""
|
||||
# use large knobs to disable L0 compaction/image creation except for the force image creation
|
||||
tenant_conf = {
|
||||
@@ -972,10 +972,10 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
|
||||
"checkpoint_distance": 10 * 1024,
|
||||
"checkpoint_timeout": "1s",
|
||||
"image_layer_force_creation_period": "1s",
|
||||
# The lsn for forced image layer creations is calculated once every 10 minutes.
|
||||
# Hence, drive compaction manually such that the test doesn't compute it at the
|
||||
# wrong time.
|
||||
"compaction_period": "0s",
|
||||
"pitr_interval": "10s",
|
||||
"gc_period": "1s",
|
||||
"compaction_period": "1s",
|
||||
"lsn_lease_length": "1s",
|
||||
}
|
||||
|
||||
# consider every tenant large to run the image layer generation check more eagerly
|
||||
@@ -1018,4 +1018,69 @@ def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
|
||||
def test_image_consistent_lsn(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test the /v1/tenant/<tenant_id>/timeline/<timeline_id> endpoint and the computation of image_consistent_lsn
|
||||
"""
|
||||
# use large knobs to disable L0 compaction/image creation except for the force image creation
|
||||
tenant_conf = {
|
||||
"compaction_threshold": "100",
|
||||
"image_creation_threshold": "100",
|
||||
"image_layer_creation_check_threshold": "1",
|
||||
"checkpoint_distance": 10 * 1024,
|
||||
"checkpoint_timeout": "1s",
|
||||
"image_layer_force_creation_period": "1s",
|
||||
"pitr_interval": "10s",
|
||||
"gc_period": "1s",
|
||||
"compaction_period": "1s",
|
||||
"lsn_lease_length": "1s",
|
||||
}
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start(
|
||||
initial_tenant_conf=tenant_conf,
|
||||
initial_tenant_shard_count=4,
|
||||
initial_tenant_shard_stripe_size=1,
|
||||
)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint.safe_psql("CREATE TABLE foo (id INTEGER, val text)")
|
||||
for v in range(10):
|
||||
endpoint.safe_psql(
|
||||
f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False
|
||||
)
|
||||
|
||||
response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id)
|
||||
shards = response["shards"]
|
||||
for shard in shards:
|
||||
assert shard["image_consistent_lsn"] is not None
|
||||
image_consistent_lsn = response["image_consistent_lsn"]
|
||||
assert image_consistent_lsn is not None
|
||||
|
||||
# do more writes and wait for image_consistent_lsn to advance
|
||||
for v in range(100):
|
||||
endpoint.safe_psql(
|
||||
f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))", log_query=False
|
||||
)
|
||||
|
||||
def check_image_consistent_lsn_advanced():
|
||||
response = env.storage_controller.tenant_timeline_describe(tenant_id, timeline_id)
|
||||
new_image_consistent_lsn = response["image_consistent_lsn"]
|
||||
shards = response["shards"]
|
||||
for shard in shards:
|
||||
print(f"shard {shard['tenant_id']} image_consistent_lsn{shard['image_consistent_lsn']}")
|
||||
assert new_image_consistent_lsn != image_consistent_lsn
|
||||
|
||||
wait_until(check_image_consistent_lsn_advanced)
|
||||
|
||||
endpoint.stop_and_destroy()
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.append(".*created delta file of size.*larger than double of target.*")
|
||||
|
||||
|
||||
# END_HADRON
|
||||
|
||||
@@ -72,6 +72,7 @@ num-rational = { version = "0.4", default-features = false, features = ["num-big
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
p256 = { version = "0.13", features = ["jwk"] }
|
||||
paracord = { version = "0.1", features = ["serde"] }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.13", features = ["no-recursion-limit", "prost-derive"] }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
|
||||
Reference in New Issue
Block a user