mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
1 Commits
erik/prod-
...
erik/wal-f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8dd8369a83 |
217
Cargo.lock
generated
217
Cargo.lock
generated
@@ -46,15 +46,6 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aligned-vec"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e0966165eaf052580bd70eb1b32cb3d6245774c0104d1b2793e9650bf83b52a"
|
||||
dependencies = [
|
||||
"equator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "allocator-api2"
|
||||
version = "0.2.16"
|
||||
@@ -155,12 +146,6 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs"
|
||||
version = "0.6.2"
|
||||
@@ -757,7 +742,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"paste",
|
||||
"pin-project",
|
||||
"quick-xml 0.31.0",
|
||||
"quick-xml",
|
||||
"rand 0.8.5",
|
||||
"reqwest 0.11.19",
|
||||
"rustc_version",
|
||||
@@ -1396,15 +1381,6 @@ version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
|
||||
|
||||
[[package]]
|
||||
name = "cpp_demangle"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.9"
|
||||
@@ -1928,26 +1904,6 @@ dependencies = [
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c35da53b5a021d2484a7cc49b2ac7f2d840f8236a286f84202369bd338d761ea"
|
||||
dependencies = [
|
||||
"equator-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator-macro"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bf679796c0322556351f287a51b49e48f7c4986e727b5dd78c972d30e2e16cc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.1"
|
||||
@@ -2055,18 +2011,6 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "findshlibs"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.4.2"
|
||||
@@ -2770,24 +2714,6 @@ version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
|
||||
|
||||
[[package]]
|
||||
name = "inferno"
|
||||
version = "0.11.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"indexmap 2.0.1",
|
||||
"is-terminal",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
"once_cell",
|
||||
"quick-xml 0.26.0",
|
||||
"rgb",
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
@@ -3127,15 +3053,6 @@ version = "2.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
|
||||
|
||||
[[package]]
|
||||
name = "memmap2"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "45fd3a57831bf88bc63f8cebc0cf956116276e97fef3966103e96416209f7c92"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.7.1"
|
||||
@@ -3361,16 +3278,6 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
|
||||
[[package]]
|
||||
name = "num-format"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
|
||||
dependencies = [
|
||||
"arrayvec",
|
||||
"itoa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.45"
|
||||
@@ -4201,31 +4108,6 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "pprof"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebbe2f8898beba44815fdc9e5a4ae9c929e21c5dc29b0c774a15555f7f58d6d0"
|
||||
dependencies = [
|
||||
"aligned-vec",
|
||||
"backtrace",
|
||||
"cfg-if",
|
||||
"criterion",
|
||||
"findshlibs",
|
||||
"inferno",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.4",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"protobuf",
|
||||
"protobuf-codegen-pure",
|
||||
"smallvec",
|
||||
"symbolic-demangle",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.17"
|
||||
@@ -4378,31 +4260,6 @@ dependencies = [
|
||||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6"
|
||||
dependencies = [
|
||||
"protobuf",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen-pure"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95a29399fc94bcd3eeaa951c715f7bea69409b2445356b00519740bcd6ddd865"
|
||||
dependencies = [
|
||||
"protobuf",
|
||||
"protobuf-codegen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proxy"
|
||||
version = "0.1.0"
|
||||
@@ -4514,15 +4371,6 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.31.0"
|
||||
@@ -5005,15 +4853,6 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rgb"
|
||||
version = "0.8.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.6"
|
||||
@@ -5327,7 +5166,6 @@ dependencies = [
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -5825,9 +5663,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "smallvec"
|
||||
version = "1.13.2"
|
||||
version = "1.13.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
|
||||
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
|
||||
|
||||
[[package]]
|
||||
name = "smol_str"
|
||||
@@ -5874,12 +5712,6 @@ dependencies = [
|
||||
"der 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stable_deref_trait"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
@@ -6026,12 +5858,6 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "str_stack"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
|
||||
|
||||
[[package]]
|
||||
name = "stringprep"
|
||||
version = "0.1.2"
|
||||
@@ -6079,29 +5905,6 @@ version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "12.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "366f1b4c6baf6cfefc234bbd4899535fca0b06c74443039a73f6dfb2fad88d77"
|
||||
dependencies = [
|
||||
"debugid",
|
||||
"memmap2",
|
||||
"stable_deref_trait",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-demangle"
|
||||
version = "12.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aba05ba5b9962ea5617baf556293720a8b2d0a282aa14ee4bf10e22efc7da8c8"
|
||||
dependencies = [
|
||||
"cpp_demangle",
|
||||
"rustc-demangle",
|
||||
"symbolic-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.109"
|
||||
@@ -6271,9 +6074,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-ctl"
|
||||
version = "0.6.0"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b"
|
||||
checksum = "619bfed27d807b54f7f776b9430d4f8060e66ee138a28632ca898584d462c31c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"paste",
|
||||
@@ -6282,9 +6085,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemalloc-sys"
|
||||
version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7"
|
||||
version = "0.5.4+5.3.0-patched"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d"
|
||||
checksum = "9402443cb8fd499b6f327e40565234ff34dbda27460c5b47db0db77443dd85d1"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
@@ -6292,9 +6095,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tikv-jemallocator"
|
||||
version = "0.6.0"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865"
|
||||
checksum = "965fe0c26be5c56c94e38ba547249074803efd52adfb66de62107d95aab3eaca"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"tikv-jemalloc-sys",
|
||||
@@ -6969,7 +6772,6 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -7538,7 +7340,6 @@ dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"memchr",
|
||||
"nix 0.26.4",
|
||||
"nom",
|
||||
"num-bigint",
|
||||
"num-integer",
|
||||
|
||||
@@ -130,7 +130,6 @@ parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
pprof = { version = "0.14", features = ["criterion", "flamegraph", "protobuf", "protobuf-codec"] }
|
||||
procfs = "0.16"
|
||||
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.13"
|
||||
@@ -169,8 +168,8 @@ sync_wrapper = "0.1.2"
|
||||
tar = "0.4"
|
||||
test-context = "0.3"
|
||||
thiserror = "1.0"
|
||||
tikv-jemallocator = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemalloc-ctl = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemallocator = "0.5"
|
||||
tikv-jemalloc-ctl = "0.5"
|
||||
tokio = { version = "1.17", features = ["macros"] }
|
||||
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
|
||||
tokio-io-timeout = "1.2.0"
|
||||
|
||||
@@ -29,7 +29,6 @@ jsonwebtoken.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
pprof.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
|
||||
use crate::http::request::{get_query_param, parse_query_param};
|
||||
use anyhow::{anyhow, Context};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
|
||||
use anyhow::Context;
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Method;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
|
||||
@@ -13,13 +12,11 @@ use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use std::future::Future;
|
||||
use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use pprof::protos::Message as _;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use std::io::Write as _;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
@@ -331,82 +328,6 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Generates CPU profiles.
|
||||
pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
enum Format {
|
||||
Pprof,
|
||||
Svg,
|
||||
}
|
||||
|
||||
// Parameters.
|
||||
let format = match get_query_param(&req, "format")?.as_deref() {
|
||||
None => Format::Pprof,
|
||||
Some("pprof") => Format::Pprof,
|
||||
Some("svg") => Format::Svg,
|
||||
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
|
||||
};
|
||||
let seconds = match parse_query_param(&req, "seconds")? {
|
||||
None => 5,
|
||||
Some(seconds @ 1..=30) => seconds,
|
||||
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-30 secs"))),
|
||||
};
|
||||
let frequency_hz = match parse_query_param(&req, "frequency")? {
|
||||
None => 99,
|
||||
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
|
||||
Some(frequency) => frequency,
|
||||
};
|
||||
|
||||
// Only allow one profiler at a time.
|
||||
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
||||
let _lock = PROFILE_LOCK
|
||||
.try_lock()
|
||||
.map_err(|_| ApiError::Conflict("profiler already running".into()))?;
|
||||
|
||||
// Take the profile.
|
||||
let report = tokio::task::spawn_blocking(move || {
|
||||
let guard = pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(frequency_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()?;
|
||||
std::thread::sleep(Duration::from_secs(seconds));
|
||||
guard.report().build()
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(|pprof_err| ApiError::InternalServerError(pprof_err.into()))?;
|
||||
|
||||
// Return the report in the requested format.
|
||||
match format {
|
||||
Format::Pprof => {
|
||||
let mut body = Vec::new();
|
||||
report
|
||||
.pprof()
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?
|
||||
.write_to_vec(&mut body)
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "application/octet-stream")
|
||||
.header(CONTENT_DISPOSITION, "attachment; filename=\"profile.pb\"")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
|
||||
Format::Svg => {
|
||||
let mut body = Vec::new();
|
||||
report
|
||||
.flamegraph(&mut body)
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "image/svg+xml")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
) -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
|
||||
@@ -30,7 +30,7 @@ pub fn parse_request_param<T: FromStr>(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_query_param<'a>(
|
||||
fn get_query_param<'a>(
|
||||
request: &'a Request<Body>,
|
||||
param_name: &str,
|
||||
) -> Result<Option<Cow<'a, str>>, ApiError> {
|
||||
|
||||
@@ -55,7 +55,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::auth::JwtAuth;
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::profile_cpu_handler;
|
||||
use utils::http::endpoint::prometheus_metrics_handler;
|
||||
use utils::http::endpoint::request_span;
|
||||
use utils::http::request::must_parse_query_param;
|
||||
@@ -145,16 +144,10 @@ impl State {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = [
|
||||
"/v1/status",
|
||||
"/v1/doc",
|
||||
"/swagger.yml",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
Ok(Self {
|
||||
conf,
|
||||
tenant_manager,
|
||||
@@ -3165,7 +3158,6 @@ pub fn make_router(
|
||||
Ok(router
|
||||
.data(state)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/v1/status", |r| api_handler(r, status_handler))
|
||||
.put("/v1/failpoints", |r| {
|
||||
testing_api_handler("manage failpoints", r, failpoints_handler)
|
||||
|
||||
@@ -653,35 +653,6 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_cache_entries",
|
||||
"Number of entries in the relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses",
|
||||
"Relation size cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES_UPDATED: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses_updated",
|
||||
"Relation size cache misses where relation update LSN is after lookup LSN",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -10,9 +10,6 @@ use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_UPDATED,
|
||||
};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
@@ -958,12 +955,9 @@ impl Timeline {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_UPDATED.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
None
|
||||
}
|
||||
|
||||
@@ -988,7 +982,6 @@ impl Timeline {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -996,17 +989,13 @@ impl Timeline {
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
rel_size_cache.map.insert(tag, (lsn, nblocks));
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
}
|
||||
rel_size_cache.map.remove(tag);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1022,7 +1011,7 @@ pub struct DatadirModification<'a> {
|
||||
pub tline: &'a Timeline,
|
||||
|
||||
/// Current LSN of the modification
|
||||
pub lsn: Lsn,
|
||||
lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
|
||||
@@ -327,25 +327,6 @@ impl WalIngest {
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// Only apply the writes if this shard owns either of the VM pages. These WAL records are
|
||||
// broadcast to all shards, but should only be applied on the relevant ones. See:
|
||||
// https://github.com/neondatabase/neon/issues/9855
|
||||
//
|
||||
// TODO: we should differentiate on old_vm_blk and new_vm_blk since we may only own one of
|
||||
// them, but this is no worse than the old behavior where we applied all records, and will
|
||||
// avoid the performance penalty of the relsize cache miss and DbDir deserialization.
|
||||
//
|
||||
// TODO: consider filtering this during decoding, and avoid the broadcast.
|
||||
let old_is_local = old_vm_blk
|
||||
.map(|blk| self.shard.is_key_local(&rel_block_to_key(vm_rel, blk)))
|
||||
.unwrap_or_default();
|
||||
let new_is_local = new_vm_blk
|
||||
.map(|blk| self.shard.is_key_local(&rel_block_to_key(vm_rel, blk)))
|
||||
.unwrap_or_default();
|
||||
if !old_is_local && !new_is_local {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
@@ -353,11 +334,7 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let (vm_size, vm_exists) = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if vm_size == 0 {
|
||||
let lsn = modification.lsn;
|
||||
log::info!("ingest_clear_vm_bits: vm_rel {vm_rel} has size={vm_size} exists={vm_exists} at LSN {lsn}, flags={flags} old={old_heap_blkno:?} new={new_heap_blkno:?}");
|
||||
}
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
@@ -595,7 +572,7 @@ impl WalIngest {
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let (nblocks, _) = get_relsize(modification, rel, ctx).await?;
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
if nblocks > fsm_physical_page_no {
|
||||
// check if something to do: FSM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
|
||||
@@ -635,7 +612,7 @@ impl WalIngest {
|
||||
)?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let (nblocks, _) = get_relsize(modification, rel, ctx).await?;
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
if nblocks > vm_page_no {
|
||||
// check if something to do: VM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
|
||||
@@ -1457,22 +1434,20 @@ async fn get_relsize(
|
||||
modification: &DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(BlockNumber, bool), PageReconstructError> {
|
||||
if !modification
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
let nblocks = if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
Ok((0, false))
|
||||
0
|
||||
} else {
|
||||
Ok((
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?,
|
||||
true,
|
||||
))
|
||||
}
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
|
||||
@@ -30,7 +30,6 @@ once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
|
||||
@@ -14,10 +14,6 @@ cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false
|
||||
|
||||
# List available benchmarks.
|
||||
cargo bench --package safekeeper --benches -- --list
|
||||
|
||||
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
|
||||
# Output in target/criterion/*/profile/flamegraph.svg.
|
||||
cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false --profile-time 10
|
||||
```
|
||||
|
||||
Additional charts and statistics are available in `target/criterion/report/index.html`.
|
||||
|
||||
@@ -10,7 +10,6 @@ use camino_tempfile::tempfile;
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
|
||||
use itertools::Itertools as _;
|
||||
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use safekeeper::receive_wal::{self, WalAcceptor};
|
||||
use safekeeper::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
@@ -25,9 +24,8 @@ const GB: usize = 1024 * MB;
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench_process_msg,
|
||||
benches,
|
||||
bench_process_msg,
|
||||
bench_wal_acceptor,
|
||||
bench_wal_acceptor_throughput,
|
||||
bench_file_write
|
||||
|
||||
@@ -14,9 +14,7 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::{
|
||||
profile_cpu_handler, prometheus_metrics_handler, request_span, ChannelWriter,
|
||||
};
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
use utils::http::request::parse_query_param;
|
||||
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
@@ -576,7 +574,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
|
||||
["/v1/status", "/metrics", "/pprof/profile"]
|
||||
["/v1/status", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect()
|
||||
@@ -600,7 +598,6 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/v1/status", |r| request_span(r, status_handler))
|
||||
.put("/v1/failpoints", |r| {
|
||||
request_span(r, move |r| async {
|
||||
|
||||
@@ -603,7 +603,10 @@ where
|
||||
|
||||
/// wal_store wrapper avoiding commit_lsn <= flush_lsn violation when we don't have WAL yet.
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
max(self.wal_store.flush_lsn(), self.state.timeline_start_lsn)
|
||||
max(
|
||||
self.wal_store.flush_record_lsn(),
|
||||
self.state.timeline_start_lsn,
|
||||
)
|
||||
}
|
||||
|
||||
/// Process message from proposer and possibly form reply. Concurrent
|
||||
@@ -828,7 +831,7 @@ where
|
||||
//
|
||||
// If we fail before first WAL write flush this action would be
|
||||
// repeated, that's ok because it is idempotent.
|
||||
if self.wal_store.flush_lsn() == Lsn::INVALID {
|
||||
if self.wal_store.flush_record_lsn() == Lsn::INVALID {
|
||||
self.wal_store
|
||||
.initialize_first_segment(msg.start_streaming_at)
|
||||
.await?;
|
||||
@@ -947,7 +950,7 @@ where
|
||||
// while first connection still gets some packets later. It might be
|
||||
// better to not log this as error! above.
|
||||
let write_lsn = self.wal_store.write_lsn();
|
||||
let flush_lsn = self.wal_store.flush_lsn();
|
||||
let flush_lsn = self.wal_store.flush_record_lsn();
|
||||
if write_lsn > msg.h.begin_lsn {
|
||||
bail!(
|
||||
"append request rewrites WAL written before, write_lsn={}, msg lsn={}",
|
||||
@@ -1087,7 +1090,7 @@ mod tests {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
fn flush_record_lsn(&self) -> Lsn {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
|
||||
@@ -176,7 +176,7 @@ pub enum StateSK {
|
||||
impl StateSK {
|
||||
pub fn flush_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
StateSK::Loaded(sk) => sk.wal_store.flush_lsn(),
|
||||
StateSK::Loaded(sk) => sk.wal_store.flush_record_lsn(),
|
||||
StateSK::Offloaded(state) => match state.eviction_state {
|
||||
EvictionState::Offloaded(flush_lsn) => flush_lsn,
|
||||
_ => panic!("StateSK::Offloaded mismatches with eviction_state from control_file"),
|
||||
@@ -1108,11 +1108,11 @@ impl ManagerTimeline {
|
||||
);
|
||||
}
|
||||
|
||||
if wal_store.flush_lsn() != shared.sk.flush_lsn() {
|
||||
if wal_store.flush_record_lsn() != shared.sk.flush_lsn() {
|
||||
bail!(
|
||||
"flush_lsn mismatch in restored WAL, expected {}, got {}",
|
||||
shared.sk.flush_lsn(),
|
||||
wal_store.flush_lsn()
|
||||
wal_store.flush_record_lsn()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -599,7 +599,7 @@ pub async fn validate_temp_timeline(
|
||||
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
||||
|
||||
let commit_lsn = control_store.commit_lsn;
|
||||
let flush_lsn = wal_store.flush_lsn();
|
||||
let flush_lsn = wal_store.flush_record_lsn();
|
||||
|
||||
Ok((commit_lsn, flush_lsn))
|
||||
}
|
||||
|
||||
@@ -35,10 +35,10 @@ use pq_proto::SystemId;
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
pub trait Storage {
|
||||
// Last written LSN.
|
||||
/// Last written LSN.
|
||||
fn write_lsn(&self) -> Lsn;
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn;
|
||||
/// End LSN of last durably stored WAL record.
|
||||
fn flush_record_lsn(&self) -> Lsn;
|
||||
|
||||
/// Initialize segment by creating proper long header at the beginning of
|
||||
/// the segment and short header at the page of given LSN. This is only used
|
||||
@@ -116,11 +116,13 @@ pub struct PhysicalStorage {
|
||||
/// The last LSN flushed to disk. May be in the middle of a record.
|
||||
///
|
||||
/// NB: when the rest of the system refers to `flush_lsn`, it usually
|
||||
/// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
|
||||
/// and should be resolved.
|
||||
/// means `flush_record_lsn`. This `flush_lsn` is only used internally.
|
||||
flush_lsn: Lsn,
|
||||
|
||||
/// The LSN of the last WAL record flushed to disk.
|
||||
///
|
||||
/// NB: when the rest of the system refers to `flush_lsn`, it usually
|
||||
/// means `flush_record_lsn`.
|
||||
flush_record_lsn: Lsn,
|
||||
|
||||
/// Decoder is required for detecting boundaries of WAL records.
|
||||
@@ -387,11 +389,8 @@ impl Storage for PhysicalStorage {
|
||||
fn write_lsn(&self) -> Lsn {
|
||||
self.write_lsn
|
||||
}
|
||||
/// flush_lsn returns LSN of last durably stored WAL record.
|
||||
///
|
||||
/// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
|
||||
#[allow(clippy::misnamed_getters)]
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
/// End LSN of the last durably stored WAL record.
|
||||
fn flush_record_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
|
||||
@@ -178,7 +178,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
let mut conns: HashMap<usize, ConnState> = HashMap::new();
|
||||
|
||||
for (&_ttid, shared_state) in global.timelines.iter_mut() {
|
||||
let flush_lsn = shared_state.sk.wal_store.flush_lsn();
|
||||
let flush_lsn = shared_state.sk.wal_store.flush_record_lsn();
|
||||
let commit_lsn = shared_state.sk.state.commit_lsn;
|
||||
os.log_event(format!("tli_loaded;{};{}", flush_lsn.0, commit_lsn.0));
|
||||
}
|
||||
|
||||
@@ -180,7 +180,7 @@ impl wal_storage::Storage for DiskWALStorage {
|
||||
self.write_lsn
|
||||
}
|
||||
/// LSN of last durably stored WAL record.
|
||||
fn flush_lsn(&self) -> Lsn {
|
||||
fn flush_record_lsn(&self) -> Lsn {
|
||||
self.flush_record_lsn
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +52,6 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nix = { version = "0.26" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
@@ -76,7 +75,7 @@ smallvec = { version = "1", default-features = false, features = ["const_new", "
|
||||
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
|
||||
subtle = { version = "2" }
|
||||
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
|
||||
tikv-jemalloc-sys = { version = "0.6", features = ["stats"] }
|
||||
tikv-jemalloc-sys = { version = "0.5" }
|
||||
time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon", features = ["with-serde_json-1"] }
|
||||
|
||||
Reference in New Issue
Block a user