Mirror of https://github.com/neondatabase/neon.git, synced 2026-05-16 20:50:37 +00:00
Compare commits
224 Commits
problame/b ... release-42
| Author | SHA1 | Date |
|---|---|---|
|  | a113c3e433 |  |
|  | e81fc598f4 |  |
|  | 48b845fa76 |  |
|  | 27096858dc |  |
|  | 87b8ac3ec3 |  |
|  | 6b1c4cc983 |  |
|  | 831fad46d5 |  |
|  | 53851ea8ec |  |
|  | 044375732a |  |
|  | ea63b43009 |  |
|  | a56fd45f56 |  |
|  | 582a42762b |  |
|  | f5dfa6f140 |  |
|  | f8d9bd8d14 |  |
|  | 04e6c09f14 |  |
|  | 54327bbeec |  |
|  | 35f243e787 |  |
|  | b7a988ba46 |  |
|  | a0e61145c8 |  |
|  | 6afbadc90e |  |
|  | 4430d0ae7d |  |
|  | 6e183aa0de |  |
|  | fd6d0b7635 |  |
|  | 3710c32aae |  |
|  | be83bee49d |  |
|  | cf28e5922a |  |
|  | 7d384d6953 |  |
|  | 4b3b37b912 |  |
|  | 1d8d200f4d |  |
|  | 0d80d6ce18 |  |
|  | f653ee039f |  |
|  | e614a95853 |  |
|  | 850db4cc13 |  |
|  | 8a316b1277 |  |
|  | 4d13bae449 |  |
|  | 49377abd98 |  |
|  | a6b2f4e54e |  |
|  | face60d50b |  |
|  | 9768aa27f2 |  |
|  | 96b2e575e1 |  |
|  | 7222777784 |  |
|  | 5469fdede0 |  |
|  | 72aa6b9fdd |  |
|  | ae0634b7be |  |
|  | 70711f32fa |  |
|  | 52a88af0aa |  |
|  | b7a43bf817 |  |
|  | dce91b33a4 |  |
|  | 23ee4f3050 |  |
|  | 46857e8282 |  |
|  | 368ab0ce54 |  |
|  | a5987eebfd |  |
|  | 6686ede30f |  |
|  | 373c7057cc |  |
|  | 7d6ec16166 |  |
|  | 0e6fdc8a58 |  |
|  | 521438a5c6 |  |
|  | 07d7874bc8 |  |
|  | 1804111a02 |  |
|  | cd0178efed |  |
|  | 333574be57 |  |
|  | 79a799a143 |  |
|  | 9da06af6c9 |  |
|  | ce1753d036 |  |
|  | 67db8432b4 |  |
|  | 4e2e44e524 |  |
|  | ed786104f3 |  |
|  | 84b74f2bd1 |  |
|  | fec2ad6283 |  |
|  | 98eebd4682 |  |
|  | 2f74287c9b |  |
|  | aee1bf95e3 |  |
|  | b9de9d75ff |  |
|  | 7943b709e6 |  |
|  | d7d066d493 |  |
|  | e78ac22107 |  |
|  | 76a8f2bb44 |  |
|  | 8d59a8581f |  |
|  | b1ddd01289 |  |
|  | 6eae4fc9aa |  |
|  | 765455bca2 |  |
|  | 4204960942 |  |
|  | 67345d66ea |  |
|  | 2266ee5971 |  |
|  | b58445d855 |  |
|  | 36050e7f3d |  |
|  | 33360ed96d |  |
|  | 39a28d1108 |  |
|  | efa6aa134f |  |
|  | 2c724e56e2 |  |
|  | feff887c6f |  |
|  | 353d915fcf |  |
|  | 2e38098cbc |  |
|  | a6fe5ea1ac |  |
|  | 05b0aed0c1 |  |
|  | cd1705357d |  |
|  | 6bc7561290 |  |
|  | fbd3ac14b5 |  |
|  | e437787c8f |  |
|  | 3460dbf90b |  |
|  | 6b89d99677 |  |
|  | 6cc8ea86e4 |  |
|  | e62a492d6f |  |
|  | a475cdf642 |  |
|  | 7002c79a47 |  |
|  | ee6cf357b4 |  |
|  | e5c2086b5f |  |
|  | 5f1208296a |  |
|  | 88e8e473cd |  |
|  | b0a77844f6 |  |
|  | 1baf464307 |  |
|  | e9b8e81cea |  |
|  | 85d6194aa4 |  |
|  | 333a7a68ef |  |
|  | 6aa4e41bee |  |
|  | 840183e51f |  |
|  | cbccc94b03 |  |
|  | fce227df22 |  |
|  | bd787e800f |  |
|  | 4a7704b4a3 |  |
|  | ff1119da66 |  |
|  | 4c3ba1627b |  |
|  | 1407174fb2 |  |
|  | ec9dcb1889 |  |
|  | d11d781afc |  |
|  | 4e44565b71 |  |
|  | 4ed51ad33b |  |
|  | 1c1ebe5537 |  |
|  | c19cb7f386 |  |
|  | 4b97d31b16 |  |
|  | 923ade3dd7 |  |
|  | b04e711975 |  |
|  | afd0a6b39a |  |
|  | 99752286d8 |  |
|  | 15df93363c |  |
|  | bc0ab741af |  |
|  | 51d9dfeaa3 |  |
|  | f63cb18155 |  |
|  | 0de603d88e |  |
|  | 240913912a |  |
|  | 91a4ea0de2 |  |
|  | 8608704f49 |  |
|  | efef68ce99 |  |
|  | 8daefd24da |  |
|  | 46cc8b7982 |  |
|  | 38cd90dd0c |  |
|  | a51b269f15 |  |
|  | 43bf6d0a0f |  |
|  | 15273a9b66 |  |
|  | 78aca668d0 |  |
|  | acbf4148ea |  |
|  | 6508540561 |  |
|  | a41b5244a8 |  |
|  | 2b3189be95 |  |
|  | 248563c595 |  |
|  | 14cd6ca933 |  |
|  | eb36403e71 |  |
|  | 3c6f779698 |  |
|  | f67f0c1c11 |  |
|  | edb02d3299 |  |
|  | 664a69e65b |  |
|  | 478322ebf9 |  |
|  | 802f174072 |  |
|  | 47f9890bae |  |
|  | 262265daad |  |
|  | 300da5b872 |  |
|  | 7b22b5c433 |  |
|  | ffca97bc1e |  |
|  | cb356f3259 |  |
|  | c85374295f |  |
|  | 4992160677 |  |
|  | bd535b3371 |  |
|  | d90c5a03af |  |
|  | 2d02cc9079 |  |
|  | 49ad94b99f |  |
|  | 948a217398 |  |
|  | 125381eae7 |  |
|  | cd01bbc715 |  |
|  | d8b5e3b88d |  |
|  | 06d25f2186 |  |
|  | f759b561f3 |  |
|  | ece0555600 |  |
|  | 73ea0a0b01 |  |
|  | d8f6d6fd6f |  |
|  | d24de169a7 |  |
|  | 0816168296 |  |
|  | 277b44d57a |  |
|  | 68c2c3880e |  |
|  | 49da498f65 |  |
|  | 2c76ba3dd7 |  |
|  | dbe3dc69ad |  |
|  | 8e5bb3ed49 |  |
|  | ab0be7b8da |  |
|  | b4c55f5d24 |  |
|  | ede70d833c |  |
|  | 70c3d18bb0 |  |
|  | 7a491f52c4 |  |
|  | 323c4ecb4f |  |
|  | 3d2466607e |  |
|  | ed478b39f4 |  |
|  | 91585a558d |  |
|  | 93467eae1f |  |
|  | f3aac81d19 |  |
|  | 979ad60c19 |  |
|  | 9316cb1b1f |  |
|  | e7939a527a |  |
|  | 36d26665e1 |  |
|  | 873347f977 |  |
|  | e814ac16f9 |  |
|  | ad3055d386 |  |
|  | 94e03eb452 |  |
|  | 380f26ef79 |  |
|  | 3c5b7f59d7 |  |
|  | fee89f80b5 |  |
|  | 41cce8eaf1 |  |
|  | f88fe0218d |  |
|  | cc856eca85 |  |
|  | cf350c6002 |  |
|  | 0ce6b6a0a3 |  |
|  | 73f247d537 |  |
|  | 960be82183 |  |
|  | 806e5a6c19 |  |
|  | 8d5df07cce |  |
|  | df7a9d1407 |  |
.github/workflows/release.yml (vendored, 2 changed lines)

@@ -2,7 +2,7 @@ name: Create Release Branch
 on:
   schedule:
-    - cron: '0 7 * * 5'
+    - cron: '0 6 * * 1'
   workflow_dispatch:

 jobs:
Cargo.lock (generated, 85 changed lines)

@@ -193,6 +193,8 @@ dependencies = [
  "memchr",
  "pin-project-lite",
  "tokio",
+ "zstd",
+ "zstd-safe",
 ]

 [[package]]
@@ -1955,20 +1957,6 @@ dependencies = [
  "hashbrown 0.13.2",
 ]

-[[package]]
-name = "hdrhistogram"
-version = "7.5.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
-dependencies = [
- "base64 0.21.1",
- "byteorder",
- "crossbeam-channel",
- "flate2",
- "nom",
- "num-traits",
-]
-
 [[package]]
 name = "heapless"
 version = "0.8.0"
@@ -2648,16 +2636,6 @@ dependencies = [
  "winapi",
 ]

-[[package]]
-name = "nu-ansi-term"
-version = "0.46.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
-dependencies = [
- "overload",
- "winapi",
-]
-
 [[package]]
 name = "num-bigint"
 version = "0.4.3"
@@ -2918,32 +2896,6 @@ version = "0.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"

-[[package]]
-name = "overload"
-version = "0.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
-
-[[package]]
-name = "pagebench"
-version = "0.1.0"
-dependencies = [
- "anyhow",
- "clap",
- "futures",
- "hdrhistogram",
- "humantime",
- "humantime-serde",
- "pageserver",
- "rand 0.8.5",
- "serde",
- "serde_json",
- "tokio",
- "tokio-util",
- "tracing",
- "utils",
-]
-
 [[package]]
 name = "pagectl"
 version = "0.1.0"
@@ -2955,6 +2907,8 @@ dependencies = [
  "git-version",
  "pageserver",
  "postgres_ffi",
+ "serde",
+ "serde_json",
  "svg_fmt",
  "tokio",
  "utils",
@@ -3029,13 +2983,10 @@ dependencies = [
  "tokio",
  "tokio-io-timeout",
  "tokio-postgres",
- "tokio-stream",
  "tokio-tar",
  "tokio-util",
  "toml_edit",
  "tracing",
- "tracing-chrome",
- "tracing-subscriber",
  "url",
  "utils",
  "walkdir",
@@ -5271,17 +5222,6 @@ dependencies = [
  "syn 2.0.28",
 ]

-[[package]]
-name = "tracing-chrome"
-version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "496b3cd5447f7ff527bbbf19b071ad542a000adf297d4127078b4dfdb931f41a"
-dependencies = [
- "serde_json",
- "tracing-core",
- "tracing-subscriber",
-]
-
 [[package]]
 name = "tracing-core"
 version = "0.1.31"
@@ -5302,17 +5242,6 @@ dependencies = [
  "tracing-subscriber",
 ]

-[[package]]
-name = "tracing-flame"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
-dependencies = [
- "lazy_static",
- "tracing",
- "tracing-subscriber",
-]
-
 [[package]]
 name = "tracing-futures"
 version = "0.2.5"
@@ -5365,7 +5294,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
 dependencies = [
  "matchers",
- "nu-ansi-term",
  "once_cell",
  "regex",
  "serde",
@@ -5580,9 +5508,7 @@ dependencies = [
  "tokio-stream",
  "tokio-util",
  "tracing",
- "tracing-chrome",
  "tracing-error",
- "tracing-flame",
  "tracing-subscriber",
  "url",
  "uuid",
@@ -6109,6 +6035,9 @@ dependencies = [
  "tungstenite",
  "url",
  "uuid",
+ "zstd",
+ "zstd-safe",
+ "zstd-sys",
 ]

 [[package]]
Cargo.toml

@@ -5,7 +5,6 @@ members = [
     "control_plane",
     "pageserver",
     "pageserver/ctl",
-    "pageserver/pagebench",
     "proxy",
     "safekeeper",
     "storage_broker",
@@ -38,7 +37,7 @@ license = "Apache-2.0"
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
 arc-swap = "1.6"
-async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
+async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
 azure_core = "0.16"
 azure_identity = "0.16"
 azure_storage = "0.16"
@@ -80,7 +79,6 @@ futures-util = "0.3"
 git-version = "0.3"
 hashbrown = "0.13"
 hashlink = "0.8.1"
-hdrhistogram = "7.5.2"
 hex = "0.4"
 hex-literal = "0.4"
 hmac = "0.12.1"
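The workspace change above swaps async-compression's feature list from `["tokio", "gzip"]` to `["tokio", "gzip", "zstd"]`, which is what pulls the new `zstd`, `zstd-safe`, and `zstd-sys` entries into Cargo.lock. A minimal sketch of what the added feature provides, using async-compression's tokio `ZstdEncoder`; the usage is illustrative, not code from this diff:

```rust
// Sketch: compress a buffer with the zstd codec enabled by the new feature.
use async_compression::tokio::write::ZstdEncoder;
use tokio::io::AsyncWriteExt;

async fn compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = ZstdEncoder::new(Vec::new());
    encoder.write_all(payload).await?;
    encoder.shutdown().await?; // finishes the zstd frame
    Ok(encoder.into_inner())
}
```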
@@ -698,6 +698,7 @@ impl ComputeNode {
         handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
         handle_grants(spec, &mut client, self.connstr.as_str())?;
         handle_extensions(spec, &mut client)?;
+        handle_extension_neon(&mut client)?;
         create_availability_check_data(&mut client)?;

         // 'Close' connection
@@ -742,6 +743,7 @@ impl ComputeNode {
             handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
             handle_grants(&spec, &mut client, self.connstr.as_str())?;
             handle_extensions(&spec, &mut client)?;
+            handle_extension_neon(&mut client)?;
         }

         // 'Close' connection
@@ -674,3 +674,33 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>

     Ok(())
 }
+
+/// Run CREATE and ALTER EXTENSION neon UPDATE for postgres database
+#[instrument(skip_all)]
+pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
+    info!("handle extension neon");
+
+    let mut query = "CREATE SCHEMA IF NOT EXISTS neon";
+    client.simple_query(query)?;
+
+    query = "CREATE EXTENSION IF NOT EXISTS neon WITH SCHEMA neon";
+    info!("create neon extension with query: {}", query);
+    client.simple_query(query)?;
+
+    query = "UPDATE pg_extension SET extrelocatable = true WHERE extname = 'neon'";
+    client.simple_query(query)?;
+
+    query = "ALTER EXTENSION neon SET SCHEMA neon";
+    info!("alter neon extension schema with query: {}", query);
+    client.simple_query(query)?;
+
+    // this will be a no-op if extension is already up to date,
+    // which may happen in two cases:
+    // - extension was just installed
+    // - extension was already installed and is up to date
+    let query = "ALTER EXTENSION neon UPDATE";
+    info!("update neon extension schema with query: {}", query);
+    client.simple_query(query)?;
+
+    Ok(())
+}
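`handle_extension_neon` drives five statements through `postgres::Client::simple_query`: create the `neon` schema, install the extension into it, mark it relocatable, move it, and finally `ALTER EXTENSION neon UPDATE`, which is a no-op when the extension is already current. A hypothetical caller-side check; the connection string, user, and assertion are placeholders, not part of the diff:

```rust
// Sketch: verify the extension ends up in the dedicated `neon` schema.
// Connection parameters below are assumptions for illustration only.
use postgres::{Client, NoTls};

fn main() -> anyhow::Result<()> {
    let mut client = Client::connect("host=localhost user=cloud_admin dbname=postgres", NoTls)?;
    handle_extension_neon(&mut client)?;
    for row in client.query(
        "SELECT extnamespace::regnamespace::text FROM pg_extension WHERE extname = 'neon'",
        &[],
    )? {
        let schema: String = row.get(0);
        assert_eq!(schema, "neon");
    }
    Ok(())
}
```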
@@ -86,10 +86,7 @@ where
         .stdout(process_log_file)
         .stderr(same_file_for_stderr)
         .args(args);

-    let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
-        fill_rust_env_vars(background_command),
-    ));
+    let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
     filled_cmd.envs(envs);

     let pid_file_to_check = match initial_pid_file {
@@ -256,15 +253,6 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     cmd
 }

-fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
-    for (var, val) in std::env::vars() {
-        if var.starts_with("NEON_") {
-            cmd = cmd.env(var, val);
-        }
-    }
-    cmd
-}
-
 /// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
 /// 1. Claims a pidfile with a fcntl lock on it and
 /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
@@ -283,7 +283,7 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,

 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let _guard = logging::init(
+    logging::init(
         LogFormat::Plain,
         logging::TracingErrorLayerEnablement::Disabled,
         logging::Output::Stdout,
@@ -487,8 +487,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                 .copied()
                 .context("Failed to parse postgres version from the argument string")?;

-            let timeline_info =
-                pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
+            let new_timeline_id_opt = parse_timeline_id(create_match)?;
+
+            let timeline_info = pageserver.timeline_create(
+                tenant_id,
+                new_timeline_id_opt,
+                None,
+                None,
+                Some(pg_version),
+            )?;
             let new_timeline_id = timeline_info.timeline_id;

             let last_record_lsn = timeline_info.last_record_lsn;
@@ -1308,6 +1315,7 @@ fn cli() -> Command {
         .subcommand(Command::new("create")
             .about("Create a new blank timeline")
             .arg(tenant_id_arg.clone())
+            .arg(timeline_id_arg.clone())
             .arg(branch_name_arg.clone())
             .arg(pg_version_arg.clone())
         )
@@ -140,35 +140,3 @@ impl Key {
         })
     }
 }
-
-impl std::str::FromStr for Key {
-    type Err = anyhow::Error;
-
-    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-        Self::from_hex(s)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::str::FromStr;
-
-    use crate::key::Key;
-
-    #[test]
-    fn display_fromstr_bijection() {
-        let mut rng = rand::thread_rng();
-        use rand::Rng;
-
-        let key = Key {
-            field1: rng.gen(),
-            field2: rng.gen(),
-            field3: rng.gen(),
-            field4: rng.gen(),
-            field5: rng.gen(),
-            field6: rng.gen(),
-        };
-
-        assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
-    }
-}
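The deleted test pins down a useful property: `Key::from_str` is the exact inverse of `Display` for a randomly generated key. The same round-trip check in generic form; a sketch, with `T` standing in for `Key`:

```rust
// Sketch: Display and FromStr must be inverses for any value they accept.
fn assert_display_fromstr_roundtrip<T>(value: T)
where
    T: std::fmt::Display + std::str::FromStr + PartialEq + std::fmt::Debug,
    <T as std::str::FromStr>::Err: std::fmt::Debug,
{
    let rendered = format!("{value}");
    let parsed: T = rendered.parse().expect("Display output must parse back");
    assert_eq!(parsed, value);
}
```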
@@ -18,7 +18,7 @@ use utils::{

 use crate::{reltag::RelTag, shard::TenantShardId};
 use anyhow::bail;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};

 /// The state of a tenant in this pageserver.
 ///
@@ -371,8 +371,6 @@ pub struct TenantInfo {
     /// If a layer is present in both local FS and S3, it counts only once.
     pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
     pub attachment_status: TenantAttachmentStatus,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub generation: Option<u32>,
 }

 /// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -517,8 +515,6 @@ pub enum HistoricLayerInfo {
         lsn_end: Lsn,
         remote: bool,
         access_stats: LayerAccessStats,
-
-        remote_path: Option<String>,
     },
     Image {
         layer_file_name: String,
@@ -527,8 +523,6 @@ pub enum HistoricLayerInfo {
         lsn_start: Lsn,
         remote: bool,
         access_stats: LayerAccessStats,
-
-        remote_path: Option<String>,
     },
 }
@@ -773,36 +767,6 @@ impl PagestreamBeMessage {

         bytes.into()
     }
-
-    pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
-        let mut buf = buf.reader();
-        let msg_tag = buf.read_u8()?;
-        match msg_tag {
-            100 => todo!(),
-            101 => todo!(),
-            102 => {
-                let buf = buf.get_ref();
-                /* TODO use constant */
-                if buf.len() == 8192 {
-                    Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
-                        page: buf.clone(),
-                    }))
-                } else {
-                    anyhow::bail!("invalid page size: {}", buf.len());
-                }
-            }
-            103 => {
-                let buf = buf.get_ref();
-                let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
-                let rust_str = cstr.to_str()?;
-                Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
-                    message: rust_str.to_owned(),
-                }))
-            }
-            104 => todo!(),
-            _ => bail!("unknown tag: {:?}", msg_tag),
-        }
-    }
 }

 #[cfg(test)]
@@ -868,7 +832,6 @@ mod tests {
             state: TenantState::Active,
             current_physical_size: Some(42),
             attachment_status: TenantAttachmentStatus::Attached,
-            generation: None,
         };
         let expected_active = json!({
             "id": original_active.id.to_string(),
@@ -889,7 +852,6 @@ mod tests {
             },
             current_physical_size: Some(42),
             attachment_status: TenantAttachmentStatus::Attached,
-            generation: None,
         };
         let expected_broken = json!({
             "id": original_broken.id.to_string(),
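The removed `deserialize` dispatches on a single tag byte: 102 carries exactly one 8192-byte page image, 103 a NUL-terminated error string, and tags 100, 101, and 104 were still `todo!()`. A self-contained sketch of that tag-dispatch shape, using a simplified stand-in enum rather than the real `PagestreamBeMessage`:

```rust
// Sketch: one tag byte, then a tag-specific payload, mirroring the deleted code.
#[derive(Debug)]
enum BeMessage {
    Page(Vec<u8>),  // tag 102: exactly one 8192-byte page image
    Error(String),  // tag 103: NUL-terminated message
}

fn decode(buf: &[u8]) -> anyhow::Result<BeMessage> {
    let (&tag, payload) = buf
        .split_first()
        .ok_or_else(|| anyhow::anyhow!("empty message"))?;
    match tag {
        102 => {
            anyhow::ensure!(payload.len() == 8192, "invalid page size: {}", payload.len());
            Ok(BeMessage::Page(payload.to_vec()))
        }
        103 => {
            let cstr = std::ffi::CStr::from_bytes_until_nul(payload)?;
            Ok(BeMessage::Error(cstr.to_str()?.to_owned()))
        }
        other => anyhow::bail!("unknown tag: {other}"),
    }
}
```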
@@ -81,12 +81,6 @@ impl std::fmt::Display for RemotePath {
     }
 }

-impl From<RemotePath> for String {
-    fn from(val: RemotePath) -> Self {
-        val.0.into()
-    }
-}
-
 impl RemotePath {
     pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
         anyhow::ensure!(
@@ -108,7 +102,7 @@ impl RemotePath {
         self.0.file_name()
     }

-    pub fn join<P: AsRef<Utf8Path>>(&self, segment: P) -> Self {
+    pub fn join(&self, segment: &Utf8Path) -> Self {
         Self(self.0.join(segment))
     }
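The two sides of this hunk differ only in `join`'s signature: one accepts any `P: AsRef<Utf8Path>`, the other a plain `&Utf8Path`, so with the concrete form callers convert strings themselves. A sketch of the call-site difference; the `base` value is hypothetical:

```rust
// Sketch: how the narrower signature changes a call site.
use camino::Utf8Path;

fn demo(base: &RemotePath) -> RemotePath {
    // Generic version: base.join("timelines") compiles via AsRef.
    // Concrete version: the &str must be wrapped at the call site.
    base.join(Utf8Path::new("timelines"))
}
```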
@@ -278,7 +278,7 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res

 fn ensure_logging_ready() {
     LOGGING_DONE.get_or_init(|| {
-        let _ = utils::logging::init(
+        utils::logging::init(
             utils::logging::LogFormat::Test,
             utils::logging::TracingErrorLayerEnablement::Disabled,
             utils::logging::Output::Stdout,
@@ -207,7 +207,7 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>

 fn ensure_logging_ready() {
     LOGGING_DONE.get_or_init(|| {
-        let _ = utils::logging::init(
+        utils::logging::init(
             utils::logging::LogFormat::Test,
             utils::logging::TracingErrorLayerEnablement::Disabled,
             utils::logging::Output::Stdout,
@@ -49,8 +49,6 @@ const_format.workspace = true
 # to use tokio channels as streams, this is faster to compile than async_stream
 # why is it only here? no other crate should use it, streams are rarely needed.
 tokio-stream = { version = "0.1.14" }
-tracing-chrome = "0.7.1"
-tracing-flame = "0.2.0"

 [dev-dependencies]
 byteorder.workspace = true
libs/utils/scripts/restore_from_wal_initdb.sh (new executable file, 21 lines)

@@ -0,0 +1,21 @@
+#!/bin/bash
+
+# like restore_from_wal.sh, but takes existing initdb.tar.zst
+
+set -euxo pipefail
+
+PG_BIN=$1
+WAL_PATH=$2
+DATA_DIR=$3
+PORT=$4
+echo "port=$PORT" >> "$DATA_DIR"/postgresql.conf
+echo "shared_preload_libraries='\$libdir/neon_rmgr.so'" >> "$DATA_DIR"/postgresql.conf
+REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"| cut -c 42-)
+declare -i WAL_SIZE=$REDO_POS+114
+"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" start
+"$PG_BIN"/pg_ctl -D "$DATA_DIR" -l "$DATA_DIR/logfile.log" stop -m immediate
+cp "$DATA_DIR"/pg_wal/000000010000000000000001 .
+cp "$WAL_PATH"/* "$DATA_DIR"/pg_wal/
+for partial in "$DATA_DIR"/pg_wal/*.partial ; do mv "$partial" "${partial%.partial}" ; done
+dd if=000000010000000000000001 of="$DATA_DIR"/pg_wal/000000010000000000000001 bs=$WAL_SIZE count=1 conv=notrunc
+rm -f 000000010000000000000001
@@ -1,4 +1,4 @@
-use std::{io::BufWriter, str::FromStr};
+use std::str::FromStr;

 use anyhow::Context;
 use once_cell::sync::Lazy;
@@ -73,18 +73,11 @@ pub enum Output {
     Stderr,
 }

-/// Keep alive and drop it before the program terminates.
-#[must_use]
-pub struct FlushGuard {
-    _tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
-    _tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
-}
-
 pub fn init(
     log_format: LogFormat,
     tracing_error_layer_enablement: TracingErrorLayerEnablement,
     output: Output,
-) -> anyhow::Result<FlushGuard> {
+) -> anyhow::Result<()> {
     // We fall back to printing all spans at info-level or above if
     // the RUST_LOG environment variable is not set.
     let rust_log_env_filter = || {
@@ -92,51 +85,11 @@ pub fn init(
             .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
     };

-    // WIP: lift it up as an argument
-    let enable_tracing_chrome = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_CHROME") {
-        Ok(s) if s != "0" => true,
-        Ok(_s) => false,
-        Err(std::env::VarError::NotPresent) => false,
-        Err(std::env::VarError::NotUnicode(_)) => {
-            panic!("env var NEON_PAGESERVER_ENABLE_TRACING_CHROME not unicode")
-        }
-    };
-
-    // WIP: lift it up as an argument
-    let enable_tracing_flame = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_FLAME") {
-        Ok(s) if s != "0" => true,
-        Ok(_s) => false,
-        Err(std::env::VarError::NotPresent) => false,
-        Err(std::env::VarError::NotUnicode(_)) => {
-            panic!("env var NEON_PAGESERVER_ENABLE_TRACING_FLAME not unicode")
-        }
-    };
-
     // NB: the order of the with() calls does not matter.
     // See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
     use tracing_subscriber::prelude::*;

-    // https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
-    #[derive(Default)]
-    struct LayerStack {
-        layers:
-            Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
-    }
-    impl LayerStack {
-        fn add_layer<L>(&mut self, new_layer: L)
-        where
-            L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
-        {
-            let new = match self.layers.take() {
-                Some(layers) => Some(layers.and_then(new_layer).boxed()),
-                None => Some(new_layer.boxed()),
-            };
-            self.layers = new;
-        }
-    }
-    let mut layers = LayerStack::default();
-
-    layers.add_layer({
+    let r = tracing_subscriber::registry();
+    let r = r.with({
         let log_layer = tracing_subscriber::fmt::layer()
             .with_target(false)
             .with_ansi(false)
@@ -153,47 +106,15 @@ pub fn init(
         };
         log_layer.with_filter(rust_log_env_filter())
     });

-    layers
-        .add_layer(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
-
-    let tracing_chrome_layer_flush_guard = if enable_tracing_chrome {
-        let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
-            .trace_style(tracing_chrome::TraceStyle::Async)
-            .build();
-        layers.add_layer(layer.with_filter(rust_log_env_filter()));
-        Some(guard)
-    } else {
-        None
-    };
-
-    let tracing_flame_flush_guard = if enable_tracing_flame {
-        let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
-        let layer = layer
-            .with_empty_samples(false)
-            .with_module_path(false)
-            .with_file_and_line(false)
-            .with_threads_collapsed(true);
-        layers.add_layer(layer.with_filter(rust_log_env_filter()));
-        Some(guard)
-    } else {
-        None
-    };
-
+    let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
     match tracing_error_layer_enablement {
-        TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
-            .add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
-        TracingErrorLayerEnablement::Disabled => (),
+        TracingErrorLayerEnablement::EnableWithRustLogFilter => r
+            .with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
+            .init(),
+        TracingErrorLayerEnablement::Disabled => r.init(),
     }

-    let r = tracing_subscriber::registry();
-    r.with(layers.layers.expect("we add at least one layer"))
-        .init();
-
-    Ok(FlushGuard {
-        _tracing_chrome_layer: tracing_chrome_layer_flush_guard,
-        _tracing_flame_layer: tracing_flame_flush_guard,
-    })
+    Ok(())
 }

 /// Disable the default rust panic hook by using `set_hook`.
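One side of this hunk has `init` return a `#[must_use]` `FlushGuard`, so callers write `let _guard = logging::init(...)`; the other side returns `Result<()>` and callers make a bare call. The `_guard` idiom matters because a named underscore binding keeps a value alive until end of scope, while `let _ =` drops it immediately; a sketch of that distinction, not Neon code:

```rust
// Sketch: why guard returns get bound to `_guard` rather than `_`.
struct FlushGuard;

impl Drop for FlushGuard {
    fn drop(&mut self) {
        // flush buffered trace data here
    }
}

fn main() {
    let _guard = FlushGuard; // lives until main returns, flushes on exit
    // let _ = FlushGuard;   // would be dropped on this very line
}
```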
@@ -366,47 +366,6 @@ impl MonotonicCounter<Lsn> for RecordLsn {
     }
 }

-/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
-pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
-
-impl rand::distributions::uniform::SampleUniform for Lsn {
-    type Sampler = LsnSampler;
-}
-
-impl rand::distributions::uniform::UniformSampler for LsnSampler {
-    type X = Lsn;
-
-    fn new<B1, B2>(low: B1, high: B2) -> Self
-    where
-        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
-        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
-    {
-        Self(
-            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
-                low.borrow().0,
-                high.borrow().0,
-            ),
-        )
-    }
-
-    fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
-    where
-        B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
-        B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
-    {
-        Self(
-            <u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
-                low.borrow().0,
-                high.borrow().0,
-            ),
-        )
-    }
-
-    fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
-        Lsn(self.0.sample(rng))
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::bin_ser::BeSer;
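The deleted impls hook `Lsn` into rand's uniform-sampling machinery by delegating to the `u64` sampler on the newtype's inner value; with them in place, plain range sampling works on `Lsn` directly. A sketch of the usage they enabled, assuming rand 0.8 as in the deleted code:

```rust
// Sketch: with SampleUniform implemented for Lsn, gen_range accepts Lsn ranges.
use rand::Rng;

fn random_lsn_in(range: std::ops::Range<Lsn>) -> Lsn {
    rand::thread_rng().gen_range(range)
}
```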
@@ -82,9 +82,6 @@ enum-map.workspace = true
 enumset.workspace = true
 strum.workspace = true
 strum_macros.workspace = true
-tokio-stream.workspace = true
-tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
-tracing-chrome = "0.7.1"

 [dev-dependencies]
 criterion.workspace = true
@@ -18,3 +18,5 @@ tokio.workspace = true
 utils.workspace = true
 svg_fmt.workspace = true
 workspace_hack.workspace = true
+serde.workspace = true
+serde_json.workspace = true
pageserver/ctl/src/index_part.rs (new file, 38 lines)

@@ -0,0 +1,38 @@
+use std::collections::HashMap;
+
+use anyhow::Context;
+use camino::Utf8PathBuf;
+use pageserver::tenant::remote_timeline_client::index::IndexLayerMetadata;
+use pageserver::tenant::storage_layer::LayerFileName;
+use pageserver::tenant::{metadata::TimelineMetadata, IndexPart};
+use utils::lsn::Lsn;
+
+#[derive(clap::Subcommand)]
+pub(crate) enum IndexPartCmd {
+    Dump { path: Utf8PathBuf },
+}
+
+pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
+    match cmd {
+        IndexPartCmd::Dump { path } => {
+            let bytes = tokio::fs::read(path).await.context("read file")?;
+            let des: IndexPart = IndexPart::from_s3_bytes(&bytes).context("deserialize")?;
+            #[derive(serde::Serialize)]
+            struct Output<'a> {
+                layer_metadata: &'a HashMap<LayerFileName, IndexLayerMetadata>,
+                disk_consistent_lsn: Lsn,
+                timeline_metadata: &'a TimelineMetadata,
+            }
+
+            let output = Output {
+                layer_metadata: &des.layer_metadata,
+                disk_consistent_lsn: des.get_disk_consistent_lsn(),
+                timeline_metadata: &des.metadata,
+            };
+
+            let output = serde_json::to_string_pretty(&output).context("serialize output")?;
+            println!("{output}");
+            Ok(())
+        }
+    }
+}
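The new file serializes through a borrowing view struct: `Output<'a>` holds references into the deserialized `IndexPart`, so dumping a chosen subset of fields requires no clones. The same pattern in general form; a sketch with made-up fields:

```rust
// Sketch: serialize a borrowed view of a larger value without cloning it.
#[derive(serde::Serialize)]
struct View<'a> {
    name: &'a str,
    items: &'a [u32],
}

fn render(name: &str, items: &[u32]) -> serde_json::Result<String> {
    serde_json::to_string_pretty(&View { name, items })
}
```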
@@ -1,15 +1,13 @@
 use std::path::{Path, PathBuf};

 use anyhow::Result;
-use camino::{Utf8Path, Utf8PathBuf};
+use camino::Utf8Path;
 use clap::Subcommand;
 use pageserver::context::{DownloadBehavior, RequestContext};
 use pageserver::task_mgr::TaskKind;
 use pageserver::tenant::block_io::BlockCursor;
 use pageserver::tenant::disk_btree::DiskBtreeReader;
 use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
-use pageserver::tenant::storage_layer::{delta_layer, image_layer};
 use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
 use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
-use pageserver::{page_cache, virtual_file};
 use pageserver::{
@@ -22,7 +20,6 @@ use pageserver::{
 };
 use std::fs;
 use utils::bin_ser::BeSer;
-use utils::id::{TenantId, TimelineId};

 use crate::layer_map_analyzer::parse_filename;

@@ -48,13 +45,6 @@ pub(crate) enum LayerCmd {
         /// The id from list-layer command
         id: usize,
     },
-    RewriteSummary {
-        layer_file_path: Utf8PathBuf,
-        #[clap(long)]
-        new_tenant_id: Option<TenantId>,
-        #[clap(long)]
-        new_timeline_id: Option<TimelineId>,
-    },
 }

 async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -110,7 +100,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
                 println!("- timeline {}", timeline.file_name().to_string_lossy());
             }
         }
-        Ok(())
     }
     LayerCmd::ListLayer {
         path,
@@ -139,7 +128,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
             idx += 1;
         }
     }
-    Ok(())
 }
 LayerCmd::DumpLayer {
     path,
@@ -180,63 +168,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
                 idx += 1;
             }
         }
-        Ok(())
     }
-    LayerCmd::RewriteSummary {
-        layer_file_path,
-        new_tenant_id,
-        new_timeline_id,
-    } => {
-        pageserver::virtual_file::init(10);
-        pageserver::page_cache::init(100);
-
-        let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
-
-        macro_rules! rewrite_closure {
-            ($($summary_ty:tt)*) => {{
-                |summary| $($summary_ty)* {
-                    tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
-                    timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
-                    ..summary
-                }
-            }};
-        }
-
-        let res = ImageLayer::rewrite_summary(
-            layer_file_path,
-            rewrite_closure!(image_layer::Summary),
-            &ctx,
-        )
-        .await;
-        match res {
-            Ok(()) => {
-                println!("Successfully rewrote summary of image layer {layer_file_path}");
-                return Ok(());
-            }
-            Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
-            Err(image_layer::RewriteSummaryError::Other(e)) => {
-                return Err(e);
-            }
-        }
-
-        let res = DeltaLayer::rewrite_summary(
-            layer_file_path,
-            rewrite_closure!(delta_layer::Summary),
-            &ctx,
-        )
-        .await;
-        match res {
-            Ok(()) => {
-                println!("Successfully rewrote summary of delta layer {layer_file_path}");
-                return Ok(());
-            }
-            Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
-            Err(delta_layer::RewriteSummaryError::Other(e)) => {
-                return Err(e);
-            }
-        }
-
-        anyhow::bail!("not an image or delta layer: {layer_file_path}");
-    }
     }
+    Ok(())
 }
@@ -5,11 +5,13 @@
 //! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.

 mod draw_timeline_dir;
+mod index_part;
 mod layer_map_analyzer;
 mod layers;

 use camino::{Utf8Path, Utf8PathBuf};
 use clap::{Parser, Subcommand};
+use index_part::IndexPartCmd;
 use layers::LayerCmd;
 use pageserver::{
     context::{DownloadBehavior, RequestContext},
@@ -38,6 +40,8 @@ struct CliOpts {
 #[derive(Subcommand)]
 enum Commands {
     Metadata(MetadataCmd),
+    #[command(subcommand)]
+    IndexPart(IndexPartCmd),
     PrintLayerFile(PrintLayerFileCmd),
     DrawTimeline {},
     AnalyzeLayerMap(AnalyzeLayerMapCmd),
@@ -83,6 +87,9 @@ async fn main() -> anyhow::Result<()> {
         Commands::Metadata(cmd) => {
             handle_metadata(&cmd)?;
         }
+        Commands::IndexPart(cmd) => {
+            index_part::main(&cmd).await?;
+        }
         Commands::DrawTimeline {} => {
             draw_timeline_dir::main()?;
         }
@@ -1,23 +0,0 @@
-[package]
-name = "pagebench"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-anyhow.workspace = true
-clap.workspace = true
-futures.workspace = true
-hdrhistogram.workspace = true
-humantime.workspace = true
-humantime-serde.workspace = true
-rand.workspace = true
-serde.workspace = true
-serde_json.workspace = true
-tracing.workspace = true
-tokio.workspace = true
-tokio-util.workspace = true
-
-pageserver = { path = ".." }
-utils = { path = "../../libs/utils/" }
@@ -1,402 +0,0 @@
-use anyhow::Context;
-use pageserver::client::page_service::BasebackupRequest;
-use utils::lsn::Lsn;
-
-use rand::prelude::*;
-use tokio::sync::Barrier;
-use tokio::task::JoinSet;
-use tracing::{debug, info, instrument};
-use utils::id::TenantId;
-use utils::logging;
-
-use std::cell::RefCell;
-use std::collections::HashMap;
-use std::num::NonZeroUsize;
-use std::ops::Range;
-use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
-use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
-
-use crate::util::tenant_timeline_id::TenantTimelineId;
-
-/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
-#[derive(clap::Parser)]
-pub(crate) struct Args {
-    #[clap(long, default_value = "http://localhost:9898")]
-    mgmt_api_endpoint: String,
-    #[clap(long, default_value = "localhost:64000")]
-    page_service_host_port: String,
-    #[clap(long)]
-    pageserver_jwt: Option<String>,
-    #[clap(long, default_value = "1")]
-    num_clients: NonZeroUsize,
-    #[clap(long, default_value = "1.0")]
-    gzip_probability: f64,
-    #[clap(long)]
-    runtime: Option<humantime::Duration>,
-    targets: Option<Vec<TenantTimelineId>>,
-}
-
-#[derive(Debug, Default)]
-struct LiveStats {
-    completed_requests: AtomicU64,
-}
-
-impl LiveStats {
-    fn inc(&self) {
-        self.completed_requests.fetch_add(1, Ordering::Relaxed);
-    }
-}
-
-#[derive(serde::Serialize)]
-struct Output {
-    total: PerTaskOutput,
-}
-
-const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
-
-struct LatencyPercentiles {
-    latency_percentiles: [Duration; 4],
-}
-
-impl serde::Serialize for LatencyPercentiles {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        use serde::ser::SerializeMap;
-        let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
-        for p in LATENCY_PERCENTILES {
-            ser.serialize_entry(
-                &format!("p{p}"),
-                &format!(
-                    "{}",
-                    &humantime::format_duration(self.latency_percentiles[0])
-                ),
-            )?;
-        }
-        ser.end()
-    }
-}
-
-#[derive(serde::Serialize)]
-struct PerTaskOutput {
-    request_count: u64,
-    #[serde(with = "humantime_serde")]
-    latency_mean: Duration,
-    latency_percentiles: LatencyPercentiles,
-}
-
-struct ThreadLocalStats {
-    latency_histo: hdrhistogram::Histogram<u64>,
-}
-
-impl ThreadLocalStats {
-    fn new() -> Self {
-        Self {
-            // Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
-            // which would skew the benchmark results.
-            latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
-        }
-    }
-    fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
-        let micros: u64 = latency
-            .as_micros()
-            .try_into()
-            .context("latency greater than u64")?;
-        self.latency_histo
-            .record(micros)
-            .context("add to histogram")?;
-        Ok(())
-    }
-    fn output(&self) -> PerTaskOutput {
-        let latency_percentiles = std::array::from_fn(|idx| {
-            let micros = self
-                .latency_histo
-                .value_at_percentile(LATENCY_PERCENTILES[idx]);
-            Duration::from_micros(micros)
-        });
-        PerTaskOutput {
-            request_count: self.latency_histo.len(),
-            latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
-            latency_percentiles: LatencyPercentiles {
-                latency_percentiles,
-            },
-        }
-    }
-
-    fn add(&mut self, other: &Self) {
-        let Self {
-            ref mut latency_histo,
-        } = self;
-        latency_histo.add(&other.latency_histo).unwrap();
-    }
-}
-
-thread_local! {
-    pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
-        Arc::new(Mutex::new(ThreadLocalStats::new()))
-    );
-}
-
-pub(crate) fn main(args: Args) -> anyhow::Result<()> {
-    let _guard = logging::init(
-        logging::LogFormat::Plain,
-        logging::TracingErrorLayerEnablement::Disabled,
-        logging::Output::Stderr,
-    )
-    .unwrap();
-
-    let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
-
-    let rt = tokio::runtime::Builder::new_multi_thread()
-        .on_thread_start({
-            let thread_local_stats = Arc::clone(&thread_local_stats);
-            move || {
-                // pre-initialize the histograms
-                STATS.with(|stats| {
-                    let stats: Arc<_> = Arc::clone(&*stats.borrow());
-                    thread_local_stats.lock().unwrap().push(stats);
-                });
-            }
-        })
-        .enable_all()
-        .build()
-        .unwrap();
-
-    let main_task = rt.spawn(main_impl(args, thread_local_stats));
-    rt.block_on(main_task).unwrap()
-}
-
-struct Target {
-    timeline: TenantTimelineId,
-    lsn_range: Option<Range<Lsn>>,
-}
-
-async fn main_impl(
-    args: Args,
-    thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
-) -> anyhow::Result<()> {
-    let args: &'static Args = Box::leak(Box::new(args));
-
-    let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
-        args.mgmt_api_endpoint.clone(),
-        args.pageserver_jwt.as_deref(),
-    ));
-
-    // discover targets
-    let mut timelines: Vec<TenantTimelineId> = Vec::new();
-    if args.targets.is_some() {
-        timelines = args.targets.clone().unwrap();
-    } else {
-        let tenants: Vec<TenantId> = mgmt_api_client
-            .list_tenants()
-            .await?
-            .into_iter()
-            .map(|ti| ti.id)
-            .collect();
-        let mut js = JoinSet::new();
-        for tenant_id in tenants {
-            js.spawn({
-                let mgmt_api_client = Arc::clone(&mgmt_api_client);
-                async move {
-                    (
-                        tenant_id,
-                        mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
-                    )
-                }
-            });
-        }
-        while let Some(res) = js.join_next().await {
-            let (tenant_id, tl_infos) = res.unwrap();
-            for tl in tl_infos {
-                timelines.push(TenantTimelineId {
-                    tenant_id,
-                    timeline_id: tl.timeline_id,
-                });
-            }
-        }
-    }
-
-    info!("timelines:\n{:?}", timelines);
-
-    let mut js = JoinSet::new();
-    for timeline in &timelines {
-        js.spawn({
-            let timeline = *timeline;
-            let info = mgmt_api_client
-                .timeline_info(timeline.tenant_id, timeline.timeline_id)
-                .await
-                .unwrap();
-            async move {
-                anyhow::Ok(Target {
-                    timeline,
-                    lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
-                })
-            }
-        });
-    }
-    let mut all_targets: Vec<Target> = Vec::new();
-    while let Some(res) = js.join_next().await {
-        all_targets.push(res.unwrap().unwrap());
-    }
-
-    let live_stats = Arc::new(LiveStats::default());
-
-    let num_client_tasks = timelines.len();
-    let num_live_stats_dump = 1;
-    let num_work_sender_tasks = 1;
-
-    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
-        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
-    ));
-    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
-
-    tokio::spawn({
-        let stats = Arc::clone(&live_stats);
-        let start_work_barrier = Arc::clone(&start_work_barrier);
-        async move {
-            start_work_barrier.wait().await;
-            loop {
-                let start = std::time::Instant::now();
-                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-                let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
-                let elapsed = start.elapsed();
-                info!(
-                    "RPS: {:.0}",
-                    completed_requests as f64 / elapsed.as_secs_f64()
-                );
-            }
-        }
-    });
-
-    let mut work_senders = HashMap::new();
-    let mut tasks = Vec::new();
-    for tl in &timelines {
-        let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
-        work_senders.insert(tl, sender);
-        tasks.push(tokio::spawn(client(
-            args,
-            *tl,
-            Arc::clone(&start_work_barrier),
-            receiver,
-            Arc::clone(&all_work_done_barrier),
-            Arc::clone(&live_stats),
-        )));
-    }
-
-    let work_sender = async move {
-        start_work_barrier.wait().await;
-        loop {
-            let (timeline, work) = {
-                let mut rng = rand::thread_rng();
-                let target = all_targets.choose(&mut rng).unwrap();
-                let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
-                (
-                    target.timeline,
-                    Work {
-                        lsn,
-                        gzip: rng.gen_bool(args.gzip_probability),
-                    },
-                )
-            };
-            let sender = work_senders.get(&timeline).unwrap();
-            // TODO: what if this blocks?
-            sender.send(work).await.ok().unwrap();
-        }
-    };
-
-    if let Some(runtime) = args.runtime {
-        match tokio::time::timeout(runtime.into(), work_sender).await {
-            Ok(()) => unreachable!("work sender never terminates"),
-            Err(_timeout) => {
-                // this implicitly drops the work_senders, making all the clients exit
-            }
-        }
-    } else {
-        work_sender.await;
-        unreachable!("work sender never terminates");
-    }
-
-    for t in tasks {
-        t.await.unwrap();
-    }
-
-    let output = Output {
-        total: {
-            let mut agg_stats = ThreadLocalStats::new();
-            for stats in thread_local_stats.lock().unwrap().iter() {
-                let stats = stats.lock().unwrap();
-                agg_stats.add(&*stats);
-            }
-            agg_stats.output()
-        },
-    };
-
-    let output = serde_json::to_string_pretty(&output).unwrap();
-    println!("{output}");
-
-    anyhow::Ok(())
-}
-
-#[derive(Copy, Clone)]
-struct Work {
-    lsn: Option<Lsn>,
-    gzip: bool,
-}
-
-#[instrument(skip_all)]
-async fn client(
-    args: &'static Args,
-    timeline: TenantTimelineId,
-    start_work_barrier: Arc<Barrier>,
-    mut work: tokio::sync::mpsc::Receiver<Work>,
-    all_work_done_barrier: Arc<Barrier>,
-    live_stats: Arc<LiveStats>,
-) {
-    start_work_barrier.wait().await;
-
-    let client =
-        pageserver::client::page_service::Client::new(crate::util::connstring::connstring(
-            &args.page_service_host_port,
-            args.pageserver_jwt.as_deref(),
-        ))
-        .await
-        .unwrap();
-
-    while let Some(Work { lsn, gzip }) = work.recv().await {
-        let start = Instant::now();
-        let copy_out_stream = client
-            .basebackup(&BasebackupRequest {
-                tenant_id: timeline.tenant_id,
-                timeline_id: timeline.timeline_id,
-                lsn,
-                gzip,
-            })
-            .await
-            .with_context(|| format!("start basebackup for {timeline}"))
-            .unwrap();
-
-        use futures::StreamExt;
-        let size = Arc::new(AtomicUsize::new(0));
-        copy_out_stream
-            .for_each({
-                |r| {
-                    let size = Arc::clone(&size);
-                    async move {
-                        let size = Arc::clone(&size);
-                        size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
-                    }
-                }
-            })
-            .await;
-        debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
-        let elapsed = start.elapsed();
-        live_stats.inc();
-        STATS.with(|stats| {
-            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
-        });
-    }
-
-    all_work_done_barrier.wait().await;
-}
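The deleted benchmark records each request's latency into an hdrhistogram with fixed bounds of 1 to 1,000,000,000 microseconds at 3 significant figures, so an out-of-range sample fails loudly instead of triggering a mid-run resize that would skew results. The recording-and-reporting pattern in isolation; a sketch using the hdrhistogram crate's documented API:

```rust
// Sketch: fixed-bounds histogram recording, as the deleted ThreadLocalStats did.
fn histogram_demo() -> anyhow::Result<()> {
    let mut h = hdrhistogram::Histogram::<u64>::new_with_bounds(1, 1_000_000_000, 3)?;
    for micros in [120u64, 450, 90_000] {
        h.record(micros)?; // errors instead of resizing when out of bounds
    }
    println!(
        "p99.9 = {}us, mean = {:.0}us, n = {}",
        h.value_at_percentile(99.9),
        h.mean(),
        h.len()
    );
    Ok(())
}
```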
@@ -1,404 +0,0 @@
|
||||
use anyhow::Context;
|
||||
use pageserver::client::page_service::RelTagBlockNo;
|
||||
use pageserver::pgdatadir_mapping::{is_rel_block_key, key_to_rel_block};
|
||||
use pageserver::repository;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use rand::prelude::*;
|
||||
use tokio::sync::Barrier;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{info, instrument};
|
||||
use utils::id::TenantId;
|
||||
use utils::logging;
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::util::tenant_timeline_id::TenantTimelineId;
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
#[clap(long, default_value = "1")]
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
}
|
||||
|
||||
impl LiveStats {
|
||||
fn inc(&self) {
|
||||
self.completed_requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct Output {
|
||||
total: PerTaskOutput,
|
||||
}
|
||||
|
||||
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
|
||||
|
||||
struct LatencyPercentiles {
|
||||
latency_percentiles: [Duration; 4],
|
||||
}
|
||||
|
||||
impl serde::Serialize for LatencyPercentiles {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
|
||||
for p in LATENCY_PERCENTILES {
|
||||
ser.serialize_entry(
|
||||
&format!("p{p}"),
|
||||
&format!(
|
||||
"{}",
|
||||
&humantime::format_duration(self.latency_percentiles[0])
|
||||
),
|
||||
)?;
|
||||
}
|
||||
ser.end()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(serde::Serialize)]
|
||||
struct PerTaskOutput {
|
||||
request_count: u64,
|
||||
#[serde(with = "humantime_serde")]
|
||||
latency_mean: Duration,
|
||||
latency_percentiles: LatencyPercentiles,
|
||||
}
|
||||
|
||||
struct ThreadLocalStats {
|
||||
latency_histo: hdrhistogram::Histogram<u64>,
|
||||
}
|
||||
|
||||
impl ThreadLocalStats {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
|
||||
// which would skew the benchmark results.
|
||||
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
|
||||
}
|
||||
}
|
||||
fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
|
||||
let micros: u64 = latency
|
||||
.as_micros()
|
||||
.try_into()
|
||||
.context("latency greater than u64")?;
|
||||
self.latency_histo
|
||||
.record(micros)
|
||||
.context("add to histogram")?;
|
||||
Ok(())
|
||||
}
|
||||
fn output(&self) -> PerTaskOutput {
|
||||
let latency_percentiles = std::array::from_fn(|idx| {
|
||||
let micros = self
|
||||
.latency_histo
|
||||
.value_at_percentile(LATENCY_PERCENTILES[idx]);
|
||||
Duration::from_micros(micros)
|
||||
});
|
||||
PerTaskOutput {
|
||||
request_count: self.latency_histo.len(),
|
||||
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
|
||||
latency_percentiles: LatencyPercentiles {
|
||||
latency_percentiles,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn add(&mut self, other: &Self) {
|
||||
let Self {
|
||||
ref mut latency_histo,
|
||||
} = self;
|
||||
latency_histo.add(&other.latency_histo).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
|
||||
Arc::new(Mutex::new(ThreadLocalStats::new()))
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
let _guard = logging::init(
|
||||
logging::LogFormat::Plain,
|
||||
logging::TracingErrorLayerEnablement::Disabled,
|
||||
logging::Output::Stderr,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.on_thread_start({
|
||||
let thread_local_stats = Arc::clone(&thread_local_stats);
|
||||
move || {
|
||||
// pre-initialize the histograms
|
||||
STATS.with(|stats| {
|
||||
let stats: Arc<_> = Arc::clone(&*stats.borrow());
|
||||
thread_local_stats.lock().unwrap().push(stats);
|
||||
});
|
||||
}
|
||||
})
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let main_task = rt.spawn(main_impl(args, thread_local_stats));
|
||||
rt.block_on(main_task).unwrap()
|
||||
}
|
||||
|
||||
struct KeyRange {
|
||||
timeline: TenantTimelineId,
|
||||
timeline_lsn: Lsn,
|
||||
start: i128,
|
||||
end: i128,
|
||||
}
|
||||
|
||||
impl KeyRange {
|
||||
fn len(&self) -> i128 {
|
||||
self.end - self.start
|
||||
}
|
||||
}
|
||||
|
||||
async fn main_impl(
|
||||
args: Args,
|
||||
thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let args: &'static Args = Box::leak(Box::new(args));
|
||||
|
||||
let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
|
||||
args.mgmt_api_endpoint.clone(),
|
||||
None, // TODO: support jwt in args
|
||||
));
|
||||
|
||||
// discover targets
|
||||
let mut timelines: Vec<TenantTimelineId> = Vec::new();
|
||||
if args.targets.is_some() {
|
||||
timelines = args.targets.clone().unwrap();
|
||||
} else {
|
||||
let tenants: Vec<TenantId> = mgmt_api_client
|
||||
.list_tenants()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|ti| ti.id)
|
||||
.collect();
|
||||
let mut js = JoinSet::new();
|
||||
for tenant_id in tenants {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
async move {
|
||||
(
|
||||
tenant_id,
|
||||
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
|
||||
)
|
||||
}
|
||||
});
|
||||
}
|
||||
while let Some(res) = js.join_next().await {
|
||||
let (tenant_id, tl_infos) = res.unwrap();
|
||||
for tl in tl_infos {
|
||||
timelines.push(TenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id: tl.timeline_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("timelines:\n{:?}", timelines);
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
for timeline in &timelines {
|
||||
js.spawn({
|
||||
let mgmt_api_client = Arc::clone(&mgmt_api_client);
|
||||
let timeline = *timeline;
|
||||
async move {
|
||||
let partitioning = mgmt_api_client
|
||||
.keyspace(timeline.tenant_id, timeline.timeline_id)
|
||||
.await?;
|
||||
let lsn = partitioning.at_lsn;
|
||||
|
||||
let ranges = partitioning
|
||||
.keys
|
||||
.ranges
|
||||
.iter()
|
||||
.filter_map(|r| {
|
||||
let start = r.start;
|
||||
let end = r.end;
|
||||
// filter out non-relblock keys
|
||||
match (is_rel_block_key(start), is_rel_block_key(end)) {
|
||||
(true, true) => Some(KeyRange {
|
||||
timeline,
|
||||
timeline_lsn: lsn,
|
||||
start: start.to_i128(),
|
||||
end: end.to_i128(),
|
||||
}),
|
||||
(true, false) | (false, true) => {
|
||||
unimplemented!("split up range")
|
||||
}
|
||||
(false, false) => None,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
anyhow::Ok(ranges)
|
||||
}
|
||||
});
|
||||
}
|
||||
let mut all_ranges: Vec<KeyRange> = Vec::new();
|
||||
while let Some(res) = js.join_next().await {
|
||||
all_ranges.extend(res.unwrap().unwrap());
|
||||
}
|
||||
let weights =
|
||||
rand::distributions::weighted::WeightedIndex::new(all_ranges.iter().map(|v| v.len()))
|
||||
.unwrap();

    let live_stats = Arc::new(LiveStats::default());

    let num_client_tasks = timelines.len();
    let num_live_stats_dump = 1;
    let num_work_sender_tasks = 1;

    let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
        num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
    ));
    let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));

    tokio::spawn({
        let stats = Arc::clone(&live_stats);
        let start_work_barrier = Arc::clone(&start_work_barrier);
        async move {
            start_work_barrier.wait().await;
            loop {
                let start = std::time::Instant::now();
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
                let elapsed = start.elapsed();
                info!(
                    "RPS: {:.0}",
                    completed_requests as f64 / elapsed.as_secs_f64()
                );
            }
        }
    });

    let mut work_senders = HashMap::new();
    let mut tasks = Vec::new();
    for tl in &timelines {
        let (sender, receiver) = tokio::sync::mpsc::channel(10); // TODO: not sure what the implications of this are
        work_senders.insert(tl, sender);
        tasks.push(tokio::spawn(client(
            args,
            *tl,
            Arc::clone(&start_work_barrier),
            receiver,
            Arc::clone(&all_work_done_barrier),
            Arc::clone(&live_stats),
        )));
    }

    let work_sender = async move {
        start_work_barrier.wait().await;
        loop {
            let (range, key) = {
                let mut rng = rand::thread_rng();
                let r = &all_ranges[weights.sample(&mut rng)];
                let key: i128 = rng.gen_range(r.start..r.end);
                let key = repository::Key::from_i128(key);
                let (rel_tag, block_no) =
                    key_to_rel_block(key).expect("we filter non-rel-block keys out above");
                (r, RelTagBlockNo { rel_tag, block_no })
            };
            let sender = work_senders.get(&range.timeline).unwrap();
            // TODO: what if this blocks?
            sender.send((key, range.timeline_lsn)).await.ok().unwrap();
        }
    };

    if let Some(runtime) = args.runtime {
        match tokio::time::timeout(runtime.into(), work_sender).await {
            Ok(()) => unreachable!("work sender never terminates"),
            Err(_timeout) => {
                // this implicitly drops the work_senders, making all the clients exit
            }
        }
    } else {
        work_sender.await;
        unreachable!("work sender never terminates");
    }

    for t in tasks {
        t.await.unwrap();
    }

    let output = Output {
        total: {
            let mut agg_stats = ThreadLocalStats::new();
            for stats in thread_local_stats.lock().unwrap().iter() {
                let stats = stats.lock().unwrap();
                agg_stats.add(&*stats);
            }
            agg_stats.output()
        },
    };

    let output = serde_json::to_string_pretty(&output).unwrap();
    println!("{output}");

    anyhow::Ok(())
}

#[instrument(skip_all)]
async fn client(
    args: &'static Args,
    timeline: TenantTimelineId,
    start_work_barrier: Arc<Barrier>,
    mut work: tokio::sync::mpsc::Receiver<(RelTagBlockNo, Lsn)>,
    all_work_done_barrier: Arc<Barrier>,
    live_stats: Arc<LiveStats>,
) {
    start_work_barrier.wait().await;

    let client =
        pageserver::client::page_service::Client::new(args.page_service_connstring.clone())
            .await
            .unwrap();
    let mut client = client
        .pagestream(timeline.tenant_id, timeline.timeline_id)
        .await
        .unwrap();

    while let Some((key, lsn)) = work.recv().await {
        let start = Instant::now();
        client
            .getpage(key, lsn)
            .await
            .with_context(|| format!("getpage for {timeline}"))
            .unwrap();
        let elapsed = start.elapsed();
        live_stats.inc();
        STATS.with(|stats| {
            stats.borrow().lock().unwrap().observe(elapsed).unwrap();
        });
    }

    all_work_done_barrier.wait().await;
}

@@ -1,22 +0,0 @@
use clap::Parser;

pub(crate) mod util;

mod basebackup;
mod getpage_latest_lsn;

/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
    GetPageLatestLsn(getpage_latest_lsn::Args),
    Basebackup(basebackup::Args),
}

fn main() {
    let args = Args::parse();
    match args {
        Args::GetPageLatestLsn(args) => getpage_latest_lsn::main(args),
        Args::Basebackup(args) => basebackup::main(args),
    }
    .unwrap()
}
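// Hypothetical invocation (binary name is an assumption; clap derives
// kebab-case subcommand names from the enum variants):
//
//     $ pagebench get-page-latest-lsn --help
//     $ pagebench basebackup --help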

@@ -1,2 +0,0 @@
pub(crate) mod tenant_timeline_id;
pub(crate) mod connstring;
@@ -1,8 +0,0 @@
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
    let colon_and_jwt = if let Some(jwt) = jwt {
        format!(":{jwt}") // TODO: urlescape
    } else {
        String::new()
    };
    format!("postgres://postgres{colon_and_jwt}@{host_port}")
}
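// A quick check of the produced format (sketch with illustrative values):
#[cfg(test)]
mod tests {
    #[test]
    fn connstring_format() {
        assert_eq!(
            super::connstring("localhost:64000", None),
            "postgres://postgres@localhost:64000"
        );
        assert_eq!(
            super::connstring("localhost:64000", Some("jwt")),
            "postgres://postgres:jwt@localhost:64000"
        );
    }
}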

@@ -1,36 +0,0 @@
use std::str::FromStr;

use anyhow::Context;
use utils::id::{TenantId, TimelineId};

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub(crate) struct TenantTimelineId {
    pub(crate) tenant_id: TenantId,
    pub(crate) timeline_id: TimelineId,
}

impl FromStr for TenantTimelineId {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let (tenant_id, timeline_id) = s
            .split_once('/')
            .context("tenant and timeline id must be separated by `/`")?;
        let tenant_id = TenantId::from_str(tenant_id)
            .with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
        let timeline_id = TimelineId::from_str(timeline_id)
            .with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
        Ok(Self {
            tenant_id,
            timeline_id,
        })
    }
}

impl std::fmt::Display for TenantTimelineId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}/{}", self.tenant_id, self.timeline_id)
    }
}
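// Round-trip sketch (hypothetical hex ids, not taken from the diff):
//
//     let s = "3aa8fcc61f6d357410b7de754b1d9001/de200bd42b49cc1814412c7e592dd6e9";
//     let ttid: TenantTimelineId = s.parse()?;
//     assert_eq!(ttid.to_string(), s);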

@@ -166,111 +166,71 @@ where
         }
     }

-    debug!("Gather non-relational files from object storage pages");
+    // Gather non-relational files from object storage pages.
     for kind in [
        SlruKind::Clog,
        SlruKind::MultiXactOffsets,
        SlruKind::MultiXactMembers,
    ] {
-        async {
-            debug!("list slru segments");
-            for segno in self
-                .timeline
-                .list_slru_segments(kind, self.lsn, self.ctx)
-                .await?
-            {
-                async {
-                    debug!("add slru segment");
-                    self.add_slru_segment(kind, segno).await?;
-                    anyhow::Ok(())
-                }
-                .instrument(debug_span!("slru segment", ?segno))
-                .await?;
-            }
-            anyhow::Ok(())
+        for segno in self
+            .timeline
+            .list_slru_segments(kind, self.lsn, self.ctx)
+            .await?
+        {
+            self.add_slru_segment(kind, segno).await?;
        }
-        .instrument(debug_span!("non-rel file", ?kind))
-        .await?;
    }

    let mut min_restart_lsn: Lsn = Lsn::MAX;
-    debug!("Create tablespace directories");
+    // Create tablespace directories
    for ((spcnode, dbnode), has_relmap_file) in
        self.timeline.list_dbdirs(self.lsn, self.ctx).await?
    {
-        async {
-            debug!("iter");
-            self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
+        self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

-            // If full backup is requested, include all relation files.
-            // Otherwise only include init forks of unlogged relations.
-            debug!("list rels");
-            let rels = self
-                .timeline
-                .list_rels(spcnode, dbnode, self.lsn, self.ctx)
-                .await?;
-            for &rel in rels.iter() {
-                async {
-                    debug!("iter");
-                    // Send init fork as main fork to provide well formed empty
-                    // contents of UNLOGGED relations. Postgres copies it in
-                    // `reinit.c` during recovery.
-                    if rel.forknum == INIT_FORKNUM {
-                        // I doubt we need _init fork itself, but having it at least
-                        // serves as a marker that the relation is unlogged.
-                        self.add_rel(rel, rel).await?;
-                        self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
-                        return Ok(());
-                    }

-                    if self.full_backup {
-                        if rel.forknum == MAIN_FORKNUM
-                            && rels.contains(&rel.with_forknum(INIT_FORKNUM))
-                        {
-                            // skip this, will include it when we reach the init fork
-                            return Ok(());
-                        }
-                        self.add_rel(rel, rel).await?;
-                    }
-                    anyhow::Ok(())
-                }
-                .instrument(debug_span!("process rel", ?rel))
-                .await?;
+        // If full backup is requested, include all relation files.
+        // Otherwise only include init forks of unlogged relations.
+        let rels = self
+            .timeline
+            .list_rels(spcnode, dbnode, self.lsn, self.ctx)
+            .await?;
+        for &rel in rels.iter() {
+            // Send init fork as main fork to provide well formed empty
+            // contents of UNLOGGED relations. Postgres copies it in
+            // `reinit.c` during recovery.
+            if rel.forknum == INIT_FORKNUM {
+                // I doubt we need _init fork itself, but having it at least
+                // serves as a marker that the relation is unlogged.
+                self.add_rel(rel, rel).await?;
+                self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
+                continue;
+            }

-            debug!("list aux files");
-            for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
-                async {
-                    debug!("iter");
-                    if path.starts_with("pg_replslot") {
-                        let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
-                        let restart_lsn = Lsn(u64::from_le_bytes(
-                            content[offs..offs + 8].try_into().unwrap(),
-                        ));
-                        info!("Replication slot {} restart LSN={}", path, restart_lsn);
-                        min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
-                    }
-                    let header = new_tar_header(&path, content.len() as u64)?;
-                    self.ar
-                        .append(&header, &*content)
-                        .await
-                        .context("could not add aux file to basebackup tarball")?;
-                    anyhow::Ok(())
+            if self.full_backup {
+                if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
+                {
+                    // skip this, will include it when we reach the init fork
+                    continue;
                }
-                .instrument(debug_span!("process aux file", ?path))
-                .await?;
+                self.add_rel(rel, rel).await?;
            }
        }

-            debug!("done");

-            anyhow::Ok(())
-        }
-        .instrument(debug_span!(
-            "process tablespace directory",
-            ?spcnode,
-            ?dbnode
-        ))
-        .await?;

+        for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
+            if path.starts_with("pg_replslot") {
+                let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
+                let restart_lsn = Lsn(u64::from_le_bytes(
+                    content[offs..offs + 8].try_into().unwrap(),
+                ));
+                info!("Replication slot {} restart LSN={}", path, restart_lsn);
+                min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
+            }
+            let header = new_tar_header(&path, content.len() as u64)?;
+            self.ar
+                .append(&header, &*content)
+                .await
+                .context("could not add aux file to basebackup tarball")?;
+        }
    }
    if min_restart_lsn != Lsn::MAX {
        info!(
@@ -284,25 +244,19 @@ where
        .await
        .context("could not add restart.lsn file to basebackup tarball")?;
    }
-    debug!("list twophase files");
    for xid in self
        .timeline
        .list_twophase_files(self.lsn, self.ctx)
        .await?
    {
-        async {
-            self.add_twophase_file(xid).await?;
-            anyhow::Ok(())
-        }
-        .instrument(debug_span!("process twophase file", ?xid))
-        .await?;
+        self.add_twophase_file(xid).await?;
    }

    fail_point!("basebackup-before-control-file", |_| {
        bail!("failpoint basebackup-before-control-file")
    });

-    debug!("Generate pg_control and bootstrap WAL segment.");
+    // Generate pg_control and bootstrap WAL segment.
    self.add_pgcontrol_file().await?;
    self.ar.finish().await?;
    debug!("all tarred up!");

@@ -103,7 +103,7 @@ fn main() -> anyhow::Result<()> {
    } else {
        TracingErrorLayerEnablement::Disabled
    };
-    let _guard = logging::init(
+    logging::init(
        conf.log_format,
        tracing_error_layer_enablement,
        logging::Output::Stdout,
@@ -625,6 +625,7 @@ fn start_pageserver(
        conf.synthetic_size_calculation_interval,
        conf.id,
        local_disk_storage,
+        cancel,
        metrics_ctx,
    )
    .instrument(info_span!("metrics_collection"))

@@ -1,2 +0,0 @@
pub mod mgmt_api;
pub mod page_service;
@@ -1,89 +0,0 @@
use anyhow::Context;

use hyper::{client::HttpConnector, Uri};
use utils::id::{TenantId, TimelineId};

pub struct Client {
    mgmt_api_endpoint: String,
    authorization_header: Option<String>,
    client: hyper::Client<HttpConnector, hyper::Body>,
}

impl Client {
    pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
        Self {
            mgmt_api_endpoint,
            authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
            client: hyper::client::Client::new(),
        }
    }

    pub async fn list_tenants(&self) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
        let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
        let resp = self.get(uri).await?;
        if !resp.status().is_success() {
            anyhow::bail!("status error");
        }
        let body = hyper::body::to_bytes(resp).await?;
        Ok(serde_json::from_slice(&body)?)
    }

    pub async fn list_timelines(
        &self,
        tenant_id: TenantId,
    ) -> anyhow::Result<Vec<pageserver_api::models::TimelineInfo>> {
        let uri = Uri::try_from(format!(
            "{}/v1/tenant/{tenant_id}/timeline",
            self.mgmt_api_endpoint
        ))?;
        let resp = self.get(uri).await?;
        if !resp.status().is_success() {
            anyhow::bail!("status error");
        }
        let body = hyper::body::to_bytes(resp).await?;
        Ok(serde_json::from_slice(&body)?)
    }

    pub async fn timeline_info(
        &self, tenant_id: TenantId, timeline_id: TimelineId,
    ) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
        let uri = Uri::try_from(format!(
            "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
            self.mgmt_api_endpoint
        ))?;
        let resp = self.get(uri).await?;
        if !resp.status().is_success() {
            anyhow::bail!("status error");
        }
        let body = hyper::body::to_bytes(resp).await?;
        Ok(serde_json::from_slice(&body)?)
    }

    pub async fn keyspace(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> anyhow::Result<crate::http::models::partitioning::Partitioning> {
        let uri = Uri::try_from(format!(
            "{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true",
            self.mgmt_api_endpoint
        ))?;
        let resp = self.get(uri).await?;
        if !resp.status().is_success() {
            anyhow::bail!("status error");
        }
        let body = hyper::body::to_bytes(resp).await?;
        Ok(serde_json::from_slice(&body).context("deserialize")?)
    }

    async fn get(&self, uri: Uri) -> hyper::Result<hyper::Response<hyper::Body>> {
        let req = hyper::Request::builder().uri(uri).method("GET");
        let req = if let Some(value) = &self.authorization_header {
            req.header("Authorization", value)
        } else {
            req
        };
        let req = req.body(hyper::Body::default());
        self.client.request(req.unwrap()).await
    }
}
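// Usage sketch (endpoint and port are assumptions, not part of the diff):
//
//     let client = Client::new("http://localhost:9898".to_string(), None);
//     for tenant in client.list_tenants().await? {
//         for tl in client.list_timelines(tenant.id).await? {
//             println!("{}/{}", tenant.id, tl.timeline_id);
//         }
//     }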

@@ -1,145 +0,0 @@
use std::pin::Pin;

use futures::SinkExt;
use pageserver_api::{
    models::{
        PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
        PagestreamGetPageResponse,
    },
    reltag::RelTag,
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

pub struct Client {
    client: tokio_postgres::Client,
    cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
    conn_task: JoinHandle<()>,
}

pub struct BasebackupRequest {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub lsn: Option<Lsn>,
    pub gzip: bool,
}

impl Client {
    pub async fn new(connstring: String) -> anyhow::Result<Self> {
        let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;

        let conn_task_cancel = CancellationToken::new();
        let conn_task = tokio::spawn({
            let conn_task_cancel = conn_task_cancel.clone();
            async move {
                tokio::select! {
                    _ = conn_task_cancel.cancelled() => { }
                    res = connection => {
                        res.unwrap();
                    }
                }
            }
        });
        Ok(Self {
            cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
            conn_task,
            client,
        })
    }

    pub async fn pagestream(
        self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
    ) -> anyhow::Result<PagestreamClient> {
        let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
            .client
            .copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
            .await?;
        let Client {
            cancel_on_client_drop,
            conn_task,
            client: _,
        } = self;
        Ok(PagestreamClient {
            copy_both: Box::pin(copy_both),
            conn_task,
            cancel_on_client_drop,
        })
    }

    pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
        let BasebackupRequest {
            tenant_id,
            timeline_id,
            lsn,
            gzip,
        } = req;
        let mut args = Vec::with_capacity(5);
        args.push("basebackup".to_string());
        args.push(format!("{tenant_id}"));
        args.push(format!("{timeline_id}"));
        if let Some(lsn) = lsn {
            args.push(format!("{lsn}"));
        }
        if *gzip {
            args.push("--gzip".to_string())
        }
        Ok(self.client.copy_out(&args.join(" ")).await?)
    }
}

/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
    copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
    cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
    conn_task: JoinHandle<()>,
}

pub struct RelTagBlockNo {
    pub rel_tag: RelTag,
    pub block_no: u32,
}

impl PagestreamClient {
    pub async fn shutdown(mut self) {
        let _ = self.cancel_on_client_drop.take();
        self.conn_task.await.unwrap();
    }

    pub async fn getpage(
        &mut self,
        key: RelTagBlockNo,
        lsn: Lsn,
    ) -> anyhow::Result<PagestreamGetPageResponse> {
        let req = PagestreamGetPageRequest {
            latest: false,
            rel: key.rel_tag,
            blkno: key.block_no,
            lsn,
        };
        let req = PagestreamFeMessage::GetPage(req);
        let req: bytes::Bytes = req.serialize();
        // let mut req = tokio_util::io::ReaderStream::new(&req);
        let mut req = tokio_stream::once(Ok(req));

        self.copy_both.send_all(&mut req).await?;

        let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
        let next = next.unwrap().unwrap();

        match PagestreamBeMessage::deserialize(next)? {
            PagestreamBeMessage::Exists(_) => todo!(),
            PagestreamBeMessage::Nblocks(_) => todo!(),
            PagestreamBeMessage::GetPage(p) => Ok(p),
            PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
            PagestreamBeMessage::DbSize(_) => todo!(),
        }
    }
}
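// End-to-end sketch of the wrapper (connstring and ids are illustrative):
//
//     let client = Client::new("postgres://postgres@localhost:64000".to_string()).await?;
//     let mut ps = client.pagestream(tenant_id, timeline_id).await?;
//     let page = ps.getpage(RelTagBlockNo { rel_tag, block_no: 0 }, lsn).await?;
//     ps.shutdown().await;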

@@ -3,7 +3,7 @@
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::tasks::BackgroundLoopKind;
-use crate::tenant::{mgr, LogicalSizeCalculationCause};
+use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError};
 use camino::Utf8PathBuf;
 use consumption_metrics::EventType;
 use pageserver_api::models::TenantState;
@@ -12,6 +12,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 use tokio::time::Instant;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::NodeId;

@@ -37,6 +38,7 @@ type RawMetric = (MetricsKey, (EventType, u64));
 type Cache = HashMap<MetricsKey, (EventType, u64)>;

 /// Main thread that serves metrics collection
+#[allow(clippy::too_many_arguments)]
 pub async fn collect_metrics(
     metric_collection_endpoint: &Url,
     metric_collection_interval: Duration,
@@ -44,6 +46,7 @@ pub async fn collect_metrics(
     synthetic_size_calculation_interval: Duration,
     node_id: NodeId,
     local_disk_storage: Utf8PathBuf,
+    cancel: CancellationToken,
     ctx: RequestContext,
 ) -> anyhow::Result<()> {
     if _cached_metric_collection_interval != Duration::ZERO {
@@ -63,9 +66,13 @@ pub async fn collect_metrics(
         "synthetic size calculation",
         false,
         async move {
-            calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
-                .instrument(info_span!("synthetic_size_worker"))
-                .await?;
+            calculate_synthetic_size_worker(
+                synthetic_size_calculation_interval,
+                &cancel,
+                &worker_ctx,
+            )
+            .instrument(info_span!("synthetic_size_worker"))
+            .await?;
             Ok(())
         },
     );
@@ -241,6 +248,7 @@ async fn reschedule(
 /// Calculate synthetic size for each active tenant
 async fn calculate_synthetic_size_worker(
     synthetic_size_calculation_interval: Duration,
+    cancel: &CancellationToken,
     ctx: &RequestContext,
 ) -> anyhow::Result<()> {
     info!("starting calculate_synthetic_size_worker");
@@ -261,7 +269,7 @@ async fn calculate_synthetic_size_worker(
         }
     };

-    for (tenant_id, tenant_state, _gen) in tenants {
+    for (tenant_id, tenant_state) in tenants {
         if tenant_state != TenantState::Active {
             continue;
         }
@@ -272,7 +280,12 @@ async fn calculate_synthetic_size_worker(
             // Same for the loop that fetches computed metrics.
             // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
             // which turns out to be really handy for understanding the system.
-            if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
+            if let Err(e) = tenant.calculate_synthetic_size(cause, cancel, ctx).await {
+                if let Some(PageReconstructError::Cancelled) =
+                    e.downcast_ref::<PageReconstructError>()
+                {
+                    return Ok(());
+                }
                 error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
             }
         }

@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
        }
    };

-    let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
+    let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
        if state != TenantState::Active {
            None
        } else {

@@ -345,7 +345,7 @@ impl DeletionList {
            result.extend(
                timeline_layers
                    .into_iter()
-                    .map(|l| timeline_remote_path.join(Utf8PathBuf::from(l))),
+                    .map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
            );
        }
    }
@@ -513,6 +513,7 @@ impl DeletionQueueClient {
    ) -> Result<(), DeletionQueueError> {
        if current_generation.is_none() {
+            debug!("Enqueuing deletions in legacy mode, skipping queue");

            let mut layer_paths = Vec::new();
            for (layer, generation) in layers {
                layer_paths.push(remote_layer_path(

@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(

    let mut candidates = Vec::new();

-    for (tenant_id, _state, _gen) in &tenants {
+    for (tenant_id, _state) in &tenants {
        if cancel.is_cancelled() {
            return Ok(EvictionCandidates::Cancelled);
        }

@@ -1,4 +1,4 @@
 pub mod routes;
 pub use routes::make_router;

-pub mod models;
+pub use pageserver_api::models;

@@ -1,3 +0,0 @@
//! If possible, use `::pageserver_api::models` instead.

pub mod partitioning;
@@ -1,112 +0,0 @@
use utils::lsn::Lsn;

#[derive(Debug, PartialEq, Eq)]
pub struct Partitioning {
    pub keys: crate::keyspace::KeySpace,

    pub at_lsn: Lsn,
}

impl serde::Serialize for Partitioning {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace);

        impl<'a> serde::Serialize for KeySpace<'a> {
            fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                use serde::ser::SerializeSeq;
                let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
                for kr in &self.0.ranges {
                    seq.serialize_element(&KeyRange(kr))?;
                }
                seq.end()
            }
        }

        use serde::ser::SerializeMap;
        let mut map = serializer.serialize_map(Some(2))?;
        map.serialize_key("keys")?;
        map.serialize_value(&KeySpace(&self.keys))?;
        map.serialize_key("at_lsn")?;
        map.serialize_value(&WithDisplay(&self.at_lsn))?;
        map.end()
    }
}

pub struct WithDisplay<'a, T>(&'a T);

impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(&self.0)
    }
}

pub struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);

impl<'a> serde::Serialize for KeyRange<'a> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeTuple;
        let mut t = serializer.serialize_tuple(2)?;
        t.serialize_element(&WithDisplay(&self.0.start))?;
        t.serialize_element(&WithDisplay(&self.0.end))?;
        t.end()
    }
}

impl<'a> serde::Deserialize<'a> for Partitioning {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'a>,
    {
        pub struct KeySpace(crate::keyspace::KeySpace);

        impl<'de> serde::Deserialize<'de> for KeySpace {
            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                #[serde_with::serde_as]
                #[derive(serde::Deserialize)]
                #[serde(transparent)]
                struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::repository::Key);

                #[serde_with::serde_as]
                #[derive(serde::Deserialize)]
                struct Range(Key, Key);

                let ranges: Vec<Range> = serde::Deserialize::deserialize(deserializer)?;
                Ok(Self(crate::keyspace::KeySpace {
                    ranges: ranges
                        .into_iter()
                        .map(|Range(start, end)| (start.0..end.0))
                        .collect(),
                }))
            }
        }

        #[serde_with::serde_as]
        #[derive(serde::Deserialize)]
        struct De {
            keys: KeySpace,
            #[serde_as(as = "serde_with::DisplayFromStr")]
            at_lsn: Lsn,
        }

        let de: De = serde::Deserialize::deserialize(deserializer)?;
        Ok(Self {
            at_lsn: de.at_lsn,
            keys: de.keys.0,
        })
    }
}
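// For illustration (values made up), the wire format these impls produce and
// accept looks like:
//
//     {
//       "keys": [
//         ["000000000000000000000000000000000000", "010000000000000000000000000000000001"]
//       ],
//       "at_lsn": "0/16960E8"
//     }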

@@ -6,6 +6,7 @@ use std::str::FromStr;
 use std::sync::Arc;

 use anyhow::{anyhow, Context, Result};
+use enumset::EnumSet;
 use futures::TryFutureExt;
 use humantime::format_rfc3339;
 use hyper::header;
@@ -26,6 +27,10 @@ use utils::http::endpoint::request_span;
 use utils::http::json::json_request_or_empty_body;
 use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};

+use super::models::{
+    StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
+    TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
+};
 use crate::context::{DownloadBehavior, RequestContext};
 use crate::deletion_queue::DeletionQueueClient;
 use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
@@ -38,14 +43,11 @@ use crate::tenant::mgr::{
 };
 use crate::tenant::size::ModelInputs;
 use crate::tenant::storage_layer::LayerAccessStatsReset;
+use crate::tenant::timeline::CompactFlags;
 use crate::tenant::timeline::Timeline;
 use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
 use crate::{config::PageServerConf, tenant::mgr};
 use crate::{disk_usage_eviction_task, tenant};
-use pageserver_api::models::{
-    StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
-    TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
-};
 use utils::{
     auth::SwappableJwtAuth,
     generation::Generation,
@@ -61,7 +63,7 @@ use utils::{
 };

 // Imports only used for testing APIs
-use pageserver_api::models::ConfigureFailpointsRequest;
+use super::models::ConfigureFailpointsRequest;

 pub struct State {
     conf: &'static PageServerConf,
@@ -548,7 +550,7 @@ async fn timeline_detail_handler(

 async fn get_lsn_by_timestamp_handler(
     request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
@@ -564,7 +566,9 @@ async fn get_lsn_by_timestamp_handler(

     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
     let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
-    let result = timeline.find_lsn_for_timestamp(timestamp_pg, &ctx).await?;
+    let result = timeline
+        .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
+        .await?;

     if version.unwrap_or(0) > 1 {
         #[derive(serde::Serialize)]
@@ -764,12 +768,11 @@ async fn tenant_list_handler(
             ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
         })?
         .iter()
-        .map(|(id, state, gen)| TenantInfo {
+        .map(|(id, state)| TenantInfo {
             id: *id,
             state: state.clone(),
             current_physical_size: None,
             attachment_status: state.attachment_status(),
-            generation: (*gen).into(),
         })
         .collect::<Vec<TenantInfo>>();

@@ -798,7 +801,6 @@ async fn tenant_status(
             state: state.clone(),
             current_physical_size: Some(current_physical_size),
             attachment_status: state.attachment_status(),
-            generation: tenant.generation().into(),
         })
     }
     .instrument(info_span!("tenant_status_handler", %tenant_id))
@@ -842,7 +844,7 @@ async fn tenant_delete_handler(
 /// without modifying anything anyway.
 async fn tenant_size_handler(
     request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     check_permission(&request, Some(tenant_id))?;
@@ -858,6 +860,7 @@ async fn tenant_size_handler(
         .gather_size_inputs(
             retention_period,
             LogicalSizeCalculationCause::TenantSizeHandler,
+            &cancel,
             &ctx,
         )
         .await
@@ -1242,7 +1245,7 @@ async fn failpoints_handler(
 // Run GC immediately on given timeline.
 async fn timeline_gc_handler(
     mut request: Request<Body>,
-    _cancel: CancellationToken,
+    cancel: CancellationToken,
 ) -> Result<Response<Body>, ApiError> {
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
@@ -1251,7 +1254,7 @@ async fn timeline_gc_handler(
     let gc_req: TimelineGcRequest = json_request(&mut request).await?;

     let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-    let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, &ctx).await?;
+    let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, cancel, &ctx).await?;
     let gc_result = wait_task_done
         .await
         .context("wait for gc task")
@@ -1270,11 +1273,15 @@ async fn timeline_compact_handler(
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     check_permission(&request, Some(tenant_id))?;

+    let mut flags = EnumSet::empty();
+    if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
+        flags |= CompactFlags::ForceRepartition;
+    }
     async {
         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
         let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
         timeline
-            .compact(&cancel, &ctx)
+            .compact(&cancel, flags, &ctx)
             .await
             .map_err(|e| ApiError::InternalServerError(e.into()))?;
         json_response(StatusCode::OK, ())
@@ -1291,6 +1298,11 @@ async fn timeline_checkpoint_handler(
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     check_permission(&request, Some(tenant_id))?;

+    let mut flags = EnumSet::empty();
+    if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? {
+        flags |= CompactFlags::ForceRepartition;
+    }
     async {
         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
         let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
@@ -1299,7 +1311,7 @@ async fn timeline_checkpoint_handler(
         .await
         .map_err(ApiError::InternalServerError)?;
     timeline
-        .compact(&cancel, &ctx)
+        .compact(&cancel, flags, &ctx)
         .await
         .map_err(|e| ApiError::InternalServerError(e.into()))?;

@@ -1424,10 +1436,70 @@ async fn timeline_collect_keyspace(
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     check_permission(&request, Some(tenant_id))?;

-    let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
+    struct Partitioning {
+        keys: crate::keyspace::KeySpace,

-    let check_serialization_roundtrip: bool =
-        parse_query_param(&request, "check_serialization_roundtrip")?.unwrap_or(false);
+        at_lsn: Lsn,
+    }
+
+    impl serde::Serialize for Partitioning {
+        fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            use serde::ser::SerializeMap;
+            let mut map = serializer.serialize_map(Some(2))?;
+            map.serialize_key("keys")?;
+            map.serialize_value(&KeySpace(&self.keys))?;
+            map.serialize_key("at_lsn")?;
+            map.serialize_value(&WithDisplay(&self.at_lsn))?;
+            map.end()
+        }
+    }
+
+    struct WithDisplay<'a, T>(&'a T);
+
+    impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
+        fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            serializer.collect_str(&self.0)
+        }
+    }
+
+    struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
+
+    impl<'a> serde::Serialize for KeySpace<'a> {
+        fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            use serde::ser::SerializeSeq;
+            let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
+            for kr in &self.0.ranges {
+                seq.serialize_element(&KeyRange(kr))?;
+            }
+            seq.end()
+        }
+    }
+
+    struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
+
+    impl<'a> serde::Serialize for KeyRange<'a> {
+        fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+        where
+            S: serde::Serializer,
+        {
+            use serde::ser::SerializeTuple;
+            let mut t = serializer.serialize_tuple(2)?;
+            t.serialize_element(&WithDisplay(&self.0.start))?;
+            t.serialize_element(&WithDisplay(&self.0.end))?;
+            t.end()
+        }
+    }
+
+    let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;

     async {
         let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
@@ -1438,20 +1510,7 @@ async fn timeline_collect_keyspace(
             .await
             .map_err(|e| ApiError::InternalServerError(e.into()))?;

-        let res = crate::http::models::partitioning::Partitioning { keys, at_lsn };
-        if check_serialization_roundtrip {
-            (|| {
-                let ser = serde_json::ser::to_vec(&res).context("serialize")?;
-                let de: crate::http::models::partitioning::Partitioning =
-                    serde_json::from_slice(&ser).context("deserialize")?;
-                anyhow::ensure!(de == res, "not equal");
-                info!("passed serialization roundtrip check");
-                Ok(())
-            })()
-            .context("serialization roundtrip")
-            .map_err(ApiError::InternalServerError)?;
-        }
-        json_response(StatusCode::OK, res)
+        json_response(StatusCode::OK, Partitioning { keys, at_lsn })
     }
     .instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
     .await
@@ -1630,8 +1689,24 @@ where
     let token_cloned = token.clone();
     let result = handler(r, token).await;
     if token_cloned.is_cancelled() {
-        info!("Cancelled request finished");
+        // dropguard has executed: we will never turn this result into response.
+        //
+        // at least temporarily do {:?} logging; these failures are rare enough but
+        // could hide difficult errors.
+        match &result {
+            Ok(response) => {
+                let status = response.status();
+                info!(%status, "Cancelled request finished successfully")
+            }
+            Err(e) => error!("Cancelled request finished with an error: {e:?}"),
+        }
     }
+    // only logging for cancelled panicked request handlers is the tracing_panic_hook,
+    // which should suffice.
+    //
+    // there is still a chance to lose the result due to race between
+    // returning from here and the actual connection closing happening
+    // before outer task gets to execute. leaving that up for #5815.
     result
 }
 .in_current_span(),

@@ -3,18 +3,25 @@
 //! a neon Timeline.
 //!
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
+use std::task::{self, Poll};

 use anyhow::{bail, ensure, Context, Result};
+use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
 use bytes::Bytes;
 use camino::Utf8Path;
 use futures::StreamExt;
-use tokio::io::{AsyncRead, AsyncReadExt};
+use nix::NixPath;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use tokio_tar::Archive;
+use tokio_tar::Builder;
+use tokio_tar::HeaderMode;
 use tracing::*;
 use walkdir::WalkDir;

 use crate::context::RequestContext;
 use crate::pgdatadir_mapping::*;
+use crate::tenant::remote_timeline_client::INITDB_PATH;
 use crate::tenant::Timeline;
 use crate::walingest::WalIngest;
 use crate::walrecord::DecodedWALRecord;
@@ -33,7 +40,9 @@ use utils::lsn::Lsn;
 pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
     // Read control file to extract the LSN
     let controlfile_path = path.join("global").join("pg_control");
-    let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
+    let controlfile_buf = std::fs::read(&controlfile_path)
+        .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
+    let controlfile = ControlFileData::decode(&controlfile_buf)?;
     let lsn = controlfile.checkPoint;

     Ok(Lsn(lsn))
@@ -618,3 +627,108 @@ async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes>
     reader.read_to_end(&mut buf).await?;
     Ok(Bytes::from(buf))
 }
+
+/// An in-memory buffer implementing `AsyncWrite`, inserting yields every now and then
+///
+/// The number of yields is bounded from above by the number of times poll_write is called,
+/// so calling it with 8 KB chunks and 8 MB chunks gives the same number of yields in total.
+/// This is an explicit choice as the `YieldingVec` is meant to give the async executor
+/// breathing room between units of CPU intensive preparation of buffers to be written.
+/// Once a write call is issued, the whole buffer has been prepared already, so there is no
+/// gain in splitting up the memcopy further.
+struct YieldingVec {
+    yield_budget: usize,
+    // the buffer written into
+    buf: Vec<u8>,
+}
+
+impl YieldingVec {
+    fn new() -> Self {
+        Self {
+            yield_budget: 0,
+            buf: Vec::new(),
+        }
+    }
+    // Whether we should yield for a write operation of the given size
+    fn should_yield(&mut self, add_buf_len: usize) -> bool {
+        // Set this limit to a small value so that we are a
+        // good async citizen and yield repeatedly (but not
+        // too often for many small writes to cause many yields)
+        const YIELD_DIST: usize = 1024;
+
+        let target_buf_len = self.buf.len() + add_buf_len;
+        let ret = self.yield_budget / YIELD_DIST < target_buf_len / YIELD_DIST;
+        if self.yield_budget < target_buf_len {
+            self.yield_budget += add_buf_len;
+        }
+        ret
+    }
+}
+
+impl AsyncWrite for YieldingVec {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::io::Result<usize>> {
+        if self.should_yield(buf.len()) {
+            cx.waker().wake_by_ref();
+            return Poll::Pending;
+        }
+        self.get_mut().buf.extend_from_slice(buf);
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        Poll::Ready(Ok(()))
+    }
+}
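// Behavior sketch (not part of the diff): a large write crosses the 1 KiB
// yield distance at most once per poll_write call, so the writer still makes
// progress on the next poll.
//
//     let mut v = YieldingVec::new();
//     v.write_all(&[0u8; 4096]).await?; // first poll yields, next poll copies
//     assert_eq!(v.buf.len(), 4096);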
+
+pub async fn create_tar_zst(pgdata_path: &Utf8Path) -> Result<Vec<u8>> {
+    let mut paths = Vec::new();
+    for entry in WalkDir::new(pgdata_path) {
+        let entry = entry?;
+        let metadata = entry.metadata().expect("error getting dir entry metadata");
+        // Also allow directories so that we also get empty directories
+        if !(metadata.is_file() || metadata.is_dir()) {
+            continue;
+        }
+        let path = entry.into_path();
+        paths.push(path);
+    }
+    // Do a sort to get a more consistent listing
+    paths.sort_unstable();
+    let zstd = ZstdEncoder::with_quality_and_params(
+        YieldingVec::new(),
+        Level::Default,
+        &[CParameter::enable_long_distance_matching(true)],
+    );
+    let mut builder = Builder::new(zstd);
+    // Use reproducible header mode
+    builder.mode(HeaderMode::Deterministic);
+    for path in paths {
+        let rel_path = path.strip_prefix(pgdata_path)?;
+        if rel_path.is_empty() {
+            // The top directory should not be compressed,
+            // the tar crate doesn't like that
+            continue;
+        }
+        builder.append_path_with_name(&path, rel_path).await?;
+    }
+    let mut zstd = builder.into_inner().await?;
+    zstd.shutdown().await?;
+    let compressed = zstd.into_inner();
+    let compressed_len = compressed.buf.len();
+    const INITDB_TAR_ZST_WARN_LIMIT: usize = 2_000_000;
+    if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
+        warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
+    }
+    Ok(compressed.buf)
+}
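// Usage sketch (path is illustrative):
//
//     let tarball: Vec<u8> = create_tar_zst(Utf8Path::new("/tmp/initdb-pgdata")).await?;
//     info!("compressed pgdata to {} bytes", tarball.len());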

@@ -5,7 +5,7 @@ use std::ops::Range;
 ///
 /// Represents a set of Keys, in a compact form.
 ///
-#[derive(Clone, Debug, Default, PartialEq, Eq)]
+#[derive(Clone, Debug, Default)]
 pub struct KeySpace {
     /// Contiguous ranges of keys that belong to the key space. In key order,
     /// and with no overlap.

@@ -25,7 +25,6 @@ pub mod walingest;
 pub mod walrecord;
 pub mod walredo;

-pub mod client;
 pub mod failpoint_support;

 use crate::task_mgr::TaskKind;

@@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ops::ControlFlow;
 use std::ops::Range;
+use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
 use utils::bin_ser::DeserializeError;
 use utils::{bin_ser::BeSer, lsn::Lsn};
@@ -365,6 +366,7 @@ impl Timeline {
     pub async fn find_lsn_for_timestamp(
         &self,
         search_timestamp: TimestampTz,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> Result<LsnForTimestamp, PageReconstructError> {
         let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
@@ -383,6 +385,9 @@ impl Timeline {
         let mut found_smaller = false;
         let mut found_larger = false;
         while low < high {
+            if cancel.is_cancelled() {
+                return Err(PageReconstructError::Cancelled);
+            }
             // cannot overflow, high and low are both smaller than u64::MAX / 2
             let mid = (high + low) / 2;

@@ -1749,7 +1754,6 @@ const AUX_FILES_KEY: Key = Key {
 // Reverse mappings for a few Keys.
 // These are needed by WAL redo manager.

-/// Guaranteed to return `Ok()` if [[is_rel_block_key]] returns `true` for `key`.
 pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
     Ok(match key.field1 {
         0x00 => (
@@ -1765,8 +1769,7 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
     })
 }

-/// See [[key_to_rel_block]].
-pub fn is_rel_block_key(key: Key) -> bool {
+fn is_rel_block_key(key: Key) -> bool {
     key.field1 == 0x00 && key.field4 != 0
 }
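// Relationship sketch (hypothetical key): the predicate guards the conversion,
// so unwrapping after the check is safe.
//
//     if is_rel_block_key(key) {
//         let (rel, blkno) = key_to_rel_block(key).expect("checked by is_rel_block_key");
//     }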

@@ -12,7 +12,9 @@
 //!

 use anyhow::{bail, Context};
+use bytes::Bytes;
 use camino::{Utf8Path, Utf8PathBuf};
+use enumset::EnumSet;
 use futures::FutureExt;
 use pageserver_api::models::TimelineState;
 use remote_storage::DownloadError;
@@ -23,6 +25,7 @@ use tokio::sync::watch;
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
 use tracing::*;
+use utils::backoff;
 use utils::completion;
 use utils::crashsafe::path_with_suffix_extension;
 use utils::fs_ext;
@@ -1629,6 +1632,7 @@ impl Tenant {
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<GcResult> {
         // Don't start doing work during shutdown
@@ -1651,7 +1655,7 @@ impl Tenant {
             }
         }

-        self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
+        self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
             .await
     }

@@ -1699,7 +1703,7 @@ impl Tenant {

         for (timeline_id, timeline) in &timelines_to_compact {
             timeline
-                .compact(cancel, ctx)
+                .compact(cancel, EnumSet::empty(), ctx)
                 .instrument(info_span!("compact_timeline", %timeline_id))
                 .await?;
         }
@@ -1715,10 +1719,6 @@ impl Tenant {
         self.current_state() == TenantState::Active
     }

-    pub fn generation(&self) -> Generation {
-        self.generation
-    }
-
     /// Changes tenant status to active, unless shutdown was already requested.
     ///
     /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
@@ -1858,6 +1858,7 @@ impl Tenant {
-            });
+            })
         };
+        // test_long_timeline_create_then_tenant_delete is leaning on this message
         tracing::info!("Waiting for timelines...");
         while let Some(res) = js.join_next().await {
             match res {
@@ -2572,14 +2573,30 @@ impl Tenant {
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<GcResult> {
         let mut totals: GcResult = Default::default();
         let now = Instant::now();

-        let gc_timelines = self
-            .refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
-            .await?;
+        let gc_timelines = match self
+            .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
+            .await
+        {
+            Ok(result) => result,
+            Err(e) => {
+                if let Some(PageReconstructError::Cancelled) =
+                    e.downcast_ref::<PageReconstructError>()
+                {
+                    // Handle cancellation
+                    totals.elapsed = now.elapsed();
+                    return Ok(totals);
+                } else {
+                    // Propagate other errors
+                    return Err(e);
+                }
+            }
+        };

         crate::failpoint_support::sleep_millis_async!(
             "gc_iteration_internal_after_getting_gc_timelines"
@@ -2603,7 +2620,7 @@ impl Tenant {
         // See comments in [`Tenant::branch_timeline`] for more information
         // about why branch creation task can run concurrently with timeline's GC iteration.
         for timeline in gc_timelines {
-            if task_mgr::is_shutdown_requested() {
+            if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
                 // We were requested to shut down. Stop and return with the progress we
                 // made.
                 break;
@@ -2623,6 +2640,7 @@ impl Tenant {
     /// This is usually executed as part of periodic gc, but can now be triggered more often.
     pub async fn refresh_gc_info(
         &self,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
         // since this method can now be called at different rates than the configured gc loop, it
@@ -2634,7 +2652,7 @@ impl Tenant {
         // refresh all timelines
         let target_timeline_id = None;

-        self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
+        self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
             .await
     }

@@ -2643,6 +2661,7 @@ impl Tenant {
         target_timeline_id: Option<TimelineId>,
         horizon: u64,
         pitr: Duration,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<Arc<Timeline>>> {
         // grab mutex to prevent new timelines from being created here.
@@ -2716,7 +2735,7 @@ impl Tenant {
                     .map(|&x| x.1)
                     .collect();
                 timeline
-                    .update_gc_info(branchpoints, cutoff, pitr, ctx)
+                    .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
                     .await?;

                 gc_timelines.push(timeline);
@@ -2879,7 +2898,7 @@ impl Tenant {
     }

     /// - run initdb to init temporary instance and get bootstrap data
-    /// - after initialization complete, remove the temp dir.
+    /// - after initialization completes, tar up the temp dir and upload it to S3.
     ///
     /// The caller is responsible for activating the returned timeline.
     async fn bootstrap_timeline(
@@ -2920,6 +2939,30 @@ impl Tenant {
         let pgdata_path = &initdb_path;
         let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();

+        // Upload the created data dir to S3
+        if let Some(storage) = &self.remote_storage {
+            let pgdata_zstd = import_datadir::create_tar_zst(pgdata_path).await?;
+            let pgdata_zstd = Bytes::from(pgdata_zstd);
+            backoff::retry(
+                || async {
+                    self::remote_timeline_client::upload_initdb_dir(
+                        storage,
+                        &self.tenant_id,
+                        &timeline_id,
+                        pgdata_zstd.clone(),
+                    )
+                    .await
+                },
+                |_| false,
+                3,
+                u32::MAX,
+                "persist_initdb_tar_zst",
+                // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
+                backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
+            )
+            .await?;
+        }
+
         // Import the contents of the data directory at the initial checkpoint
         // LSN, and any WAL after that.
         // Initdb lsn will be equal to last_record_lsn which will be set after import.
@@ -3129,6 +3172,7 @@ impl Tenant {
         // (only if it is shorter than the real cutoff).
         max_retention_period: Option<u64>,
         cause: LogicalSizeCalculationCause,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<size::ModelInputs> {
         let logical_sizes_at_once = self
@@ -3151,6 +3195,7 @@ impl Tenant {
             max_retention_period,
             &mut shared_cache,
             cause,
+            cancel,
             ctx,
         )
         .await
@@ -3163,9 +3208,10 @@ impl Tenant {
     pub async fn calculate_synthetic_size(
         &self,
         cause: LogicalSizeCalculationCause,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<u64> {
-        let inputs = self.gather_size_inputs(None, cause, ctx).await?;
+        let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;

         let size = inputs.calculate()?;

@@ -3937,7 +3983,13 @@ mod tests {
|
||||
// and compaction works. But it does set the 'cutoff' point so that the cross check
|
||||
// below should fail.
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
|
||||
.gc_iteration(
|
||||
Some(TIMELINE_ID),
|
||||
0x10,
|
||||
Duration::ZERO,
|
||||
&CancellationToken::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// try to branch at lsn 25, should fail because we already garbage collected the data
|
||||
@@ -4040,7 +4092,13 @@ mod tests {
|
||||
tline.set_broken("test".to_owned());
|
||||
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
|
||||
.gc_iteration(
|
||||
Some(TIMELINE_ID),
|
||||
0x10,
|
||||
Duration::ZERO,
|
||||
&CancellationToken::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// The branchpoints should contain all timelines, even ones marked
|
||||
@@ -4086,7 +4144,13 @@ mod tests {
|
||||
.expect("Should have a local timeline");
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
|
||||
.gc_iteration(
|
||||
Some(TIMELINE_ID),
|
||||
0x10,
|
||||
Duration::ZERO,
|
||||
&CancellationToken::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
|
||||
|
||||
@@ -4114,7 +4178,13 @@ mod tests {
|
||||
|
||||
// run gc on parent
|
||||
tenant
|
||||
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
|
||||
.gc_iteration(
|
||||
Some(TIMELINE_ID),
|
||||
0x10,
|
||||
Duration::ZERO,
|
||||
&CancellationToken::new(),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Check that the data is still accessible on the branch.
|
||||
@@ -4303,7 +4373,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

let writer = tline.writer().await;
writer
@@ -4318,7 +4390,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

let writer = tline.writer().await;
writer
@@ -4333,7 +4407,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

let writer = tline.writer().await;
writer
@@ -4348,7 +4424,9 @@ mod tests {
drop(writer);

tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;

assert_eq!(
tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
@@ -4416,10 +4494,18 @@ mod tests {
let cutoff = tline.get_last_record_lsn();

tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
@@ -4496,10 +4582,18 @@ mod tests {
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}
@@ -4586,10 +4680,18 @@ mod tests {
// Perform a cycle of flush, compact, and GC
let cutoff = tline.get_last_record_lsn();
tline
.update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
.update_gc_info(
Vec::new(),
cutoff,
Duration::ZERO,
&CancellationToken::new(),
&ctx,
)
.await?;
tline.freeze_and_flush().await?;
tline.compact(&CancellationToken::new(), &ctx).await?;
tline
.compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
.await?;
tline.gc().await?;
}

@@ -20,14 +20,12 @@ use std::io::{Error, ErrorKind};

impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
#[tracing::instrument(skip_all, fields(%offset), level = tracing::Level::DEBUG)]
pub async fn read_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
tracing::debug!("reading blob");
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}

@@ -141,7 +141,6 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
#[tracing::instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn read_blk(
&self,
blknum: u32,

@@ -181,7 +181,6 @@ impl LayerMap {
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
@@ -1397,8 +1397,7 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1406,12 +1405,12 @@ pub(crate) async fn list_tenants(
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((id, tenant.current_state(), tenant.generation())),
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(a, b, c)| (a.tenant_id, b, c))
.map(|(k, v)| (k.tenant_id, v))
.collect())
}

@@ -1945,6 +1944,7 @@ pub(crate) async fn immediate_gc(
tenant_id: TenantId,
timeline_id: TimelineId,
gc_req: TimelineGcRequest,
cancel: CancellationToken,
ctx: &RequestContext,
) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
let guard = TENANTS.read().unwrap();
@@ -1971,7 +1971,7 @@ pub(crate) async fn immediate_gc(
async move {
fail::fail_point!("immediate_gc_task_pre");
let result = tenant
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
.gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
.instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
.await;
// FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
@@ -190,6 +190,7 @@ use chrono::{NaiveDateTime, Utc};

use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
@@ -249,6 +250,8 @@ pub(crate) const FAILED_REMOTE_OP_RETRIES: u32 = 10;
// retries. Uploads and deletions are retried forever, though.
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;

pub(crate) const INITDB_PATH: &str = "initdb.tar.zst";

pub enum MaybeDeletedIndexPart {
IndexPart(IndexPart),
Deleted(IndexPart),
@@ -816,7 +819,7 @@ impl RemoteTimelineClient {
let mut receiver = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier(upload_queue)
self.schedule_barrier0(upload_queue)
};

if receiver.changed().await.is_err() {
@@ -825,7 +828,14 @@ impl RemoteTimelineClient {
Ok(())
}

fn schedule_barrier(
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue);
Ok(())
}

fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
@@ -1229,16 +1239,18 @@ impl RemoteTimelineClient {
}
res
}
UploadOp::Delete(delete) => self
.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e)),
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
UploadOp::Barrier(_) => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
@@ -1528,6 +1540,13 @@ pub fn remote_layer_path(
RemotePath::from_string(&path).expect("Failed to construct path")
}

pub fn remote_initdb_archive_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
RemotePath::from_string(&format!(
"tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{INITDB_PATH}"
))
.expect("Failed to construct path")
}

pub fn remote_index_path(
tenant_id: &TenantId,
timeline_id: &TimelineId,
@@ -128,6 +128,14 @@ impl IndexPart {
pub fn get_disk_consistent_lsn(&self) -> Lsn {
self.disk_consistent_lsn
}

pub fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice::<IndexPart>(bytes)
}

pub fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
serde_json::to_vec(self)
}
}

impl TryFrom<&UploadQueueInitialized> for IndexPart {
@@ -201,7 +209,7 @@ mod tests {
deleted_at: None,
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

@@ -239,7 +247,7 @@ mod tests {
deleted_at: None,
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

@@ -279,7 +287,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}

@@ -323,7 +331,7 @@ mod tests {
deleted_at: None,
};

let empty_layers_parsed = serde_json::from_str::<IndexPart>(empty_layers_json).unwrap();
let empty_layers_parsed = IndexPart::from_s3_bytes(empty_layers_json.as_bytes()).unwrap();

assert_eq!(empty_layers_parsed, expected);
}
@@ -361,7 +369,7 @@ mod tests {
"2023-07-31T09:00:00.123000000", "%Y-%m-%dT%H:%M:%S.%f").unwrap())
};

let part = serde_json::from_str::<IndexPart>(example).unwrap();
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
}
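The `from_s3_bytes`/`to_s3_bytes` pair above gives the index file one well-defined (de)serialization point, which the updated tests exercise. A minimal sketch of the same round-trip pattern, assuming only `serde`/`serde_json`; `MiniIndexPart` and its fields are illustrative stand-ins, not the real `IndexPart`:

    use serde::{Deserialize, Serialize};

    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct MiniIndexPart {
        version: u32,
        disk_consistent_lsn: String,
    }

    impl MiniIndexPart {
        fn from_s3_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
            serde_json::from_slice(bytes)
        }

        fn to_s3_bytes(&self) -> serde_json::Result<Vec<u8>> {
            serde_json::to_vec(self)
        }
    }

    fn main() -> serde_json::Result<()> {
        let part = MiniIndexPart {
            version: 1,
            disk_consistent_lsn: "0/16960E8".to_string(),
        };
        // Round-trip: what we upload is exactly what we can parse back.
        let bytes = part.to_s3_bytes()?;
        assert_eq!(MiniIndexPart::from_s3_bytes(&bytes)?, part);
        Ok(())
    }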
@@ -1,6 +1,7 @@
//! Helper functions to upload files to remote storage with a RemoteStorage

use anyhow::{bail, Context};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use std::io::ErrorKind;
@@ -9,7 +10,9 @@ use tokio::fs;
use super::Generation;
use crate::{
config::PageServerConf,
tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path},
tenant::remote_timeline_client::{
index::IndexPart, remote_index_path, remote_initdb_archive_path, remote_path,
},
};
use remote_storage::GenericRemoteStorage;
use utils::id::{TenantId, TimelineId};
@@ -33,8 +36,9 @@ pub(super) async fn upload_index_part<'a>(
});
pausable_failpoint!("before-upload-index-pausable");

let index_part_bytes =
serde_json::to_vec(&index_part).context("serialize index part file into bytes")?;
let index_part_bytes = index_part
.to_s3_bytes()
.context("serialize index part file into bytes")?;
let index_part_size = index_part_bytes.len();
let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes));

@@ -103,3 +107,22 @@ pub(super) async fn upload_timeline_layer<'a>(

Ok(())
}

/// Uploads the given `initdb` data to the remote storage.
pub(crate) async fn upload_initdb_dir(
storage: &GenericRemoteStorage,
tenant_id: &TenantId,
timeline_id: &TimelineId,
initdb_dir: Bytes,
) -> anyhow::Result<()> {
tracing::trace!("uploading initdb dir");

let size = initdb_dir.len();
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(initdb_dir));

let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
storage
.upload_storage_object(bytes, size, &remote_path)
.await
.with_context(|| format!("upload initdb dir for '{tenant_id} / {timeline_id}'"))
}
@@ -6,6 +6,7 @@ use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
@@ -113,11 +114,12 @@ pub(super) async fn gather_inputs(
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info(ctx)
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;

@@ -2,7 +2,7 @@

pub mod delta_layer;
mod filename;
pub mod image_layer;
mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;
@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
pub magic: u16,
pub format_version: u16,
magic: u16,
format_version: u16,

pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,

/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
@@ -611,61 +611,6 @@ impl Drop for DeltaLayerWriter {
}
}

#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}

impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}

impl DeltaLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}

let new_summary = rewrite(actual_summary);

let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}

impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure

@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
pub(super) struct Summary {
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
pub magic: u16,
pub format_version: u16,
magic: u16,
format_version: u16,

pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn: Lsn,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,

/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
pub index_root_blk: u32,
index_root_blk: u32,
// the 'values' part starts after the summary header, on block 1.
}
@@ -296,61 +296,6 @@ impl ImageLayer {
}
}

#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}

impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}

impl ImageLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}

let new_summary = rewrite(actual_summary);

let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}

impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure

@@ -3,7 +3,6 @@ use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::{
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
};
use remote_storage::RemotePath;
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
@@ -227,7 +226,6 @@ impl Layer {
///
/// It is up to the caller to collect more data from the previous layer and
/// perform WAL redo, if necessary.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
@@ -307,12 +305,6 @@ impl Layer {
&self.0.path
}

/// This can return None even though it should return Some in some edge cases.
#[allow(unused)]
pub(crate) fn remote_path(&self) -> Option<RemotePath> {
self.0.remote_path()
}

pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.0.metadata()
}
@@ -926,17 +918,6 @@ impl LayerInner {
}
}

/// This can return None even though it should return Some in some edge cases.
fn remote_path(&self) -> Option<RemotePath> {
let tl = self.timeline.upgrade()?; // TODO: should distinguish this case, but, accuracy doesn't matter for this field.
Some(crate::tenant::remote_timeline_client::remote_layer_path(
&tl.tenant_id,
&tl.timeline_id,
&self.desc.filename(),
self.generation,
))
}

fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();

@@ -956,7 +937,6 @@ impl LayerInner {
lsn_end: lsn_range.end,
remote,
access_stats,
remote_path: self.remote_path().map(|p| p.into()),
}
} else {
let lsn = self.desc.image_layer_lsn();
@@ -967,7 +947,6 @@ impl LayerInner {
lsn_start: lsn,
remote,
access_stats,
remote_path: self.remote_path().map(|p| p.into()),
}
}
}

@@ -261,7 +261,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
} else {
// Run gc
let res = tenant
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &ctx)
.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx)
.await;
if let Err(e) = res {
let wait_duration = backoff::exponential_backoff_duration_seconds(

@@ -10,6 +10,7 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::models::{
@@ -437,6 +438,11 @@ pub enum LogicalSizeCalculationCause {
TenantSizeHandler,
}

#[derive(enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
}

/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
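`CompactFlags` above derives `enumset::EnumSetType`, so `compact` can accept an `EnumSet<CompactFlags>`: callers pass an empty set on the normal path and set a flag to force repartitioning. A small sketch of that flag-set behavior, assuming only the `enumset` crate (the `main` wrapper is illustrative):

    use enumset::{EnumSet, EnumSetType};

    #[derive(EnumSetType, Debug)]
    pub enum CompactFlags {
        ForceRepartition,
    }

    fn main() {
        // The common case: no special behavior requested.
        let normal: EnumSet<CompactFlags> = EnumSet::empty();
        assert!(!normal.contains(CompactFlags::ForceRepartition));

        // Callers that need a fresh partitioning opt in explicitly.
        let forced = EnumSet::only(CompactFlags::ForceRepartition);
        assert!(forced.contains(CompactFlags::ForceRepartition));
    }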
@@ -468,7 +474,6 @@ impl Timeline {
/// an ancestor branch, for example, or waste a lot of cycles chasing the
/// non-existing key.
///
#[instrument(skip_all, fields(%key, %lsn), level = tracing::Level::DEBUG)]
pub async fn get(
&self,
key: Key,
@@ -695,6 +700,7 @@ impl Timeline {
pub(crate) async fn compact(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
// this wait probably never needs any "long time spent" logging, because we already nag if
@@ -767,6 +773,7 @@ impl Timeline {
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
flags,
ctx,
)
.await
@@ -1712,6 +1719,30 @@ impl Timeline {
if let Some(rtc) = self.remote_client.as_ref() {
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
// This barrier orders above DELETEs before any later operations.
// This is critical because code executing after the barrier might
// create again objects with the same key that we just scheduled for deletion.
// For example, if we just scheduled deletion of an image layer "from the future",
// later compaction might run again and re-create the same image layer.
// "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
// "same" here means same key range and LSN.
//
// Without a barrier between above DELETEs and the re-creation's PUTs,
// the upload queue may execute the PUT first, then the DELETE.
// In our example, we will end up with an IndexPart referencing a non-existent object.
//
// 1. a future image layer is created and uploaded
// 2. ps restart
// 3. the future layer from (1) is deleted during load layer map
// 4. image layer is re-created and uploaded
// 5. deletion queue would like to delete (1) but actually deletes (4)
// 6. delete by name works as expected, but it now deletes the wrong (later) version
//
// See https://github.com/neondatabase/neon/issues/5878
//
// NB: generation numbers naturally protect against this because they disambiguate
// (1) and (4)
rtc.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
}
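The long comment above is the rationale for `rtc.schedule_barrier()`: without it, the upload queue could run a later PUT before an earlier DELETE of an object with the same key. A toy model of the ordering rule the barrier enforces, with all names illustrative and no relation to the real upload queue types:

    #[derive(Debug)]
    enum Op {
        Delete(&'static str),
        Put(&'static str),
        Barrier,
    }

    /// Illustrative scheduler rule: everything queued before a Barrier
    /// must complete before anything queued after it may start.
    fn launch_ready(queue: &[Op]) -> Vec<&Op> {
        queue
            .iter()
            .take_while(|op| !matches!(op, Op::Barrier))
            .collect()
    }

    fn main() {
        // DELETE of the old layer, then a barrier, then the PUT that
        // re-creates an object under the same key.
        let queue = [Op::Delete("layer-A"), Op::Barrier, Op::Put("layer-A")];
        // Only the DELETE is runnable now; the PUT waits behind the
        // barrier, so it can never overtake the DELETE.
        println!("{:?}", launch_ready(&queue));
    }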
@@ -2045,7 +2076,6 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn get_reconstruct_data(
&self,
key: Key,
@@ -2080,8 +2110,7 @@ impl Timeline {
let mut cont_lsn = Lsn(request_lsn.0 + 1);

'outer: loop {

if self.cancel.is_cancelled() {
if self.cancel.is_cancelled() {
return Err(PageReconstructError::Cancelled);
}

@@ -2528,7 +2557,12 @@ impl Timeline {
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
.repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
EnumSet::empty(),
ctx,
)
.await?;

if self.cancel.is_cancelled() {
@@ -2566,6 +2600,8 @@ impl Timeline {
)
};

pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");

if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
@@ -2747,12 +2783,16 @@ impl Timeline {
&self,
lsn: Lsn,
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
{
let partitioning_guard = self.partitioning.lock().unwrap();
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
if partitioning_guard.1 != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
@@ -3688,6 +3728,7 @@ impl Timeline {
retain_lsns: Vec<Lsn>,
cutoff_horizon: Lsn,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// First, calculate pitr_cutoff_timestamp and then convert it to LSN.
@@ -3701,7 +3742,10 @@ impl Timeline {
if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);

match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
match self
.find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
.await?
{
LsnForTimestamp::Present(lsn) => lsn,
LsnForTimestamp::Future(lsn) => {
// The timestamp is in the future. That sounds impossible,

@@ -351,7 +351,7 @@ impl Timeline {
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {
self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now());
}
@@ -417,8 +417,8 @@ impl Timeline {
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Arc<Tenant>,
ctx: &RequestContext,
cancel: &CancellationToken,
ctx: &RequestContext,
) {
if self.conf.metric_collection_endpoint.is_none() {
// We don't start the consumption metrics task if this is not set in the config.
@@ -457,6 +457,7 @@ impl Timeline {
None,
&mut throwaway_cache,
LogicalSizeCalculationCause::EvictionTaskImitation,
cancel,
ctx,
)
.instrument(info_span!("gather_inputs"));

@@ -45,12 +45,20 @@ impl<'t> UninitializedTimeline<'t> {
let timeline_id = self.timeline_id;
let tenant_id = self.owning_tenant.tenant_id;

let (new_timeline, uninit_mark) = self.raw_timeline.take().with_context(|| {
format!("No timeline for initalization found for {tenant_id}/{timeline_id}")
})?;
if self.raw_timeline.is_none() {
return Err(anyhow::anyhow!(
"No timeline for initialization found for {tenant_id}/{timeline_id}"
));
}

// Check that the caller initialized disk_consistent_lsn
let new_disk_consistent_lsn = new_timeline.get_disk_consistent_lsn();
let new_disk_consistent_lsn = self
.raw_timeline
.as_ref()
.expect("checked above")
.0
.get_disk_consistent_lsn();

anyhow::ensure!(
new_disk_consistent_lsn.is_valid(),
"new timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
@@ -62,6 +70,13 @@ impl<'t> UninitializedTimeline<'t> {
"Found freshly initialized timeline {tenant_id}/{timeline_id} in the tenant map"
),
Entry::Vacant(v) => {
// after taking here should be no fallible operations, because the drop guard will not
// cleanup after and would block for example the tenant deletion
let (new_timeline, uninit_mark) =
self.raw_timeline.take().expect("already checked");

// this is the mutual exclusion between different retries to create the timeline;
// this should be an assertion.
uninit_mark.remove_uninit_mark().with_context(|| {
format!(
"Failed to remove uninit mark file for timeline {tenant_id}/{timeline_id}"
@@ -70,10 +85,10 @@ impl<'t> UninitializedTimeline<'t> {
v.insert(Arc::clone(&new_timeline));

new_timeline.maybe_spawn_flush_loop();

Ok(new_timeline)
}
}

Ok(new_timeline)
}

/// Prepares timeline data by loading it from the basebackup archive.

|
||||
SHLIB_LINK = -lcurl
|
||||
|
||||
EXTENSION = neon
|
||||
DATA = neon--1.0.sql
|
||||
DATA = neon--1.0.sql neon--1.0--1.1.sql
|
||||
PGFILEDESC = "neon - cloud storage for PostgreSQL"
|
||||
|
||||
EXTRA_CLEAN = \
|
||||
|
||||
pgxn/neon/README.md (new file, 20 lines)
@@ -0,0 +1,20 @@
neon extension consists of several parts:

### shared preload library `neon.so`

- implements storage manager API and network communications with remote page server.

- walproposer: implements broadcast protocol between postgres and WAL safekeepers.

- control plane connector: Captures updates to roles/databases using ProcessUtility_hook and sends them to the control ProcessUtility_hook.

- remote extension server: Request compute_ctl to download extension files.

- file_cache: Local file cache is used to temporary store relations pages in local file system for better performance.

- relsize_cache: Relation size cache for better neon performance.

### SQL functions in `neon--*.sql`

Utility functions to expose neon specific information to user and metrics collection.
This extension is created in all databases in the cluster by default.
@@ -32,11 +32,13 @@
#include "storage/latch.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "utils/builtins.h"
#include "utils/dynahash.h"
#include "utils/guc.h"
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "pgstat.h"

/*
* Local file cache is used to temporary store relations pages in local file system.
@@ -65,6 +67,7 @@
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 bitmap[BLOCKS_PER_CHUNK/32];
@@ -76,6 +79,10 @@ typedef struct FileCacheControl
uint64 generation; /* generation is needed to handle correct hash reenabling */
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
uint64 writes;
dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl;

@@ -91,10 +98,12 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
static shmem_request_hook_type prev_shmem_request_hook;
#endif

void FileCacheMonitorMain(Datum main_arg);
#define LFC_ENABLED() (lfc_ctl->limit != 0)

void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);

/*
* Local file cache is mandatory and Neon can work without it.
* Local file cache is optional and Neon can work without it.
* In case of any any errors with this cache, we should disable it but to not throw error.
* Also we should allow re-enable it if source of failure (lack of disk space, permissions,...) is fixed.
* All cache content should be invalidated to avoid reading of stale or corrupted data
@@ -102,49 +111,77 @@ void FileCacheMonitorMain(Datum main_arg);
static void
lfc_disable(char const* op)
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;

int fd;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);

/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (LFC_ENABLED())
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;

hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);

if (lfc_desc > 0)
{
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
int rc = ftruncate(lfc_desc, 0);
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
}
}
/* We need to use unlink to to avoid races in LFC write, because it is not protectedby */
unlink(lfc_path);

fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
elog(WARNING, "Failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);

LWLockRelease(lfc_lock);

if (lfc_desc > 0)
close(lfc_desc);

lfc_desc = -1;
lfc_size_limit = 0;
}

/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
hash_seq_term(&status);
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);

LWLockRelease(lfc_lock);
/*
* This check is done without obtaining lfc_lock, so it is unreliable
*/
static bool
lfc_maybe_disabled(void)
{
return !lfc_ctl || !LFC_ENABLED();
}

static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0)
if (lfc_desc <= 0 && enabled)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);

if (lfc_desc < 0) {
lfc_disable("open");
return false;
}
}
return true;
return enabled;
}

static void
@@ -163,6 +200,7 @@ lfc_shmem_startup(void)
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
{
int fd;
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
@@ -175,10 +213,23 @@ lfc_shmem_startup(void)
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
lfc_ctl->writes = 0;
dlist_init(&lfc_ctl->lru);

/* Remove file cache on restart */
(void)unlink(lfc_path);
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
{
close(fd);
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
}
}
LWLockRelease(AddinShmemInitLock);
}
@@ -195,6 +246,17 @@ lfc_shmem_request(void)
RequestNamedLWLockTranche("lfc_lock", 1);
}

static bool
is_normal_backend(void)
{
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}

static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -210,25 +272,15 @@ static void
lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
if (!lfc_ctl || !MyProc || !UsedShmemSegAddr || IsParallelWorker())

if (!is_normal_backend())
return;

if (!lfc_ensure_opened())
return;

/* Open cache file if not done yet */
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -238,10 +290,12 @@ lfc_change_limit_hook(int newval, void *extra)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m");
#endif
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size);

LWLockRelease(lfc_lock);
}

@@ -255,6 +309,7 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");


DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -315,10 +370,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
BufferTag tag;
FileCacheEntry* entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
bool found;
bool found = false;
uint32 hash;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;

CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -327,8 +382,11 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_SHARED);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
}
LWLockRelease(lfc_lock);
return found;
}
@@ -345,7 +403,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;

CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -355,6 +413,13 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}

entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);

if (!found)
@@ -405,7 +470,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
* In case of error local file cache is disabled (lfc->limit is set to zero).
*/
bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -420,7 +485,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return false;

if (!lfc_ensure_opened())
@@ -432,10 +497,18 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return false;
}

entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
/* Page is not cached */
lfc_ctl->misses += 1;
LWLockRelease(lfc_lock);
return false;
}
@@ -456,8 +529,11 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (lfc_ctl->generation == generation)
{
Assert(LFC_ENABLED());
lfc_ctl->hits += 1;
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -488,8 +564,10 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
uint64 generation;
uint32 entry_offset;

if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;

if (!lfc_ensure_opened())
@@ -497,12 +575,17 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,

tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);

CopyNRelFileInfoToBufTag(tag, rinfo);

hash = get_hash_value(lfc_hash, &tag);

LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}

entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);

if (found)
@@ -521,13 +604,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
* we prefer not to complicate code and use second approach.
*/
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page");
}
else
@@ -536,27 +619,140 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
}
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}

rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry->offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
generation = lfc_ctl->generation;
entry_offset = entry->offset;
lfc_ctl->writes += 1;
LWLockRelease(lfc_lock);

rc = pwrite(lfc_desc, buffer, BLCKSZ, ((off_t)entry_offset*BLOCKS_PER_CHUNK + chunk_offs)*BLCKSZ);
if (rc != BLCKSZ)
{
LWLockRelease(lfc_lock);
lfc_disable("write");
}
else
{
/* Place entry to the head of LRU list */
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);

if (lfc_ctl->generation == generation)
{
Assert(LFC_ENABLED());
/* Place entry to the head of LRU list */
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);

entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
}

entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31));
LWLockRelease(lfc_lock);
}
}

typedef struct
{
TupleDesc tupdesc;
} NeonGetStatsCtx;

#define NUM_NEON_GET_STATS_COLS 2
#define NUM_NEON_GET_STATS_ROWS 3

PG_FUNCTION_INFO_V1(neon_get_lfc_stats);
Datum
neon_get_lfc_stats(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
NeonGetStatsCtx* fctx;
MemoryContext oldcontext;
TupleDesc tupledesc;
Datum result;
HeapTuple tuple;
char const* key;
uint64 value;
Datum values[NUM_NEON_GET_STATS_COLS];
bool nulls[NUM_NEON_GET_STATS_COLS];

if (SRF_IS_FIRSTCALL())
{
funcctx = SRF_FIRSTCALL_INIT();

/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

/* Create a user function context for cross-call persistence */
fctx = (NeonGetStatsCtx*) palloc(sizeof(NeonGetStatsCtx));

/* Construct a tuple descriptor for the result rows. */
tupledesc = CreateTemplateTupleDesc(NUM_NEON_GET_STATS_COLS);

TupleDescInitEntry(tupledesc, (AttrNumber) 1, "lfc_key",
TEXTOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "lfc_value",
INT8OID, -1, 0);

fctx->tupdesc = BlessTupleDesc(tupledesc);
funcctx->max_calls = NUM_NEON_GET_STATS_ROWS;
funcctx->user_fctx = fctx;

/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
}

funcctx = SRF_PERCALL_SETUP();

/* Get the saved state */
fctx = (NeonGetStatsCtx*) funcctx->user_fctx;

switch (funcctx->call_cntr)
{
case 0:
key = "file_cache_misses";
if (lfc_ctl)
value = lfc_ctl->misses;
break;
case 1:
key = "file_cache_hits";
if (lfc_ctl)
value = lfc_ctl->hits;
break;
case 2:
key = "file_cache_used";
if (lfc_ctl)
value = lfc_ctl->used;
break;
case 3:
key = "file_cache_writes";
if (lfc_ctl)
value = lfc_ctl->writes;
break;
default:
SRF_RETURN_DONE(funcctx);
}
values[0] = PointerGetDatum(cstring_to_text(key));
nulls[0] = false;
if (lfc_ctl)
{
nulls[1] = false;
values[1] = Int64GetDatum(value);
}
else
nulls[1] = true;

tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
SRF_RETURN_NEXT(funcctx, result);
}


/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
*/
PG_FUNCTION_INFO_V1(local_cache_pages);

/*
* Record structure holding the to be exposed cache data.
*/
@@ -580,11 +776,6 @@ typedef struct
LocalCachePagesRec *record;
} LocalCachePagesContext;

/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
*/
PG_FUNCTION_INFO_V1(local_cache_pages);

#define NUM_LOCALCACHE_PAGES_ELEM 7

@@ -651,15 +842,20 @@ local_cache_pages(PG_FUNCTION_ARGS)

fctx->tupdesc = BlessTupleDesc(tupledesc);

LWLockAcquire(lfc_lock, LW_SHARED);

hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (lfc_ctl)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
LWLockAcquire(lfc_lock, LW_SHARED);

if (LFC_ENABLED())
{
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK/32; i++)
n_pages += pg_popcount32(entry->bitmap[i]);
}
}
}
hash_seq_term(&status);
fctx->record = (LocalCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(LocalCachePagesRec) * n_pages);
@@ -671,36 +867,35 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);

/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
if (n_pages != 0)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
/*
* Scan through all the cache entries, saving the relevant fields in the
* fctx->record structure.
*/
uint32 n = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
fctx->record[n].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n].forknum = entry->key.forkNum;
fctx->record[n].blocknum = entry->key.blockNum + i;
fctx->record[n].accesscount = entry->access_count;
n += 1;
}
}
}
Assert(n_pages == n);
}
hash_seq_term(&status);
Assert(n_pages == funcctx->max_calls);
LWLockRelease(lfc_lock);
if (lfc_ctl)
LWLockRelease(lfc_lock);
}

funcctx = SRF_PERCALL_SETUP();
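The reworked `lfc_read`/`lfc_write` paths above release `lfc_lock` around the blocking `pread`/`pwrite` and afterwards compare `lfc_ctl->generation` against a snapshot taken before the I/O, because the cache may have been disabled and rebuilt in the meantime. A minimal Rust sketch of that snapshot-and-revalidate pattern (illustrative only; the real code is the C above):

    use std::sync::Mutex;

    struct Cache {
        generation: u64,
        hits: u64,
    }

    static CACHE: Mutex<Cache> = Mutex::new(Cache { generation: 0, hits: 0 });

    fn read_page() {
        // Hold the lock only long enough to snapshot the generation.
        let generation = CACHE.lock().unwrap().generation;

        // ... perform the slow I/O here without holding the lock ...

        // Re-acquire and only update shared state if no disable/re-enable
        // cycle bumped the generation while we were doing I/O.
        let mut cache = CACHE.lock().unwrap();
        if cache.generation == generation {
            cache.hits += 1;
        }
    }

    fn main() {
        read_page();
        println!("hits: {}", CACHE.lock().unwrap().hits);
    }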
pgxn/neon/neon--1.0--1.1.sql (new file, 10 lines)
@@ -0,0 +1,10 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.1'" to load this file. \quit

CREATE FUNCTION neon_get_lfc_stats()
RETURNS SETOF RECORD
AS 'MODULE_PATHNAME', 'neon_get_lfc_stats'
LANGUAGE C PARALLEL SAFE;

-- Create a view for convenient access.
CREATE VIEW neon_lfc_stats AS
SELECT P.* FROM neon_get_lfc_stats() AS P (lfc_key text, lfc_value bigint);
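Once the extension is updated, the view can be queried from any compute session. A minimal sketch in the test fixtures' terms (assuming a running `endpoint` as in the regression tests later in this diff; running the ALTER here by hand is just for illustration):

    with endpoint.cursor() as cur:
        cur.execute("ALTER EXTENSION neon UPDATE TO '1.1'")
        cur.execute("SELECT lfc_key, lfc_value FROM neon_lfc_stats")
        for lfc_key, lfc_value in cur.fetchall():
            log.info(f"{lfc_key} = {lfc_value}")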
@@ -1,4 +1,5 @@
 # neon extension
 comment = 'cloud storage for PostgreSQL'
-default_version = '1.0'
+default_version = '1.1'
 module_pathname = '$libdir/neon'
 relocatable = true
@@ -168,9 +168,18 @@ async fn task_main(
                .instrument(tracing::info_span!("handle_client", ?session_id))
            );
        }
-       Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
-           if !e.is_panic() && !e.is_cancelled() {
-               warn!("unexpected error from joined connection task: {e:?}");
-           }
-       }
+       // Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
+       // If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
+       // This only counts for this loop and it will be enabled again on next `select!`.
+       //
+       // Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
+       // When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
+       // not get called again, even if there are more connections to remove.
+       Some(res) = connections.join_next() => {
+           if let Err(e) = res {
+               if !e.is_panic() && !e.is_cancelled() {
+                   warn!("unexpected error from joined connection task: {e:?}");
+               }
+           }
+       }
        _ = cancellation_token.cancelled() => {
@@ -294,9 +294,18 @@ pub async fn task_main(
                }),
            );
        }
-       Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
-           if !e.is_panic() && !e.is_cancelled() {
-               warn!("unexpected error from joined connection task: {e:?}");
-           }
-       }
+       // Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
+       // If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
+       // This only counts for this loop and it will be enabled again on next `select!`.
+       //
+       // Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
+       // When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
+       // not get called again, even if there are more connections to remove.
+       Some(res) = connections.join_next() => {
+           if let Err(e) = res {
+               if !e.is_panic() && !e.is_cancelled() {
+                   warn!("unexpected error from joined connection task: {e:?}");
+               }
+           }
+       }
        _ = cancellation_token.cancelled() => {
@@ -199,7 +199,7 @@ async fn main() -> anyhow::Result<()> {
    // 1. init logging
    // 2. tracing panic hook
    // 3. sentry
-   let _guard = logging::init(
+   logging::init(
        LogFormat::from_config(&args.log_format)?,
        logging::TracingErrorLayerEnablement::Disabled,
        logging::Output::Stdout,
@@ -1,41 +0,0 @@
#!/usr/bin/env bash

set -euo pipefail

if [ "$(cat /sys/class/block/nvme1n1/device/model)" != "Amazon EC2 NVMe Instance Storage " ]; then
    echo "nvme1n1 is not Amazon EC2 NVMe Instance Storage: '$(cat /sys/class/block/nvme1n1/device/model)'"
    exit 1
fi

rmdir bench_repo_dir || true

sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1

sudo mount /dev/nvme1n1 /mnt
sudo chown -R "$(id -u)":"$(id -g)" /mnt

mkdir /mnt/bench_repo_dir
mkdir bench_repo_dir
sudo mount --bind /mnt/bench_repo_dir bench_repo_dir

mkdir /mnt/test_output

mkdir /mnt/many_tenants

echo run the following commands

cat <<EOF
# test suite run
export TEST_OUTPUT="/mnt/test_output"
DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/test_pageserver.py

# for interactive use
export NEON_REPO_DIR="$(readlink -f ./bench_repo_dir)/repo"
cargo build_testing --release
./target/release/neon_local init
# ... create tenant, seed it using pgbench
# then duplicate the tenant using
# poetry run python3 ./test_runner/duplicate_tenant.py TENANT_ID 200 8
EOF
@@ -431,7 +431,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1. init logging
    // 2. tracing panic hook
    // 3. sentry
-   let _guard = logging::init(
+   logging::init(
        LogFormat::from_config(&args.log_format)?,
        logging::TracingErrorLayerEnablement::Disabled,
        logging::Output::Stdout,
@@ -1,69 +0,0 @@
# Usage from top of repo:
# poetry run python3 ./test_runner/duplicate_tenant.py c66e2e233057f7f05563caff664ecb14 .neon/remote_storage_local_fs
import argparse
import shutil
import subprocess
import time
from pathlib import Path

from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId

parser = argparse.ArgumentParser(description="Duplicate tenant script.")
parser.add_argument("initial_tenant", type=str, help="Initial tenant")
parser.add_argument("remote_storage_local_fs_root", type=Path, help="Remote storage local fs root")
parser.add_argument("--ncopies", type=int, help="Number of copies")
parser.add_argument("--numthreads", type=int, default=1, help="Number of threads")
parser.add_argument("--port", type=int, default=9898, help="Pageserver management api port")

args = parser.parse_args()

initial_tenant = args.initial_tenant
remote_storage_local_fs_root: Path = args.remote_storage_local_fs_root
ncopies = args.ncopies
numthreads = args.numthreads

new_tenant = TenantId.generate()
print(f"New tenant: {new_tenant}")

client = PageserverHttpClient(args.port, lambda: None)

src_tenant_gen = int(client.tenant_status(initial_tenant)["generation"])

assert remote_storage_local_fs_root.is_dir(), f"{remote_storage_local_fs_root} is not a directory"

src_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / initial_tenant / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"

dst_timelines_dir: Path = remote_storage_local_fs_root / "tenants" / str(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)

for tl in src_timelines_dir.iterdir():
    src_tl_dir = src_timelines_dir / tl.name
    assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
    dst_tl_dir = dst_timelines_dir / tl.name
    dst_tl_dir.mkdir(parents=False, exist_ok=False)
    for file in tl.iterdir():
        shutil.copy2(file, dst_tl_dir)
        if "__" in file.name:
            cmd = [
                "./target/debug/pagectl",  # TODO: abstract this like the other binaries
                "layer",
                "rewrite-summary",
                str(dst_tl_dir / file.name),
                "--new-tenant-id",
                str(new_tenant),
            ]
            subprocess.run(cmd, check=True)

client.tenant_attach(new_tenant, generation=src_tenant_gen)

while True:
    status = client.tenant_status(new_tenant)
    if status["state"]["slug"] == "Active":
        break
    print("Waiting for tenant to be active..., is: " + status["state"]["slug"])
    time.sleep(1)

print("Tenant is active: " + str(new_tenant))
@@ -41,7 +41,12 @@ from urllib3.util.retry import Retry

 from fixtures.broker import NeonBroker
 from fixtures.log_helper import log
+from fixtures.pageserver.allowed_errors import (
+    DEFAULT_PAGESERVER_ALLOWED_ERRORS,
+    scan_pageserver_log_for_errors,
+)
 from fixtures.pageserver.http import PageserverHttpClient
+from fixtures.pageserver.types import IndexPartDump
 from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
 from fixtures.pg_version import PgVersion
 from fixtures.port_distributor import PortDistributor
@@ -702,6 +707,7 @@ class NeonEnv:
         self.port_distributor = config.port_distributor
         self.s3_mock_server = config.mock_s3_server
         self.neon_cli = NeonCli(env=self)
+        self.pagectl = Pagectl(env=self)
         self.endpoints = EndpointFactory(self)
         self.safekeepers: List[Safekeeper] = []
         self.pageservers: List[NeonPageserver] = []
@@ -724,10 +730,13 @@ class NeonEnv:
         self.initial_tenant = config.initial_tenant
         self.initial_timeline = config.initial_timeline

-        self.control_plane_api: Optional[str] = None
-        self.attachment_service: Optional[NeonAttachmentService] = None
         if config.enable_generations:
-            self.enable_generations()
+            attachment_service_port = self.port_distributor.get_port()
+            self.control_plane_api: Optional[str] = f"http://127.0.0.1:{attachment_service_port}"
+            self.attachment_service: Optional[NeonAttachmentService] = NeonAttachmentService(self)
+        else:
+            self.control_plane_api = None
+            self.attachment_service = None

         # Create a config file corresponding to the options
         toml = textwrap.dedent(
@@ -816,18 +825,6 @@ class NeonEnv:
         log.info(f"Config: {toml}")
         self.neon_cli.init(toml)

-    def enable_generations(self, start=False):
-        if not start:
-            # TODO: assert that we haven't `self.start()`ed yet
-            pass
-        assert self.control_plane_api is None
-        assert self.attachment_service is None
-        attachment_service_port = self.port_distributor.get_port()
-        self.control_plane_api = f"http://127.0.0.1:{attachment_service_port}"
-        self.attachment_service = NeonAttachmentService(self)
-        if start:
-            self.attachment_service.start()
-
     def start(self):
         # Start up broker, pageserver and all safekeepers
         self.broker.try_start()
@@ -1231,6 +1228,7 @@ class NeonCli(AbstractNeonCli):
         self,
         new_branch_name: str,
         tenant_id: Optional[TenantId] = None,
+        timeline_id: Optional[TimelineId] = None,
     ) -> TimelineId:
         cmd = [
             "timeline",
@@ -1243,6 +1241,9 @@ class NeonCli(AbstractNeonCli):
             self.env.pg_version,
         ]

+        if timeline_id is not None:
+            cmd.extend(["--timeline-id", str(timeline_id)])
+
         res = self.raw_cli(cmd)
         res.check_returncode()
@@ -1567,15 +1568,18 @@ class ComputeCtl(AbstractNeonCli):
     COMMAND = "compute_ctl"


-# class GetpageBenchLibpq(AbstractNeonCli):
-#     """
-#     A typed wrapper around the `getpage_bench_libpq` CLI.
-#     """
-#
-#     COMMAND = "getpage_bench_libpq"
-#
-#     def run(self):
-#         pass
+class Pagectl(AbstractNeonCli):
+    """
+    A typed wrapper around the `pagectl` utility CLI tool.
+    """
+
+    COMMAND = "pagectl"
+
+    def dump_index_part(self, path: Path) -> IndexPartDump:
+        res = self.raw_cli(["index-part", "dump", str(path)])
+        res.check_returncode()
+        parsed = json.loads(res.stdout)
+        return IndexPartDump.from_json(parsed)


 class NeonAttachmentService:
@@ -1642,57 +1646,7 @@ class NeonPageserver(PgProtocol):
         # env.pageserver.allowed_errors.append(".*could not open garage door.*")
         #
         # The entries in the list are regular expressions.
-        self.allowed_errors = [
-            # All tests print these, when starting up or shutting down
-            ".*wal receiver task finished with an error: walreceiver connection handling failure.*",
-            ".*Shutdown task error: walreceiver connection handling failure.*",
-            ".*wal_connection_manager.*tcp connect error: Connection refused.*",
-            ".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
-            ".*serving compute connection task.*exited with error: Postgres connection error.*",
-            ".*serving compute connection task.*exited with error: Connection reset by peer.*",
-            ".*serving compute connection task.*exited with error: Postgres query error.*",
-            ".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
-            # FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
-            ".*Connection aborted: unexpected message from server*",
-            ".*kill_and_wait_impl.*: wait successful.*",
-            ".*query handler for 'pagestream.*failed: Broken pipe.*",  # pageserver notices compute shut down
-            ".*query handler for 'pagestream.*failed: Connection reset by peer.*",  # pageserver notices compute shut down
-            # safekeeper connection can fail with this, in the window between timeline creation
-            # and streaming start
-            ".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
-            # Tests related to authentication and authorization print these
-            ".*Error processing HTTP request: Forbidden",
-            # intentional failpoints
-            ".*failpoint ",
-            # FIXME: These need investigation
-            ".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
-            ".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
-            ".*Removing intermediate uninit mark file.*",
-            # Tenant::delete_timeline() can cause any of the four following errors.
-            # FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
-            ".*could not flush frozen layer.*queue is in state Stopped",  # when schedule layer upload fails because queued got closed before compaction got killed
-            ".*wait for layer upload ops to complete.*",  # .*Caused by:.*wait_completion aborted because upload queue was stopped
-            ".*gc_loop.*Gc failed, retrying in.*timeline is Stopping",  # When gc checks timeline state after acquiring layer_removal_cs
-            ".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant",  # Tenant::gc precondition
-            ".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down",  # When compaction checks timeline state after acquiring layer_removal_cs
-            ".*query handler for 'pagestream.*failed: Timeline .* was not found",  # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
-            ".*query handler for 'pagestream.*failed: Timeline .* is not active",  # timeline delete in progress
-            ".*task iteration took longer than the configured period.*",
-            # this is until #3501
-            ".*Compaction failed.*, retrying in [^:]+: Cannot run compaction iteration on inactive tenant",
-            # these can happen anytime we do compactions from background task and shutdown pageserver
-            r".*ERROR.*ancestor timeline \S+ is being stopped",
-            # this is expected given our collaborative shutdown approach for the UploadQueue
-            ".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
-            # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
-            ".*Error processing HTTP request: NotFound: Timeline .* was not found",
-            ".*took more than expected to complete.*",
-            # these can happen during shutdown, but it should not be a reason to fail a test
-            ".*completed, took longer than expected.*",
-            # AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
-            # and it is not a failure of our code when it happens.
-            ".*DeleteObjects.*We encountered an internal error. Please try again.*",
-        ]
+        self.allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)

     def timeline_dir(self, tenant_id: TenantId, timeline_id: Optional[TimelineId] = None) -> Path:
         """Get a timeline directory's path based on the repo directory of the test environment"""
@@ -1802,27 +1756,9 @@ class NeonPageserver(PgProtocol):

     def assert_no_errors(self):
         logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
-        error_or_warn = re.compile(r"\s(ERROR|WARN)")
-        errors = []
-        while True:
-            line = logfile.readline()
-            if not line:
-                break
+        errors = scan_pageserver_log_for_errors(logfile, self.allowed_errors)

-            if error_or_warn.search(line):
-                # Is this a torn log line? This happens when force-killing a process and restarting
-                # Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
-                if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
-                    continue
-
-                # It's an ERROR or WARN. Is it in the allow-list?
-                for a in self.allowed_errors:
-                    if re.match(a, line):
-                        break
-                else:
-                    errors.append(line)
-
-        for error in errors:
+        for _lineno, error in errors:
             log.info(f"not allowed error: {error.strip()}")

         assert not errors
test_runner/fixtures/pageserver/allowed_errors.py (new executable file, 116 lines)
@@ -0,0 +1,116 @@
#! /usr/bin/env python3

import argparse
import re
import sys
from typing import Iterable, List, Tuple


def scan_pageserver_log_for_errors(
    input: Iterable[str], allowed_errors: List[str]
) -> List[Tuple[int, str]]:
    error_or_warn = re.compile(r"\s(ERROR|WARN)")
    errors = []
    for lineno, line in enumerate(input, start=1):
        if len(line) == 0:
            continue

        if error_or_warn.search(line):
            # Is this a torn log line? This happens when force-killing a process and restarting
            # Example: "2023-10-25T09:38:31.752314Z WARN deletion executo2023-10-25T09:38:31.875947Z INFO version: git-env:0f9452f76e8ccdfc88291bccb3f53e3016f40192"
            if re.match("\\d{4}-\\d{2}-\\d{2}T.+\\d{4}-\\d{2}-\\d{2}T.+INFO version.+", line):
                continue

            # It's an ERROR or WARN. Is it in the allow-list?
            for a in allowed_errors:
                if re.match(a, line):
                    break
            else:
                errors.append((lineno, line))
    return errors


DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
    # All tests print these, when starting up or shutting down
    ".*wal receiver task finished with an error: walreceiver connection handling failure.*",
    ".*Shutdown task error: walreceiver connection handling failure.*",
    ".*wal_connection_manager.*tcp connect error: Connection refused.*",
    ".*query handler for .* failed: Socket IO error: Connection reset by peer.*",
    ".*serving compute connection task.*exited with error: Postgres connection error.*",
    ".*serving compute connection task.*exited with error: Connection reset by peer.*",
    ".*serving compute connection task.*exited with error: Postgres query error.*",
    ".*Connection aborted: error communicating with the server: Transport endpoint is not connected.*",
    # FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
    ".*Connection aborted: unexpected message from server*",
    ".*kill_and_wait_impl.*: wait successful.*",
    ".*query handler for 'pagestream.*failed: Broken pipe.*",  # pageserver notices compute shut down
    ".*query handler for 'pagestream.*failed: Connection reset by peer.*",  # pageserver notices compute shut down
    # safekeeper connection can fail with this, in the window between timeline creation
    # and streaming start
    ".*Failed to process query for timeline .*: state uninitialized, no data to read.*",
    # Tests related to authentication and authorization print these
    ".*Error processing HTTP request: Forbidden",
    # intentional failpoints
    ".*failpoint ",
    # FIXME: These need investigation
    ".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
    ".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
    ".*Removing intermediate uninit mark file.*",
    # Tenant::delete_timeline() can cause any of the four following errors.
    # FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
    ".*could not flush frozen layer.*queue is in state Stopped",  # when schedule layer upload fails because queued got closed before compaction got killed
    ".*wait for layer upload ops to complete.*",  # .*Caused by:.*wait_completion aborted because upload queue was stopped
    ".*gc_loop.*Gc failed, retrying in.*timeline is Stopping",  # When gc checks timeline state after acquiring layer_removal_cs
    ".*gc_loop.*Gc failed, retrying in.*: Cannot run GC iteration on inactive tenant",  # Tenant::gc precondition
    ".*compaction_loop.*Compaction failed.*, retrying in.*timeline or pageserver is shutting down",  # When compaction checks timeline state after acquiring layer_removal_cs
    ".*query handler for 'pagestream.*failed: Timeline .* was not found",  # postgres reconnects while timeline_delete doesn't hold the tenant's timelines.lock()
    ".*query handler for 'pagestream.*failed: Timeline .* is not active",  # timeline delete in progress
    ".*task iteration took longer than the configured period.*",
    # these can happen anytime we do compactions from background task and shutdown pageserver
    r".*ERROR.*ancestor timeline \S+ is being stopped",
    # this is expected given our collaborative shutdown approach for the UploadQueue
    ".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
    ".*Compaction failed.*, retrying in .*: ShuttingDown",
    # Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
    ".*Error processing HTTP request: NotFound: Timeline .* was not found",
    ".*took more than expected to complete.*",
    # these can happen during shutdown, but it should not be a reason to fail a test
    ".*completed, took longer than expected.*",
    # AWS S3 may emit 500 errors for keys in a DeleteObjects response: we retry these
    # and it is not a failure of our code when it happens.
    ".*DeleteObjects.*We encountered an internal error. Please try again.*",
)


def _check_allowed_errors(input):
    allowed_errors: List[str] = list(DEFAULT_PAGESERVER_ALLOWED_ERRORS)

    # add any test specifics here; cli parsing is not provided for the
    # difficulty of copypasting regexes as arguments without any quoting
    # errors.

    errors = scan_pageserver_log_for_errors(input, allowed_errors)

    for lineno, error in errors:
        print(f"-:{lineno}: {error.strip()}", file=sys.stderr)

    print(f"\n{len(errors)} not allowed errors", file=sys.stderr)

    return errors


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="check input against pageserver global allowed_errors"
    )
    parser.add_argument(
        "-i",
        "--input",
        type=argparse.FileType("r"),
        default=sys.stdin,
        help="Pageserver logs file. Reads from stdin if no file is provided.",
    )
    args = parser.parse_args()
    errors = _check_allowed_errors(args.input)

    sys.exit(len(errors) > 0)
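Tests that intentionally provoke errors extend this default list on their pageserver instance rather than editing the tuple above. A minimal sketch of the pattern, matching how the regression tests later in this diff use it (the regex itself is a made-up example):

    def test_example(neon_env_builder: NeonEnvBuilder):
        env = neon_env_builder.init_start()
        # NeonPageserver now starts from DEFAULT_PAGESERVER_ALLOWED_ERRORS;
        # allow one extra, test-specific error on top of it.
        env.pageserver.allowed_errors.append(".*intentionally induced error for this test.*")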
@@ -58,7 +58,6 @@ class HistoricLayerInfo:
     lsn_start: str
     lsn_end: Optional[str]
     remote: bool
-    remote_path: Optional[str] = None

     @classmethod
     def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
@@ -69,7 +68,6 @@ class HistoricLayerInfo:
             lsn_start=d["lsn_start"],
             lsn_end=d.get("lsn_end"),
             remote=d["remote"],
-            remote_path=d.get("remote_path"),
         )
@@ -434,12 +432,18 @@ class PageserverHttpClient(requests.Session):
         assert isinstance(res_json, dict)
         return res_json

-    def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
+    def timeline_compact(
+        self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
+    ):
         self.is_testing_enabled_or_skip()
+        query = {}
+        if force_repartition:
+            query["force_repartition"] = "true"

         log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
         res = self.put(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
+            params=query,
         )
         log.info(f"Got compact request response code: {res.status_code}")
         self.verbose_error(res)
@@ -468,12 +472,18 @@ class PageserverHttpClient(requests.Session):
         res_json = res.json()
         return res_json

-    def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
+    def timeline_checkpoint(
+        self, tenant_id: TenantId, timeline_id: TimelineId, force_repartition=False
+    ):
         self.is_testing_enabled_or_skip()
+        query = {}
+        if force_repartition:
+            query["force_repartition"] = "true"

         log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
         res = self.put(
-            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
+            params=query,
         )
         log.info(f"Got checkpoint request response code: {res.status_code}")
         self.verbose_error(res)
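The regression test later in this diff drives both endpoints with the new flag; for example, given a `ps_http` client and a `tenant_id`/`timeline_id` from the test fixtures:

    # Flush the open in-memory layer, forcing repartitioning so that image
    # layers can be created even when thresholds alone would not trigger them.
    ps_http.timeline_checkpoint(tenant_id, timeline_id, force_repartition=True)

    # Likewise for compaction, which is what actually materializes the image layer.
    ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)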
test_runner/fixtures/pageserver/types.py (new file, 146 lines)
@@ -0,0 +1,146 @@
from dataclasses import dataclass
from typing import Any, Dict, Tuple, Union

from fixtures.types import KEY_MAX, KEY_MIN, Key, Lsn


@dataclass
class IndexLayerMetadata:
    @classmethod
    def from_json(cls, d: Dict[str, Any]):
        return {}


@dataclass(frozen=True)
class ImageLayerFileName:
    lsn: Lsn
    key_start: Key
    key_end: Key

    def to_str(self):
        ret = (
            f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn.as_int():016X}"
        )
        assert self == parse_layer_file_name(ret)
        return ret


@dataclass(frozen=True)
class DeltaLayerFileName:
    lsn_start: Lsn
    lsn_end: Lsn
    key_start: Key
    key_end: Key

    def is_l0(self):
        return self.key_start == KEY_MIN and self.key_end == KEY_MAX

    def to_str(self):
        ret = f"{self.key_start.as_int():036X}-{self.key_end.as_int():036X}__{self.lsn_start.as_int():016X}-{self.lsn_end.as_int():016X}"
        assert self == parse_layer_file_name(ret)
        return ret


LayerFileName = Union[ImageLayerFileName, DeltaLayerFileName]


class InvalidFileName(Exception):
    pass


def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
    """Parse an image layer file name. Return key start, key end, and snapshot lsn"""
    parts = f_name.split("__")
    if len(parts) != 2:
        raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
    key_parts = parts[0].split("-")
    if len(key_parts) != 2:
        raise InvalidFileName(
            f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
        )
    try:
        return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
    except ValueError as e:
        raise InvalidFileName(f"conversion error: {f_name}") from e


def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
    """Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
    parts = f_name.split("__")
    if len(parts) != 2:
        raise InvalidFileName(f"expecting two parts separated by '__', got: {parts}")
    key_parts = parts[0].split("-")
    if len(key_parts) != 2:
        raise InvalidFileName(
            f"expecting two key parts separated by '--' in parts[0], got: {key_parts}"
        )
    lsn_parts = parts[1].split("-")
    if len(lsn_parts) != 2:
        raise InvalidFileName(
            f"expecting two lsn parts separated by '--' in parts[1], got: {lsn_parts}"
        )
    try:
        return (
            int(key_parts[0], 16),
            int(key_parts[1], 16),
            int(lsn_parts[0], 16),
            int(lsn_parts[1], 16),
        )
    except ValueError as e:
        raise InvalidFileName(f"conversion error: {f_name}") from e


def parse_layer_file_name(file_name: str) -> LayerFileName:
    try:
        key_start, key_end, lsn = parse_image_layer(file_name)
        return ImageLayerFileName(lsn=Lsn(lsn), key_start=Key(key_start), key_end=Key(key_end))
    except InvalidFileName:
        pass

    try:
        key_start, key_end, lsn_start, lsn_end = parse_delta_layer(file_name)
        return DeltaLayerFileName(
            lsn_start=Lsn(lsn_start),
            lsn_end=Lsn(lsn_end),
            key_start=Key(key_start),
            key_end=Key(key_end),
        )
    except InvalidFileName:
        pass

    raise ValueError()


def is_future_layer(layer_file_name: LayerFileName, disk_consistent_lsn: Lsn):
    """
    Determines if this layer file is considered to be in future meaning we will discard these
    layers during timeline initialization from the given disk_consistent_lsn.
    """
    if (
        isinstance(layer_file_name, ImageLayerFileName)
        and layer_file_name.lsn > disk_consistent_lsn
    ):
        return True
    elif (
        isinstance(layer_file_name, DeltaLayerFileName)
        and layer_file_name.lsn_end > disk_consistent_lsn + 1
    ):
        return True
    else:
        return False


@dataclass
class IndexPartDump:
    layer_metadata: Dict[LayerFileName, IndexLayerMetadata]
    disk_consistent_lsn: Lsn

    @classmethod
    def from_json(cls, d: Dict[str, Any]) -> "IndexPartDump":
        return IndexPartDump(
            layer_metadata={
                parse_layer_file_name(n): IndexLayerMetadata.from_json(v)
                for n, v in d["layer_metadata"].items()
            },
            disk_consistent_lsn=Lsn(d["disk_consistent_lsn"]),
        )
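A short sketch of how these parsers compose with is_future_layer (the layer name and LSNs here are made up; the format strings mirror to_str above):

    from fixtures.pageserver.types import is_future_layer, parse_layer_file_name
    from fixtures.types import Lsn

    # Hypothetical image layer covering the full key range, snapshot at LSN 0x16B9188000.
    name = f"{0:036X}-{(1 << 144) - 1:036X}__{0x16B9188000:016X}"
    layer = parse_layer_file_name(name)  # -> ImageLayerFileName
    assert is_future_layer(layer, Lsn(0x16B9000000))  # snapshot LSN past disk_consistent_lsn
    assert not is_future_layer(layer, Lsn(0x16C0000000))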
@@ -12,6 +12,7 @@ import boto3
 from mypy_boto3_s3 import S3Client

 from fixtures.log_helper import log
+from fixtures.pageserver.types import LayerFileName
 from fixtures.types import TenantId, TimelineId

 TIMELINE_INDEX_PART_FILE_NAME = "index_part.json"
@@ -87,6 +88,11 @@ class LocalFsStorage:
     def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
         return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)

+    def layer_path(
+        self, tenant_id: TenantId, timeline_id: TimelineId, layer_file_name: LayerFileName
+    ):
+        return self.timeline_path(tenant_id, timeline_id) / layer_file_name.to_str()
+
     def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
         return self.timeline_path(tenant_id, timeline_id) / TIMELINE_INDEX_PART_FILE_NAME
@@ -1,4 +1,5 @@
 import random
+from dataclasses import dataclass
 from functools import total_ordering
 from typing import Any, Type, TypeVar, Union

@@ -36,6 +37,11 @@ class Lsn:
             return NotImplemented
         return self.lsn_int < other.lsn_int

+    def __gt__(self, other: Any) -> bool:
+        if not isinstance(other, Lsn):
+            raise NotImplementedError
+        return self.lsn_int > other.lsn_int
+
     def __eq__(self, other: Any) -> bool:
         if not isinstance(other, Lsn):
             return NotImplemented
@@ -47,9 +53,32 @@ class Lsn:
             return NotImplemented
         return self.lsn_int - other.lsn_int

+    def __add__(self, other: Union[int, "Lsn"]) -> "Lsn":
+        if isinstance(other, int):
+            return Lsn(self.lsn_int + other)
+        elif isinstance(other, Lsn):
+            return Lsn(self.lsn_int + other.lsn_int)
+        else:
+            raise NotImplementedError
+
     def __hash__(self) -> int:
         return hash(self.lsn_int)

     def as_int(self) -> int:
         return self.lsn_int


+@dataclass(frozen=True)
+class Key:
+    key_int: int
+
+    def as_int(self) -> int:
+        return self.key_int
+
+
+KEY_MAX = Key(0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF)
+KEY_MIN = Key(0)
+
+
 @total_ordering
 class Id:
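A quick sketch of the Lsn arithmetic the additions above enable (values are made-up examples; `__add__` treats an int as a byte offset, per the code):

    from fixtures.types import Lsn

    start = Lsn(0x16B9188000)
    assert (start + 0x1000).as_int() == 0x16B9189000  # int offset
    assert start + Lsn(0x1000) == Lsn(0x16B9189000)   # Lsn + Lsn also works
    assert Lsn(0x10) > Lsn(0x8)                       # __gt__ added in this diff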
@@ -6,7 +6,16 @@ import subprocess
 import threading
 import time
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    TypeVar,
+)
 from urllib.parse import urlencode

 import allure
@@ -14,6 +23,10 @@ import zstandard
 from psycopg2.extensions import cursor

 from fixtures.log_helper import log
+from fixtures.pageserver.types import (
+    parse_delta_layer,
+    parse_image_layer,
+)

 if TYPE_CHECKING:
     from fixtures.neon_fixtures import PgBin
@@ -193,26 +206,6 @@ def get_timeline_dir_size(path: Path) -> int:
     return sz


-def parse_image_layer(f_name: str) -> Tuple[int, int, int]:
-    """Parse an image layer file name. Return key start, key end, and snapshot lsn"""
-    parts = f_name.split("__")
-    key_parts = parts[0].split("-")
-    return int(key_parts[0], 16), int(key_parts[1], 16), int(parts[1], 16)
-
-
-def parse_delta_layer(f_name: str) -> Tuple[int, int, int, int]:
-    """Parse a delta layer file name. Return key start, key end, lsn start, and lsn end"""
-    parts = f_name.split("__")
-    key_parts = parts[0].split("-")
-    lsn_parts = parts[1].split("-")
-    return (
-        int(key_parts[0], 16),
-        int(key_parts[1], 16),
-        int(lsn_parts[0], 16),
-        int(lsn_parts[1], 16),
-    )
-
-
 def get_scale_for_db(size_mb: int) -> int:
     """Returns pgbench scale factor for given target db size in MB.
@@ -1,122 +0,0 @@
import json
import shutil
import subprocess
from pathlib import Path
from typing import List

from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, last_flush_lsn_upload
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId


def test_getpage_throughput(
    neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker, pg_bin: PgBin
):
    neon_env_builder.enable_generations = True
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
    env = neon_env_builder.init_start()

    remote_storage = env.pageserver_remote_storage
    assert isinstance(remote_storage, LocalFsStorage)

    ps_http = env.pageserver.http_client()

    # clean up the useless default tenant
    ps_http.tenant_delete(env.initial_tenant)

    # create our template tenant
    tenant_config_mgmt_api = {
        "gc_period": "0s",
        "checkpoint_timeout": "3650 day",
        "compaction_period": "20 s",
        "compaction_threshold": 10,
        "compaction_target_size": 134217728,
        "checkpoint_distance": 268435456,
        "image_creation_threshold": 3,
    }
    tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}

    template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
    template_tenant_gen = int(ps_http.tenant_status(template_tenant)["generation"])
    with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
        pg_bin.run_capture(["pgbench", "-i", "-s50", ep.connstr()])
        last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
    ps_http.tenant_detach(template_tenant)

    # stop PS just for good measure
    env.pageserver.stop()

    # duplicate the tenant in remote storage
    src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
    assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
    tenants = [template_tenant]
    for i in range(0, 200):
        new_tenant = TenantId.generate()
        tenants.append(new_tenant)
        log.info("Duplicating tenant #%s: %s", i, new_tenant)

        dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
        dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
        dst_timelines_dir.mkdir(parents=False, exist_ok=False)

        for tl in src_timelines_dir.iterdir():
            src_tl_dir = src_timelines_dir / tl.name
            assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
            dst_tl_dir = dst_timelines_dir / tl.name
            dst_tl_dir.mkdir(parents=False, exist_ok=False)
            for file in tl.iterdir():
                shutil.copy2(file, dst_tl_dir)
                if "__" in file.name:
                    cmd: List[str] = [
                        str(
                            env.neon_binpath / "pagectl"
                        ),  # TODO: abstract this like the other binaries
                        "layer",
                        "rewrite-summary",
                        str(dst_tl_dir / file.name),
                        "--new-tenant-id",
                        str(new_tenant),
                    ]
                    subprocess.run(cmd, check=True)
                else:
                    # index_part etc need no patching
                    pass

    env.pageserver.start()
    assert ps_http.tenant_list() == []
    for tenant in tenants:
        ps_http.tenant_attach(
            tenant, config=tenant_config_mgmt_api, generation=template_tenant_gen + 1
        )
    for tenant in tenants:
        wait_until_tenant_active(ps_http, tenant)

    # ensure all layers are resident for predictable performance
    # TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
    for tenant in tenants:
        ps_http.download_all_layers(tenant, template_timeline)

    # run the benchmark with one client per timeline, each doing 10k requests to random keys.
    cmd = [
        str(env.neon_binpath / "pagebench"),
        "get-page-latest-lsn",
        "--mgmt-api-endpoint",
        ps_http.base_url,
        "--page-service-connstring",
        env.pageserver.connstr(password=None),
        "--runtime",
        "10s",
        *[f"{tenant}/{template_timeline}" for tenant in tenants],
    ]
    log.info(f"command: {' '.join(cmd)}")
    basepath = pg_bin.run_capture(cmd)
    results_path = Path(basepath + ".stdout")
    log.info(f"Benchmark results at: {results_path}")

    with open(results_path, "r") as f:
        results = json.load(f)

    log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
@@ -24,8 +24,6 @@ def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_
     log.info("checks started")

     with pg_cur(endpoint) as cur:
-        cur.execute("CREATE EXTENSION neon")  # TODO move it to neon_fixtures?
-
         cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
         res = cur.fetchone()
         max_replication_write_lag_bytes = res[0]
@@ -102,9 +100,13 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
     # Create a branch for us
     env.neon_cli.create_branch("test_backpressure")

-    endpoint = env.endpoints.create_start(
+    endpoint = env.endpoints.create(
         "test_backpressure", config_lines=["max_replication_write_lag=30MB"]
     )
+    # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
+    # which is needed for backpressure_lsns() to work
+    endpoint.respec(skip_pg_catalog_updates=False)
+    endpoint.start()
     log.info("postgres is running on 'test_backpressure' branch")

     # setup check thread
@@ -114,6 +114,7 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
         [
             ".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
             ".*Timeline got dropped without initializing, cleaning its files.*",
+            ".*Failed to load index_part from remote storage, failed creation?.*",
         ]
     )

@@ -143,6 +144,58 @@ def test_timeline_init_break_before_checkpoint(neon_env_builder: NeonEnvBuilder)
    ), "pageserver should clean its temp timeline files on timeline creation failure"


def test_timeline_init_break_before_checkpoint_recreate(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()

    env.pageserver.allowed_errors.extend(
        [
            ".*Failed to process timeline dir contents.*Timeline has no ancestor and no layer files.*",
            ".*Timeline got dropped without initializing, cleaning its files.*",
            ".*Failed to load index_part from remote storage, failed creation?.*",
        ]
    )

    tenant_id = env.initial_tenant

    timelines_dir = env.pageserver.timeline_dir(tenant_id)
    old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
    initial_timeline_dirs = [d for d in timelines_dir.iterdir()]

    # Some fixed timeline ID (like control plane does)
    timeline_id = TimelineId("1080243c1f76fe3c5147266663c9860b")

    # Introduce failpoint during timeline init (some intermediate files are on disk), before it's checkpointed.
    pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "return"))
    with pytest.raises(Exception, match="before-checkpoint-new-timeline"):
        _ = env.neon_cli.create_timeline(
            "test_timeline_init_break_before_checkpoint", tenant_id, timeline_id
        )

    # Restart the page server
    env.pageserver.restart(immediate=True)

    # Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
    new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
    assert (
        new_tenant_timelines == old_tenant_timelines
    ), f"Pageserver after restart should ignore non-initialized timelines for tenant {tenant_id}"

    timeline_dirs = [d for d in timelines_dir.iterdir()]
    assert (
        timeline_dirs == initial_timeline_dirs
    ), "pageserver should clean its temp timeline files on timeline creation failure"

    # Disable the failpoint again
    pageserver_http.configure_failpoints(("before-checkpoint-new-timeline", "off"))
    # creating the branch should have worked now
    new_timeline_id = env.neon_cli.create_timeline(
        "test_timeline_init_break_before_checkpoint", tenant_id, timeline_id
    )

    assert timeline_id == new_timeline_id


def test_timeline_create_break_after_uninit_mark(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()
test_runner/regress/test_layers_from_future.py (new file, 222 lines)
@@ -0,0 +1,222 @@
import time

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.pageserver.types import (
    DeltaLayerFileName,
    ImageLayerFileName,
    is_future_layer,
)
from fixtures.pageserver.utils import (
    wait_for_last_record_lsn,
    wait_for_upload_queue_empty,
    wait_until_tenant_active,
)
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until


def test_issue_5878(neon_env_builder: NeonEnvBuilder):
    """
    Regression test for issue https://github.com/neondatabase/neon/issues/5878 .

    Create a situation where IndexPart contains an image layer from the future
    (i.e., image layer > IndexPart::disk_consistent_lsn).
    Detach.
    Attach.
    Wait for tenant to finish load_layer_map (by waiting for it to become active).
    Wait for any remote timeline client ops to finish that the attach started.
    Integrity-check the index part.

    Before fixing the issue, load_layer_map would schedule removal of the future
    image layer. A compaction run could later re-create the image layer with
    the same file name, scheduling a PUT.
    Due to lack of an upload queue barrier, the PUT and DELETE could be re-ordered.
    The result was IndexPart referencing a non-existent object.
    """
    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    env = neon_env_builder.init_start()

    ps_http = env.pageserver.http_client()

    l0_l1_threshold = 3
    image_creation_threshold = 1

    tenant_config = {
        "gc_period": "0s",  # disable GC (shouldn't matter for this test but still)
        "compaction_period": "0s",  # we want to control when compaction runs
        "checkpoint_timeout": "24h",  # something we won't reach
        "checkpoint_distance": f"{50 * (1024**2)}",  # something we won't reach, we checkpoint manually
        "image_creation_threshold": f"{image_creation_threshold}",
        "compaction_threshold": f"{l0_l1_threshold}",
        "compaction_target_size": f"{128 * (1024**3)}",  # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
    }

    tenant_id, timeline_id = env.neon_cli.create_tenant(conf=tenant_config)

    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)

    def get_index_part():
        assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
        ip_path = env.pageserver_remote_storage.index_path(tenant_id, timeline_id)
        return env.pagectl.dump_index_part(ip_path)

    def get_future_layers():
        ip = get_index_part()
        future_layers = [
            layer_file_name
            for layer_file_name in ip.layer_metadata.keys()
            if is_future_layer(layer_file_name, ip.disk_consistent_lsn)
        ]
        return future_layers

    assert len(get_future_layers()) == 0

    current = get_index_part()
    assert len(set(current.layer_metadata.keys())) == 1
    layer_file_name = list(current.layer_metadata.keys())[0]
    assert isinstance(layer_file_name, DeltaLayerFileName)
    assert layer_file_name.is_l0(), f"{layer_file_name}"

    log.info("force image layer creation in the future by writing some data into in-memory layer")

    # Create a number of layers in the tenant
    with endpoint.cursor() as cur:
        cur.execute("CREATE TABLE foo (t text)")
        iters = l0_l1_threshold * image_creation_threshold
        for i in range(0, iters):
            cur.execute(
                f"""
                INSERT INTO foo
                SELECT '{i}' || g
                FROM generate_series(1, 10000) g
                """
            )
            last_record_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
            wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn)
            # 0..iters-1: create a stack of delta layers
            # iters: leave a non-empty in-memory layer which we'll use for image layer generation
            if i < iters - 1:
                ps_http.timeline_checkpoint(tenant_id, timeline_id, force_repartition=True)
                assert (
                    len(
                        [
                            layer
                            for layer in ps_http.layer_map_info(
                                tenant_id, timeline_id
                            ).historic_layers
                            if layer.kind == "Image"
                        ]
                    )
                    == 0
                )

    endpoint.stop()

    wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)

    ip = get_index_part()
    assert len(ip.layer_metadata.keys())
    assert (
        ip.disk_consistent_lsn < last_record_lsn
    ), "sanity check for what above loop is supposed to do"

    # create the image layer from the future
    ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)
    assert (
        len(
            [
                layer
                for layer in ps_http.layer_map_info(tenant_id, timeline_id).historic_layers
                if layer.kind == "Image"
            ]
        )
        == 1
    )
    wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
    future_layers = get_future_layers()
    assert len(future_layers) == 1
    future_layer = future_layers[0]
    assert isinstance(future_layer, ImageLayerFileName)
    assert future_layer.lsn == last_record_lsn
    log.info(
        f"got layer from the future: lsn={future_layer.lsn} disk_consistent_lsn={ip.disk_consistent_lsn} last_record_lsn={last_record_lsn}"
    )
    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
    future_layer_path = env.pageserver_remote_storage.layer_path(
        tenant_id, timeline_id, future_layer
    )
    log.info(f"future layer path: {future_layer_path}")
    pre_stat = future_layer_path.stat()
    time.sleep(1.1)  # so that we can use change in pre_stat.st_mtime to detect overwrites

    # force removal of layers from the future
    tenant_conf = ps_http.tenant_config(tenant_id)
    ps_http.tenant_detach(tenant_id)
    failpoint_name = "before-delete-layer-pausable"
    ps_http.configure_failpoints((failpoint_name, "pause"))
    ps_http.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides)
    wait_until_tenant_active(ps_http, tenant_id)

    # Ensure the IndexPart upload that unlinks the layer file finishes, i.e., doesn't clog the queue.
    def future_layer_is_gone_from_index_part():
        future_layers = set(get_future_layers())
        assert future_layer not in future_layers

    wait_until(10, 0.5, future_layer_is_gone_from_index_part)

    # NB: the layer file is unlinked from the index part now, but, because we made the delete
    # operation stuck, the layer file itself is still in the remote_storage
    def delete_at_pause_point():
        assert env.pageserver.log_contains(f".*{tenant_id}.*at failpoint.*{failpoint_name}")

    wait_until(10, 0.5, delete_at_pause_point)
    assert future_layer_path.exists()

    # wait for re-ingestion of the WAL from safekeepers into the in-memory layer
    # (this happens in parallel to the above)
    wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, last_record_lsn)

    # re-do image layer generation
    # This will produce the same image layer and queue an upload.
    # However, we still have the deletion for the layer queued, stuck on the failpoint.
    # An incorrect implementation would let the PUT execute before the DELETE.
    # The later code in this test asserts that this doesn't happen.
    ps_http.timeline_compact(tenant_id, timeline_id, force_repartition=True)

    # Let things sit for some time; a good implementation makes no progress because
    # we can't execute the PUT before the DELETE. A bad implementation would do that.
    max_race_opportunity_window = 4
    start = time.monotonic()
    while True:
        post_stat = future_layer_path.stat()
        assert (
            pre_stat.st_mtime == post_stat.st_mtime
        ), "observed PUT overtake the stuck DELETE => bug isn't fixed yet"
        if time.monotonic() - start > max_race_opportunity_window:
            log.info(
                "a correct implementation would never let the later PUT overtake the earlier DELETE"
            )
            break
        time.sleep(1)

    # Window has passed, unstuck the delete, let upload queue drain.
    log.info("unstuck the DELETE")
    ps_http.configure_failpoints(("before-delete-layer-pausable", "off"))

    wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)

    # Examine the resulting S3 state.
    log.info("integrity-check the remote storage")
    ip = get_index_part()
    for layer_file_name in ip.layer_metadata.keys():
        layer_path = env.pageserver_remote_storage.layer_path(
            tenant_id, timeline_id, layer_file_name
        )
        assert layer_path.exists(), f"{layer_file_name.to_str()}"

    log.info("assert that the overwritten layer won")
    final_stat = future_layer_path.stat()
    assert final_stat.st_mtime != pre_stat.st_mtime
test_runner/regress/test_local_file_cache.py (new file, 74 lines)
@@ -0,0 +1,74 @@
import os
import random
import threading
import time
from typing import List

from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar


def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
    env = neon_simple_env

    cache_dir = os.path.join(env.repo_dir, "file_cache")
    os.mkdir(cache_dir)

    env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")

    endpoint = env.endpoints.create_start(
        "test_local_file_cache_unlink",
        config_lines=[
            "shared_buffers='1MB'",
            f"neon.file_cache_path='{cache_dir}/file.cache'",
            "neon.max_file_cache_size='64MB'",
            "neon.file_cache_size_limit='10MB'",
        ],
    )

    cur = endpoint.connect().cursor()

    n_rows = 100000
    n_threads = 20
    n_updates_per_thread = 10000
    n_updates_per_connection = 1000
    n_total_updates = n_threads * n_updates_per_thread

    cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
    cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")

    # Start threads that will perform random UPDATEs. Each UPDATE
    # increments the counter on the row, so that we can check at the
    # end that the sum of all the counters match the number of updates
    # performed (plus the initial 1 on each row).
    #
    # Furthermore, each thread will reconnect between every 1000 updates.
    def run_updates():
        n_updates_performed = 0
        conn = endpoint.connect()
        cur = conn.cursor()
        for _ in range(n_updates_per_thread):
            id = random.randint(1, n_rows)
            cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
            n_updates_performed += 1
            if n_updates_performed % n_updates_per_connection == 0:
                cur.close()
                conn.close()
                conn = endpoint.connect()
                cur = conn.cursor()

    threads: List[threading.Thread] = []
    for _i in range(n_threads):
        thread = threading.Thread(target=run_updates, args=(), daemon=True)
        thread.start()
        threads.append(thread)

    time.sleep(5)

    new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
    os.rename(cache_dir, new_cache_dir)

    for thread in threads:
        thread.join()

    assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows
test_runner/regress/test_neon_extension.py (new file, 28 lines)
@@ -0,0 +1,28 @@
from contextlib import closing

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder


# Verify that the neon extension is installed and has the correct version.
def test_neon_extension(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    env.neon_cli.create_branch("test_create_extension_neon")

    endpoint_main = env.endpoints.create("test_create_extension_neon")
    # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
    endpoint_main.respec(skip_pg_catalog_updates=False)
    endpoint_main.start()

    log.info("postgres is running on 'test_create_extension_neon' branch")

    with closing(endpoint_main.connect()) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT extversion from pg_extension where extname='neon'")
            # If this fails, it means the extension is either not installed
            # or was updated and the version is different.
            #
            # IMPORTANT:
            # If the version has changed, the test should be updated.
            # Ensure that the default version is also updated in the neon.control file.
            assert cur.fetchone() == ("1.1",)
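For context on the version pinned here: if default_version in neon.control is bumped (together with a matching upgrade script shipped by the extension), an installed database is moved forward with standard extension DDL. A sketch against any superuser cursor; nothing here is neon-specific beyond the extension name:

cur.execute("ALTER EXTENSION neon UPDATE")
cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'neon'")
log.info(f"neon extension is now at version {cur.fetchone()[0]}")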
@@ -1,6 +1,7 @@
 import enum
 import os
 import shutil
+from threading import Thread

 import pytest
 from fixtures.log_helper import log
@@ -27,7 +28,7 @@ from fixtures.remote_storage import (
     available_s3_storages,
 )
 from fixtures.types import TenantId
-from fixtures.utils import run_pg_bench_small
+from fixtures.utils import run_pg_bench_small, wait_until


 @pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@@ -399,4 +400,78 @@ def test_tenant_delete_is_resumed_on_attach(
    )


def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):
    """Reproduction of the 2023-11-23 stuck tenants investigation"""

    # do not use default tenant/timeline creation because it would emit the failpoint log message too early
    env = neon_env_builder.init_configs()
    env.start()
    pageserver_http = env.pageserver.http_client()

    # happens when the cancellation bails out of the flushing loop early, leaving disk_consistent_lsn at zero
    env.pageserver.allowed_errors.append(
        ".*Timeline got dropped without initializing, cleaning its files"
    )
    # the response to hit_pausable_failpoint_and_later_fail
    env.pageserver.allowed_errors.append(
        f".*Error processing HTTP request: InternalServerError\\(new timeline {env.initial_tenant}/{env.initial_timeline} has invalid disk_consistent_lsn"
    )

    pageserver_http.tenant_create(env.initial_tenant)

    failpoint = "flush-layer-cancel-after-writing-layer-out-pausable"
    pageserver_http.configure_failpoints((failpoint, "pause"))

    def hit_pausable_failpoint_and_later_fail():
        with pytest.raises(
            PageserverApiException, match="new timeline \\S+ has invalid disk_consistent_lsn"
        ):
            pageserver_http.timeline_create(
                env.pg_version, env.initial_tenant, env.initial_timeline
            )

    def start_deletion():
        pageserver_http.tenant_delete(env.initial_tenant)

    def has_hit_failpoint():
        assert env.pageserver.log_contains(f"at failpoint {failpoint}") is not None

    def deletion_has_started_waiting_for_timelines():
        assert env.pageserver.log_contains("Waiting for timelines...") is not None

    def tenant_is_deleted():
        try:
            pageserver_http.tenant_status(env.initial_tenant)
        except PageserverApiException as e:
            assert e.status_code == 404
        else:
            raise RuntimeError("tenant was still accessible")

    creation = Thread(target=hit_pausable_failpoint_and_later_fail)
    creation.start()

    deletion = None

    try:
        wait_until(10, 1, has_hit_failpoint)

        # deletion should start ok and sync up with the stuck creation; the creation
        # then fails because disk_consistent_lsn was never updated, and the deletion
        # proceeds to remove the tenant (asserted via tenant_is_deleted below)
        deletion = Thread(target=start_deletion)
        deletion.start()

        wait_until(10, 1, deletion_has_started_waiting_for_timelines)

        pageserver_http.configure_failpoints((failpoint, "off"))

        creation.join()
        deletion.join()

        wait_until(10, 1, tenant_is_deleted)
    finally:
        creation.join()
        if deletion is not None:
            deletion.join()


# TODO: test concurrent deletions with the "hang" failpoint
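The probes above lean on wait_until from fixtures.utils for retries. A minimal sketch of the retry semantics the call sites assume, i.e. wait_until(iterations, interval_seconds, predicate); the real helper may differ in details:

import time

def wait_until_sketch(number_of_iterations: int, interval: float, func):
    # retry func until it returns without raising, sleeping between attempts
    last_exception = None
    for _ in range(number_of_iterations):
        try:
            return func()
        except Exception as e:
            last_exception = e
            time.sleep(interval)
    raise Exception(f"timed out while waiting for {func}") from last_exception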
@@ -134,10 +134,11 @@ def wait_for_pageserver_catchup(endpoint_main: Endpoint, polling_interval=1, tim
     res = endpoint_main.safe_psql(
         """
         SELECT
-            pg_size_pretty(pg_cluster_size()),
+            pg_size_pretty(neon.pg_cluster_size()),
             pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn) as received_lsn_lag
-        FROM backpressure_lsns();
-        """
+        FROM neon.backpressure_lsns();
+        """,
+        dbname="postgres",
     )[0]
     log.info(f"pg_cluster_size = {res[0]}, received_lsn_lag = {res[1]}")
     received_lsn_lag = res[1]
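Per its signature, wait_for_pageserver_catchup polls this query until the pageserver's received LSN has caught up with the locally flushed LSN. A plausible reconstruction of that loop under the same assumptions (safe_psql returning one row, lag measured in bytes); the hunk above only changes the query inside it:

import time

def wait_for_catchup_sketch(endpoint, polling_interval=1, timeout=60):
    deadline = time.monotonic() + timeout
    while True:
        row = endpoint.safe_psql(
            "SELECT pg_wal_lsn_diff(pg_current_wal_flush_lsn(), received_lsn)"
            " FROM neon.backpressure_lsns();",
            dbname="postgres",
        )[0]
        lag = row[0]
        if lag == 0:  # nothing flushed locally that the pageserver has not received
            return
        if time.monotonic() > deadline:
            raise TimeoutError(f"pageserver still lagging by {lag} bytes")
        time.sleep(polling_interval)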
@@ -152,17 +153,20 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):

     wait_for_timeline_size_init(client, tenant=env.initial_tenant, timeline=new_timeline_id)

-    endpoint_main = env.endpoints.create_start(
+    endpoint_main = env.endpoints.create(
         "test_timeline_size_quota",
         # Set small limit for the test
         config_lines=["neon.max_cluster_size=30MB"],
     )
+    # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
+    # which is needed for pg_cluster_size() to work
+    endpoint_main.respec(skip_pg_catalog_updates=False)
+    endpoint_main.start()

     log.info("postgres is running on 'test_timeline_size_quota' branch")

     with closing(endpoint_main.connect()) as conn:
         with conn.cursor() as cur:
-            cur.execute("CREATE EXTENSION neon")  # TODO move it to neon_fixtures?

             cur.execute("CREATE TABLE foo (t text)")

             wait_for_pageserver_catchup(endpoint_main)
@@ -211,7 +215,7 @@ def test_timeline_size_quota(neon_env_builder: NeonEnvBuilder):

             wait_for_pageserver_catchup(endpoint_main)

-            cur.execute("SELECT * from pg_size_pretty(pg_cluster_size())")
+            cur.execute("SELECT * from pg_size_pretty(neon.pg_cluster_size())")
             pg_cluster_size = cur.fetchone()
             log.info(f"pg_cluster_size = {pg_cluster_size}")
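Both hunks in this file are the same mechanical change: pg_cluster_size() and backpressure_lsns() are now resolved inside the neon schema, so callers must schema-qualify them (or put neon on the search_path). A one-line check against any open cursor on the endpoint:

cur.execute("SELECT pg_size_pretty(neon.pg_cluster_size())")
log.info(f"cluster size: {cur.fetchone()[0]}")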
@@ -1,14 +1,19 @@
 import sys
 import tarfile
+import tempfile
 from pathlib import Path

 import pytest
+import zstandard
+from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
     PgBin,
     VanillaPostgres,
 )
 from fixtures.port_distributor import PortDistributor
-from fixtures.types import TenantId, TimelineId
+from fixtures.remote_storage import LocalFsStorage
+from fixtures.types import Lsn, TenantId, TimelineId


 @pytest.mark.skipif(
@@ -53,3 +58,70 @@ def test_wal_restore(
    )
    restored.start()
    assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]


def decompress_zstd(
    input_file_name: Path,
    output_dir: Path,
):
    log.info(f"decompressing zstd to: {output_dir}")
    output_dir.mkdir(mode=0o750, parents=True, exist_ok=True)
    with tempfile.TemporaryFile(suffix=".tar") as temp:
        decompressor = zstandard.ZstdDecompressor()
        with open(input_file_name, "rb") as input_file:
            decompressor.copy_stream(input_file, temp)
        temp.seek(0)
        with tarfile.open(fileobj=temp) as tfile:
            tfile.extractall(path=output_dir)
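Because decompressor.copy_stream writes into an anonymous temporary .tar on disk, the archive is never held fully in memory before extraction. A hypothetical standalone use of the helper, with illustrative paths:

from pathlib import Path

decompress_zstd(Path("initdb.tar.zst"), Path("/tmp/restored-datadir"))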
def test_wal_restore_initdb(
    neon_env_builder: NeonEnvBuilder,
    pg_bin: PgBin,
    test_output_dir: Path,
    port_distributor: PortDistributor,
    base_dir: Path,
    pg_distrib_dir: Path,
):
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start("main")
    endpoint.safe_psql("create table t as select generate_series(1,300000)")
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    original_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
    env.pageserver.stop()
    port = port_distributor.get_port()
    data_dir = test_output_dir / "pgsql.restored"

    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

    initdb_zst_path = (
        env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id) / "initdb.tar.zst"
    )

    decompress_zstd(initdb_zst_path, data_dir)
    with VanillaPostgres(
        data_dir, PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version), port, init=False
    ) as restored:
        pg_bin.run_capture(
            [
                str(base_dir / "libs" / "utils" / "scripts" / "restore_from_wal_initdb.sh"),
                str(pg_distrib_dir / f"v{env.pg_version}/bin"),
                str(
                    test_output_dir
                    / "repo"
                    / "safekeepers"
                    / "sk1"
                    / str(tenant_id)
                    / str(timeline_id)
                ),
                str(data_dir),
                str(port),
            ]
        )
        restored.start()
        restored_lsn = Lsn(
            restored.safe_psql("SELECT pg_current_wal_flush_lsn()", user="cloud_admin")[0][0]
        )
        log.info(f"original lsn: {original_lsn}, restored lsn: {restored_lsn}")
        assert restored.safe_psql("select count(*) from t", user="cloud_admin") == [(300000,)]
@@ -68,6 +68,9 @@ tracing-core = { version = "0.1" }
 tungstenite = { version = "0.20" }
 url = { version = "2", features = ["serde"] }
 uuid = { version = "1", features = ["serde", "v4"] }
+zstd = { version = "0.12" }
+zstd-safe = { version = "6", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
+zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }

 [build-dependencies]
 anyhow = { version = "1", features = ["backtrace"] }