mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-01 21:30:37 +00:00
Compare commits
5 Commits
release-co
...
erik/prod-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4fe36706d9 | ||
|
|
673815a056 | ||
|
|
ee4cbe8778 | ||
|
|
204eaa194f | ||
|
|
f7cb553ea3 |
201
Cargo.lock
generated
201
Cargo.lock
generated
@@ -46,6 +46,15 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aligned-vec"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7e0966165eaf052580bd70eb1b32cb3d6245774c0104d1b2793e9650bf83b52a"
|
||||
dependencies = [
|
||||
"equator",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "allocator-api2"
|
||||
version = "0.2.16"
|
||||
@@ -146,6 +155,12 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arrayvec"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs"
|
||||
version = "0.6.2"
|
||||
@@ -742,7 +757,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"paste",
|
||||
"pin-project",
|
||||
"quick-xml",
|
||||
"quick-xml 0.31.0",
|
||||
"rand 0.8.5",
|
||||
"reqwest 0.11.19",
|
||||
"rustc_version",
|
||||
@@ -1381,6 +1396,15 @@ version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"
|
||||
|
||||
[[package]]
|
||||
name = "cpp_demangle"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96e58d342ad113c2b878f16d5d034c03be492ae460cdbc02b7f0f2284d310c7d"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.9"
|
||||
@@ -1904,6 +1928,26 @@ dependencies = [
|
||||
"termcolor",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator"
|
||||
version = "0.2.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c35da53b5a021d2484a7cc49b2ac7f2d840f8236a286f84202369bd338d761ea"
|
||||
dependencies = [
|
||||
"equator-macro",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equator-macro"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3bf679796c0322556351f287a51b49e48f7c4986e727b5dd78c972d30e2e16cc"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.1"
|
||||
@@ -2011,6 +2055,18 @@ dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "findshlibs"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "40b9e59cd0f7e0806cca4be089683ecb6434e602038df21fe6bf6711b2f07f64"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"lazy_static",
|
||||
"libc",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fixedbitset"
|
||||
version = "0.4.2"
|
||||
@@ -2714,6 +2770,24 @@ version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac"
|
||||
|
||||
[[package]]
|
||||
name = "inferno"
|
||||
version = "0.11.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"indexmap 2.0.1",
|
||||
"is-terminal",
|
||||
"itoa",
|
||||
"log",
|
||||
"num-format",
|
||||
"once_cell",
|
||||
"quick-xml 0.26.0",
|
||||
"rgb",
|
||||
"str_stack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inotify"
|
||||
version = "0.9.6"
|
||||
@@ -3053,6 +3127,15 @@ version = "2.6.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
|
||||
|
||||
[[package]]
|
||||
name = "memmap2"
|
||||
version = "0.9.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "45fd3a57831bf88bc63f8cebc0cf956116276e97fef3966103e96416209f7c92"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.7.1"
|
||||
@@ -3278,6 +3361,16 @@ version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||
|
||||
[[package]]
|
||||
name = "num-format"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
|
||||
dependencies = [
|
||||
"arrayvec",
|
||||
"itoa",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.45"
|
||||
@@ -4108,6 +4201,31 @@ version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||
|
||||
[[package]]
|
||||
name = "pprof"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebbe2f8898beba44815fdc9e5a4ae9c929e21c5dc29b0c774a15555f7f58d6d0"
|
||||
dependencies = [
|
||||
"aligned-vec",
|
||||
"backtrace",
|
||||
"cfg-if",
|
||||
"criterion",
|
||||
"findshlibs",
|
||||
"inferno",
|
||||
"libc",
|
||||
"log",
|
||||
"nix 0.26.4",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"protobuf",
|
||||
"protobuf-codegen-pure",
|
||||
"smallvec",
|
||||
"symbolic-demangle",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ppv-lite86"
|
||||
version = "0.2.17"
|
||||
@@ -4260,6 +4378,31 @@ dependencies = [
|
||||
"prost",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "033460afb75cf755fcfc16dfaed20b86468082a2ea24e05ac35ab4a099a017d6"
|
||||
dependencies = [
|
||||
"protobuf",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-codegen-pure"
|
||||
version = "2.28.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "95a29399fc94bcd3eeaa951c715f7bea69409b2445356b00519740bcd6ddd865"
|
||||
dependencies = [
|
||||
"protobuf",
|
||||
"protobuf-codegen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proxy"
|
||||
version = "0.1.0"
|
||||
@@ -4371,6 +4514,15 @@ dependencies = [
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.31.0"
|
||||
@@ -4853,6 +5005,15 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rgb"
|
||||
version = "0.8.50"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57397d16646700483b67d2dd6511d79318f9d057fdbd21a4066aeac8b41d310a"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.6"
|
||||
@@ -5166,6 +5327,7 @@ dependencies = [
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -5712,6 +5874,12 @@ dependencies = [
|
||||
"der 0.7.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "stable_deref_trait"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
|
||||
|
||||
[[package]]
|
||||
name = "static_assertions"
|
||||
version = "1.1.0"
|
||||
@@ -5858,6 +6026,12 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "str_stack"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
|
||||
|
||||
[[package]]
|
||||
name = "stringprep"
|
||||
version = "0.1.2"
|
||||
@@ -5905,6 +6079,29 @@ version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "20e16a0f46cf5fd675563ef54f26e83e20f2366bcf027bcb3cc3ed2b98aaf2ca"
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-common"
|
||||
version = "12.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "366f1b4c6baf6cfefc234bbd4899535fca0b06c74443039a73f6dfb2fad88d77"
|
||||
dependencies = [
|
||||
"debugid",
|
||||
"memmap2",
|
||||
"stable_deref_trait",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "symbolic-demangle"
|
||||
version = "12.12.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aba05ba5b9962ea5617baf556293720a8b2d0a282aa14ee4bf10e22efc7da8c8"
|
||||
dependencies = [
|
||||
"cpp_demangle",
|
||||
"rustc-demangle",
|
||||
"symbolic-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "syn"
|
||||
version = "1.0.109"
|
||||
@@ -6772,6 +6969,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"postgres_connection",
|
||||
"pprof",
|
||||
"pq_proto",
|
||||
"rand 0.8.5",
|
||||
"regex",
|
||||
@@ -7340,6 +7538,7 @@ dependencies = [
|
||||
"libc",
|
||||
"log",
|
||||
"memchr",
|
||||
"nix 0.26.4",
|
||||
"nom",
|
||||
"num-bigint",
|
||||
"num-integer",
|
||||
|
||||
@@ -130,6 +130,7 @@ parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
pprof = { version = "0.14", features = ["criterion", "flamegraph", "protobuf", "protobuf-codec"] }
|
||||
procfs = "0.16"
|
||||
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.13"
|
||||
|
||||
@@ -29,6 +29,7 @@ jsonwebtoken.workspace = true
|
||||
nix.workspace = true
|
||||
once_cell.workspace = true
|
||||
pin-project-lite.workspace = true
|
||||
pprof.workspace = true
|
||||
regex.workspace = true
|
||||
routerify.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use crate::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
use crate::http::error::{api_error_handler, route_error_handler, ApiError};
|
||||
use anyhow::Context;
|
||||
use hyper::header::{HeaderName, AUTHORIZATION};
|
||||
use crate::http::request::{get_query_param, parse_query_param};
|
||||
use anyhow::{anyhow, Context};
|
||||
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Method;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
|
||||
@@ -12,11 +13,13 @@ use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
|
||||
use std::future::Future;
|
||||
use std::io::Write as _;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use std::io::Write as _;
|
||||
use tokio::sync::mpsc;
|
||||
use pprof::protos::Message as _;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
@@ -328,6 +331,82 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
/// Generates CPU profiles.
|
||||
pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
enum Format {
|
||||
Pprof,
|
||||
Svg,
|
||||
}
|
||||
|
||||
// Parameters.
|
||||
let format = match get_query_param(&req, "format")?.as_deref() {
|
||||
None => Format::Pprof,
|
||||
Some("pprof") => Format::Pprof,
|
||||
Some("svg") => Format::Svg,
|
||||
Some(format) => return Err(ApiError::BadRequest(anyhow!("invalid format {format}"))),
|
||||
};
|
||||
let seconds = match parse_query_param(&req, "seconds")? {
|
||||
None => 5,
|
||||
Some(seconds @ 1..=30) => seconds,
|
||||
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-30 secs"))),
|
||||
};
|
||||
let frequency_hz = match parse_query_param(&req, "frequency")? {
|
||||
None => 99,
|
||||
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
|
||||
Some(frequency) => frequency,
|
||||
};
|
||||
|
||||
// Only allow one profiler at a time.
|
||||
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
||||
let _lock = PROFILE_LOCK
|
||||
.try_lock()
|
||||
.map_err(|_| ApiError::Conflict("profiler already running".into()))?;
|
||||
|
||||
// Take the profile.
|
||||
let report = tokio::task::spawn_blocking(move || {
|
||||
let guard = pprof::ProfilerGuardBuilder::default()
|
||||
.frequency(frequency_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()?;
|
||||
std::thread::sleep(Duration::from_secs(seconds));
|
||||
guard.report().build()
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(|pprof_err| ApiError::InternalServerError(pprof_err.into()))?;
|
||||
|
||||
// Return the report in the requested format.
|
||||
match format {
|
||||
Format::Pprof => {
|
||||
let mut body = Vec::new();
|
||||
report
|
||||
.pprof()
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?
|
||||
.write_to_vec(&mut body)
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "application/octet-stream")
|
||||
.header(CONTENT_DISPOSITION, "attachment; filename=\"profile.pb\"")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
|
||||
Format::Svg => {
|
||||
let mut body = Vec::new();
|
||||
report
|
||||
.flamegraph(&mut body)
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
Response::builder()
|
||||
.status(200)
|
||||
.header(CONTENT_TYPE, "image/svg+xml")
|
||||
.body(Body::from(body))
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
|
||||
) -> Middleware<B, ApiError> {
|
||||
Middleware::pre(move |req| async move {
|
||||
|
||||
@@ -30,7 +30,7 @@ pub fn parse_request_param<T: FromStr>(
|
||||
}
|
||||
}
|
||||
|
||||
fn get_query_param<'a>(
|
||||
pub fn get_query_param<'a>(
|
||||
request: &'a Request<Body>,
|
||||
param_name: &str,
|
||||
) -> Result<Option<Cow<'a, str>>, ApiError> {
|
||||
|
||||
@@ -55,6 +55,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::auth::JwtAuth;
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::profile_cpu_handler;
|
||||
use utils::http::endpoint::prometheus_metrics_handler;
|
||||
use utils::http::endpoint::request_span;
|
||||
use utils::http::request::must_parse_query_param;
|
||||
@@ -144,10 +145,16 @@ impl State {
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
secondary_controller: SecondaryController,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
let allowlist_routes = [
|
||||
"/v1/status",
|
||||
"/v1/doc",
|
||||
"/swagger.yml",
|
||||
"/metrics",
|
||||
"/profile/cpu",
|
||||
]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
Ok(Self {
|
||||
conf,
|
||||
tenant_manager,
|
||||
@@ -3158,6 +3165,7 @@ pub fn make_router(
|
||||
Ok(router
|
||||
.data(state)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/v1/status", |r| api_handler(r, status_handler))
|
||||
.put("/v1/failpoints", |r| {
|
||||
testing_api_handler("manage failpoints", r, failpoints_handler)
|
||||
|
||||
@@ -653,6 +653,35 @@ pub(crate) static COMPRESSION_IMAGE_OUTPUT_BYTES: Lazy<IntCounter> = Lazy::new(|
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
register_uint_gauge!(
|
||||
"pageserver_relsize_cache_entries",
|
||||
"Number of entries in the relation size cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_HITS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!("pageserver_relsize_cache_hits", "Relation size cache hits",)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses",
|
||||
"Relation size cache misses",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static RELSIZE_CACHE_MISSES_UPDATED: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_relsize_cache_misses_updated",
|
||||
"Relation size cache misses where relation update LSN is after lookup LSN",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) mod initial_logical_size {
|
||||
use metrics::{register_int_counter, register_int_counter_vec, IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -10,6 +10,9 @@ use super::tenant::{PageReconstructError, Timeline};
|
||||
use crate::aux_file;
|
||||
use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::metrics::{
|
||||
RELSIZE_CACHE_ENTRIES, RELSIZE_CACHE_HITS, RELSIZE_CACHE_MISSES, RELSIZE_CACHE_MISSES_UPDATED,
|
||||
};
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
@@ -955,9 +958,12 @@ impl Timeline {
|
||||
let rel_size_cache = self.rel_size_cache.read().unwrap();
|
||||
if let Some((cached_lsn, nblocks)) = rel_size_cache.map.get(tag) {
|
||||
if lsn >= *cached_lsn {
|
||||
RELSIZE_CACHE_HITS.inc();
|
||||
return Some(*nblocks);
|
||||
}
|
||||
RELSIZE_CACHE_MISSES_UPDATED.inc();
|
||||
}
|
||||
RELSIZE_CACHE_MISSES.inc();
|
||||
None
|
||||
}
|
||||
|
||||
@@ -982,6 +988,7 @@ impl Timeline {
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((lsn, nblocks));
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -989,13 +996,17 @@ impl Timeline {
|
||||
/// Store cached relation size
|
||||
pub fn set_cached_rel_size(&self, tag: RelTag, lsn: Lsn, nblocks: BlockNumber) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.map.insert(tag, (lsn, nblocks));
|
||||
if rel_size_cache.map.insert(tag, (lsn, nblocks)).is_none() {
|
||||
RELSIZE_CACHE_ENTRIES.inc();
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove cached relation size
|
||||
pub fn remove_cached_rel_size(&self, tag: &RelTag) {
|
||||
let mut rel_size_cache = self.rel_size_cache.write().unwrap();
|
||||
rel_size_cache.map.remove(tag);
|
||||
if rel_size_cache.map.remove(tag).is_some() {
|
||||
RELSIZE_CACHE_ENTRIES.dec();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1011,7 +1022,7 @@ pub struct DatadirModification<'a> {
|
||||
pub tline: &'a Timeline,
|
||||
|
||||
/// Current LSN of the modification
|
||||
lsn: Lsn,
|
||||
pub lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
|
||||
@@ -327,6 +327,25 @@ impl WalIngest {
|
||||
let mut new_vm_blk = new_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
let mut old_vm_blk = old_heap_blkno.map(pg_constants::HEAPBLK_TO_MAPBLOCK);
|
||||
|
||||
// Only apply the writes if this shard owns either of the VM pages. These WAL records are
|
||||
// broadcast to all shards, but should only be applied on the relevant ones. See:
|
||||
// https://github.com/neondatabase/neon/issues/9855
|
||||
//
|
||||
// TODO: we should differentiate on old_vm_blk and new_vm_blk since we may only own one of
|
||||
// them, but this is no worse than the old behavior where we applied all records, and will
|
||||
// avoid the performance penalty of the relsize cache miss and DbDir deserialization.
|
||||
//
|
||||
// TODO: consider filtering this during decoding, and avoid the broadcast.
|
||||
let old_is_local = old_vm_blk
|
||||
.map(|blk| self.shard.is_key_local(&rel_block_to_key(vm_rel, blk)))
|
||||
.unwrap_or_default();
|
||||
let new_is_local = new_vm_blk
|
||||
.map(|blk| self.shard.is_key_local(&rel_block_to_key(vm_rel, blk)))
|
||||
.unwrap_or_default();
|
||||
if !old_is_local && !new_is_local {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Sometimes, Postgres seems to create heap WAL records with the
|
||||
// ALL_VISIBLE_CLEARED flag set, even though the bit in the VM page is
|
||||
// not set. In fact, it's possible that the VM page does not exist at all.
|
||||
@@ -334,7 +353,11 @@ impl WalIngest {
|
||||
// replaying it would fail to find the previous image of the page, because
|
||||
// it doesn't exist. So check if the VM page(s) exist, and skip the WAL
|
||||
// record if it doesn't.
|
||||
let vm_size = get_relsize(modification, vm_rel, ctx).await?;
|
||||
let (vm_size, vm_exists) = get_relsize(modification, vm_rel, ctx).await?;
|
||||
if vm_size == 0 {
|
||||
let lsn = modification.lsn;
|
||||
log::info!("ingest_clear_vm_bits: vm_rel {vm_rel} has size={vm_size} exists={vm_exists} at LSN {lsn}, flags={flags} old={old_heap_blkno:?} new={new_heap_blkno:?}");
|
||||
}
|
||||
if let Some(blknum) = new_vm_blk {
|
||||
if blknum >= vm_size {
|
||||
new_vm_blk = None;
|
||||
@@ -572,7 +595,7 @@ impl WalIngest {
|
||||
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
|
||||
fsm_physical_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
let (nblocks, _) = get_relsize(modification, rel, ctx).await?;
|
||||
if nblocks > fsm_physical_page_no {
|
||||
// check if something to do: FSM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
|
||||
@@ -612,7 +635,7 @@ impl WalIngest {
|
||||
)?;
|
||||
vm_page_no += 1;
|
||||
}
|
||||
let nblocks = get_relsize(modification, rel, ctx).await?;
|
||||
let (nblocks, _) = get_relsize(modification, rel, ctx).await?;
|
||||
if nblocks > vm_page_no {
|
||||
// check if something to do: VM is larger than truncate position
|
||||
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
|
||||
@@ -1434,20 +1457,22 @@ async fn get_relsize(
|
||||
modification: &DatadirModification<'_>,
|
||||
rel: RelTag,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
let nblocks = if !modification
|
||||
) -> Result<(BlockNumber, bool), PageReconstructError> {
|
||||
if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
{
|
||||
0
|
||||
Ok((0, false))
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
Ok((
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), ctx)
|
||||
.await?,
|
||||
true,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
|
||||
@@ -30,6 +30,7 @@ once_cell.workspace = true
|
||||
parking_lot.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
pprof.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
|
||||
@@ -14,6 +14,10 @@ cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false
|
||||
|
||||
# List available benchmarks.
|
||||
cargo bench --package safekeeper --benches -- --list
|
||||
|
||||
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
|
||||
# Output in target/criterion/*/profile/flamegraph.svg.
|
||||
cargo bench --package safekeeper --bench receive_wal process_msg/fsync=false --profile-time 10
|
||||
```
|
||||
|
||||
Additional charts and statistics are available in `target/criterion/report/index.html`.
|
||||
|
||||
@@ -10,6 +10,7 @@ use camino_tempfile::tempfile;
|
||||
use criterion::{criterion_group, criterion_main, BatchSize, Bencher, Criterion};
|
||||
use itertools::Itertools as _;
|
||||
use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
|
||||
use pprof::criterion::{Output, PProfProfiler};
|
||||
use safekeeper::receive_wal::{self, WalAcceptor};
|
||||
use safekeeper::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
|
||||
@@ -24,8 +25,9 @@ const GB: usize = 1024 * MB;
|
||||
|
||||
// Register benchmarks with Criterion.
|
||||
criterion_group!(
|
||||
benches,
|
||||
bench_process_msg,
|
||||
name = benches;
|
||||
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
|
||||
targets = bench_process_msg,
|
||||
bench_wal_acceptor,
|
||||
bench_wal_acceptor_throughput,
|
||||
bench_file_write
|
||||
|
||||
@@ -14,7 +14,9 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, Instrument};
|
||||
use utils::failpoint_support::failpoints_handler;
|
||||
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
|
||||
use utils::http::endpoint::{
|
||||
profile_cpu_handler, prometheus_metrics_handler, request_span, ChannelWriter,
|
||||
};
|
||||
use utils::http::request::parse_query_param;
|
||||
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
@@ -574,7 +576,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
router = router.middleware(auth_middleware(|request| {
|
||||
#[allow(clippy::mutable_key_type)]
|
||||
static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
|
||||
["/v1/status", "/metrics"]
|
||||
["/v1/status", "/metrics", "/pprof/profile"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect()
|
||||
@@ -598,6 +600,7 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
|
||||
.data(Arc::new(conf))
|
||||
.data(auth)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/v1/status", |r| request_span(r, status_handler))
|
||||
.put("/v1/failpoints", |r| {
|
||||
request_span(r, move |r| async {
|
||||
|
||||
@@ -52,6 +52,7 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nix = { version = "0.26" }
|
||||
nom = { version = "7" }
|
||||
num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
|
||||
Reference in New Issue
Block a user