mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-27 07:10:37 +00:00
Compare commits
7 Commits
proxy-conf
...
jemalloc-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d2709f4a1 | ||
|
|
6e2c04bc48 | ||
|
|
76ae735a24 | ||
|
|
7be445f627 | ||
|
|
35e9fb360b | ||
|
|
0d21187322 | ||
|
|
e8a98adcd0 |
6
.github/workflows/build_and_test.yml
vendored
6
.github/workflows/build_and_test.yml
vendored
@@ -735,7 +735,7 @@ jobs:
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
- uses: docker/setup-buildx-action@v2
|
||||
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
@@ -792,7 +792,7 @@ jobs:
|
||||
run: |
|
||||
mkdir -p .docker-custom
|
||||
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
- uses: docker/setup-buildx-action@v2
|
||||
with:
|
||||
# Disable parallelism for docker buildkit.
|
||||
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
|
||||
@@ -865,7 +865,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.23.2
|
||||
VM_BUILDER_VERSION: v0.28.1
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
34
Cargo.lock
generated
34
Cargo.lock
generated
@@ -599,7 +599,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
@@ -2519,7 +2519,7 @@ dependencies = [
|
||||
"http 0.2.9",
|
||||
"hyper 0.14.26",
|
||||
"log",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"rustls-native-certs 0.6.2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.24.0",
|
||||
@@ -4059,7 +4059,7 @@ dependencies = [
|
||||
"futures",
|
||||
"once_cell",
|
||||
"pq_proto",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"serde",
|
||||
"thiserror",
|
||||
@@ -4314,7 +4314,6 @@ dependencies = [
|
||||
"http 1.1.0",
|
||||
"http-body-util",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.26",
|
||||
"hyper 1.2.0",
|
||||
"hyper-tungstenite",
|
||||
@@ -4351,12 +4350,11 @@ dependencies = [
|
||||
"routerify",
|
||||
"rstest",
|
||||
"rustc-hash",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"sha2",
|
||||
"smallvec",
|
||||
"smol_str",
|
||||
@@ -4544,7 +4542,7 @@ dependencies = [
|
||||
"itoa",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-native-certs 0.7.0",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"rustls-pki-types",
|
||||
@@ -4698,7 +4696,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -4958,9 +4956,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.21.9"
|
||||
version = "0.21.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
|
||||
checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.6",
|
||||
@@ -4970,9 +4968,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.22.2"
|
||||
version = "0.22.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
|
||||
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.6",
|
||||
@@ -5284,7 +5282,7 @@ checksum = "2e95efd0cefa32028cdb9766c96de71d96671072f9fb494dc9fb84c0ef93e52b"
|
||||
dependencies = [
|
||||
"httpdate",
|
||||
"reqwest",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"sentry-backtrace",
|
||||
"sentry-contexts",
|
||||
"sentry-core",
|
||||
@@ -6195,7 +6193,7 @@ checksum = "0ea13f22eda7127c827983bdaf0d7fff9df21c8817bab02815ac277a21143677"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"ring 0.17.6",
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.25.0",
|
||||
@@ -6208,7 +6206,7 @@ version = "0.24.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
|
||||
dependencies = [
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@@ -6218,7 +6216,7 @@ version = "0.25.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
|
||||
dependencies = [
|
||||
"rustls 0.22.2",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
]
|
||||
@@ -6679,7 +6677,7 @@ dependencies = [
|
||||
"base64 0.21.1",
|
||||
"log",
|
||||
"once_cell",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"rustls-webpki 0.100.2",
|
||||
"url",
|
||||
"webpki-roots 0.23.1",
|
||||
@@ -7356,7 +7354,7 @@ dependencies = [
|
||||
"regex-automata 0.4.3",
|
||||
"regex-syntax 0.8.2",
|
||||
"reqwest",
|
||||
"rustls 0.21.9",
|
||||
"rustls 0.21.11",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
||||
@@ -252,7 +252,7 @@ debug = true
|
||||
|
||||
# disable debug symbols for all packages except this one to decrease binaries size
|
||||
[profile.release.package."*"]
|
||||
debug = false
|
||||
debug = true
|
||||
|
||||
[profile.release-line-debug]
|
||||
inherits = "release"
|
||||
|
||||
@@ -44,6 +44,7 @@ COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_i
|
||||
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
|
||||
COPY --chown=nonroot . .
|
||||
|
||||
ENV _RJEM_MALLOC_CONF="prof:true"
|
||||
# Show build caching stats to check if it was used in the end.
|
||||
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
|
||||
RUN set -e \
|
||||
|
||||
@@ -21,6 +21,7 @@ base64.workspace = true
|
||||
bstr.workspace = true
|
||||
bytes = { workspace = true, features = ["serde"] }
|
||||
camino.workspace = true
|
||||
camino-tempfile.workspace = true
|
||||
chrono.workspace = true
|
||||
clap.workspace = true
|
||||
consumption_metrics.workspace = true
|
||||
@@ -35,7 +36,6 @@ hmac.workspace = true
|
||||
hostname.workspace = true
|
||||
http.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
hyper-tungstenite.workspace = true
|
||||
hyper.workspace = true
|
||||
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
|
||||
@@ -71,7 +71,6 @@ rustls.workspace = true
|
||||
scopeguard.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
sha2 = { workspace = true, features = ["asm"] }
|
||||
smol_str.workspace = true
|
||||
smallvec.workspace = true
|
||||
@@ -80,7 +79,7 @@ subtle.workspace = true
|
||||
sync_wrapper.workspace = true
|
||||
task-local-extensions.workspace = true
|
||||
thiserror.workspace = true
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemallocator = { workspace = true, features = ["profiling"] }
|
||||
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
|
||||
tokio-postgres.workspace = true
|
||||
tokio-rustls.workspace = true
|
||||
@@ -104,7 +103,6 @@ redis.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
camino-tempfile.workspace = true
|
||||
fallible-iterator.workspace = true
|
||||
rcgen.workspace = true
|
||||
rstest.workspace = true
|
||||
|
||||
@@ -3,19 +3,13 @@ use crate::{
|
||||
rate_limiter::RateBucketInfo,
|
||||
serverless::GlobalConnPoolOptions,
|
||||
};
|
||||
use anyhow::{ensure, Context};
|
||||
use humantime::parse_duration;
|
||||
use anyhow::{bail, ensure, Context, Ok};
|
||||
use itertools::Itertools;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use rustls::{
|
||||
crypto::ring::sign,
|
||||
pki_types::{CertificateDer, PrivateKeyDer},
|
||||
};
|
||||
use serde::{
|
||||
de::{value::BorrowedStrDeserializer, MapAccess},
|
||||
forward_to_deserialize_any, Deserialize, Deserializer,
|
||||
};
|
||||
use serde_with::serde_as;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
@@ -343,88 +337,45 @@ impl EndpointCacheConfig {
|
||||
/// Notice that by default the limiter is empty, which means that cache is disabled.
|
||||
pub const CACHE_DEFAULT_OPTIONS: &'static str =
|
||||
"initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s";
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for EndpointCacheConfig {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct Visitor;
|
||||
impl<'de> serde::de::Visitor<'de> for Visitor {
|
||||
type Value = EndpointCacheConfig;
|
||||
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
f.write_str("struct EndpointCacheConfig")
|
||||
}
|
||||
/// Parse cache options passed via cmdline.
|
||||
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
let mut initial_batch_size = None;
|
||||
let mut default_batch_size = None;
|
||||
let mut xread_timeout = None;
|
||||
let mut stream_name = None;
|
||||
let mut limiter_info = vec![];
|
||||
let mut disable_cache = false;
|
||||
let mut retry_interval = None;
|
||||
|
||||
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
|
||||
where
|
||||
A: serde::de::MapAccess<'de>,
|
||||
{
|
||||
fn e<E: serde::de::Error, T: std::fmt::Display>(t: T) -> E {
|
||||
E::custom(t)
|
||||
}
|
||||
for option in options.split(',') {
|
||||
let (key, value) = option
|
||||
.split_once('=')
|
||||
.with_context(|| format!("bad key-value pair: {option}"))?;
|
||||
|
||||
let mut initial_batch_size: Option<usize> = None;
|
||||
let mut default_batch_size: Option<usize> = None;
|
||||
let mut xread_timeout: Option<Duration> = None;
|
||||
let mut stream_name: Option<String> = None;
|
||||
let mut limiter_info: Vec<RateBucketInfo> = vec![];
|
||||
let mut disable_cache: bool = false;
|
||||
let mut retry_interval: Option<Duration> = None;
|
||||
while let Some((k, v)) = map.next_entry::<&str, &str>()? {
|
||||
match k {
|
||||
"initial_batch_size" => initial_batch_size = Some(v.parse().map_err(e)?),
|
||||
"default_batch_size" => default_batch_size = Some(v.parse().map_err(e)?),
|
||||
"xread_timeout" => {
|
||||
xread_timeout = Some(parse_duration(v).map_err(e)?);
|
||||
}
|
||||
"stream_name" => stream_name = Some(v.to_owned()),
|
||||
"limiter_info" => limiter_info.push(v.parse().map_err(e)?),
|
||||
"disable_cache" => disable_cache = v.parse().map_err(e)?,
|
||||
"retry_interval" => retry_interval = Some(parse_duration(v).map_err(e)?),
|
||||
x => {
|
||||
return Err(serde::de::Error::unknown_field(
|
||||
x,
|
||||
&[
|
||||
"initial_batch_size",
|
||||
"default_batch_size",
|
||||
"xread_timeout",
|
||||
"stream_name",
|
||||
"limiter_info",
|
||||
"disable_cache",
|
||||
"retry_interval",
|
||||
],
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let initial_batch_size = initial_batch_size
|
||||
.ok_or_else(|| serde::de::Error::missing_field("initial_batch_size"))?;
|
||||
let default_batch_size = default_batch_size
|
||||
.ok_or_else(|| serde::de::Error::missing_field("default_batch_size"))?;
|
||||
let xread_timeout = xread_timeout
|
||||
.ok_or_else(|| serde::de::Error::missing_field("xread_timeout"))?;
|
||||
let stream_name =
|
||||
stream_name.ok_or_else(|| serde::de::Error::missing_field("stream_name"))?;
|
||||
let retry_interval = retry_interval
|
||||
.ok_or_else(|| serde::de::Error::missing_field("retry_interval"))?;
|
||||
|
||||
RateBucketInfo::validate(&mut limiter_info).map_err(e)?;
|
||||
|
||||
Ok(EndpointCacheConfig {
|
||||
initial_batch_size,
|
||||
default_batch_size,
|
||||
xread_timeout,
|
||||
stream_name,
|
||||
limiter_info,
|
||||
disable_cache,
|
||||
retry_interval,
|
||||
})
|
||||
match key {
|
||||
"initial_batch_size" => initial_batch_size = Some(value.parse()?),
|
||||
"default_batch_size" => default_batch_size = Some(value.parse()?),
|
||||
"xread_timeout" => xread_timeout = Some(humantime::parse_duration(value)?),
|
||||
"stream_name" => stream_name = Some(value.to_string()),
|
||||
"limiter_info" => limiter_info.push(RateBucketInfo::from_str(value)?),
|
||||
"disable_cache" => disable_cache = value.parse()?,
|
||||
"retry_interval" => retry_interval = Some(humantime::parse_duration(value)?),
|
||||
unknown => bail!("unknown key: {unknown}"),
|
||||
}
|
||||
}
|
||||
serde::Deserializer::deserialize_map(deserializer, Visitor)
|
||||
RateBucketInfo::validate(&mut limiter_info)?;
|
||||
|
||||
Ok(Self {
|
||||
initial_batch_size: initial_batch_size.context("missing `initial_batch_size`")?,
|
||||
default_batch_size: default_batch_size.context("missing `default_batch_size`")?,
|
||||
xread_timeout: xread_timeout.context("missing `xread_timeout`")?,
|
||||
stream_name: stream_name.context("missing `stream_name`")?,
|
||||
disable_cache,
|
||||
limiter_info,
|
||||
retry_interval: retry_interval.context("missing `retry_interval`")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -433,7 +384,7 @@ impl FromStr for EndpointCacheConfig {
|
||||
|
||||
fn from_str(options: &str) -> Result<Self, Self::Err> {
|
||||
let error = || format!("failed to parse endpoint cache options '{options}'");
|
||||
Self::deserialize(SimpleKVConfig(options)).with_context(error)
|
||||
Self::parse(options).with_context(error)
|
||||
}
|
||||
}
|
||||
#[derive(Debug)]
|
||||
@@ -452,15 +403,11 @@ pub fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfi
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct CacheOptions {
|
||||
/// Max number of entries.
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub size: usize,
|
||||
/// Entry's time-to-live.
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub ttl: Duration,
|
||||
}
|
||||
|
||||
@@ -471,7 +418,30 @@ impl CacheOptions {
|
||||
/// Parse cache options passed via cmdline.
|
||||
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
Ok(Self::deserialize(SimpleKVConfig(options))?)
|
||||
let mut size = None;
|
||||
let mut ttl = None;
|
||||
|
||||
for option in options.split(',') {
|
||||
let (key, value) = option
|
||||
.split_once('=')
|
||||
.with_context(|| format!("bad key-value pair: {option}"))?;
|
||||
|
||||
match key {
|
||||
"size" => size = Some(value.parse()?),
|
||||
"ttl" => ttl = Some(humantime::parse_duration(value)?),
|
||||
unknown => bail!("unknown key: {unknown}"),
|
||||
}
|
||||
}
|
||||
|
||||
// TTL doesn't matter if cache is always empty.
|
||||
if let Some(0) = size {
|
||||
ttl.get_or_insert(Duration::default());
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
size: size.context("missing `size`")?,
|
||||
ttl: ttl.context("missing `ttl`")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -485,21 +455,15 @@ impl FromStr for CacheOptions {
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
#[serde_as]
|
||||
#[derive(Debug, Deserialize)]
|
||||
#[derive(Debug)]
|
||||
pub struct ProjectInfoCacheOptions {
|
||||
/// Max number of entries.
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub size: usize,
|
||||
/// Entry's time-to-live.
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub ttl: Duration,
|
||||
/// Max number of roles per endpoint.
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub max_roles: usize,
|
||||
/// Gc interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub gc_interval: Duration,
|
||||
}
|
||||
|
||||
@@ -511,7 +475,36 @@ impl ProjectInfoCacheOptions {
|
||||
/// Parse cache options passed via cmdline.
|
||||
/// Example: [`Self::CACHE_DEFAULT_OPTIONS`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
Ok(Self::deserialize(SimpleKVConfig(options))?)
|
||||
let mut size = None;
|
||||
let mut ttl = None;
|
||||
let mut max_roles = None;
|
||||
let mut gc_interval = None;
|
||||
|
||||
for option in options.split(',') {
|
||||
let (key, value) = option
|
||||
.split_once('=')
|
||||
.with_context(|| format!("bad key-value pair: {option}"))?;
|
||||
|
||||
match key {
|
||||
"size" => size = Some(value.parse()?),
|
||||
"ttl" => ttl = Some(humantime::parse_duration(value)?),
|
||||
"max_roles" => max_roles = Some(value.parse()?),
|
||||
"gc_interval" => gc_interval = Some(humantime::parse_duration(value)?),
|
||||
unknown => bail!("unknown key: {unknown}"),
|
||||
}
|
||||
}
|
||||
|
||||
// TTL doesn't matter if cache is always empty.
|
||||
if let Some(0) = size {
|
||||
ttl.get_or_insert(Duration::default());
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
size: size.context("missing `size`")?,
|
||||
ttl: ttl.context("missing `ttl`")?,
|
||||
max_roles: max_roles.context("missing `max_roles`")?,
|
||||
gc_interval: gc_interval.context("missing `gc_interval`")?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -525,23 +518,14 @@ impl FromStr for ProjectInfoCacheOptions {
|
||||
}
|
||||
|
||||
/// Helper for cmdline cache options parsing.
|
||||
#[serde_as]
|
||||
#[derive(Deserialize)]
|
||||
pub struct WakeComputeLockOptions {
|
||||
/// The number of shards the lock map should have
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
#[serde(default)]
|
||||
pub shards: usize,
|
||||
/// The number of allowed concurrent requests for each endpoitn
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pub permits: usize,
|
||||
/// Garbage collection epoch
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub epoch: Duration,
|
||||
/// Lock timeout
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
@@ -554,23 +538,45 @@ impl WakeComputeLockOptions {
|
||||
/// Parse lock options passed via cmdline.
|
||||
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
|
||||
fn parse(options: &str) -> anyhow::Result<Self> {
|
||||
let out = Self::deserialize(SimpleKVConfig(options))?;
|
||||
if out.permits != 0 {
|
||||
ensure!(
|
||||
out.timeout > Duration::ZERO,
|
||||
"wake compute lock timeout should be non-zero"
|
||||
);
|
||||
ensure!(
|
||||
out.epoch > Duration::ZERO,
|
||||
"wake compute lock gc epoch should be non-zero"
|
||||
);
|
||||
ensure!(out.shards > 1, "shard count must be > 1");
|
||||
ensure!(
|
||||
out.shards.is_power_of_two(),
|
||||
"shard count must be a power of two"
|
||||
);
|
||||
let mut shards = None;
|
||||
let mut permits = None;
|
||||
let mut epoch = None;
|
||||
let mut timeout = None;
|
||||
|
||||
for option in options.split(',') {
|
||||
let (key, value) = option
|
||||
.split_once('=')
|
||||
.with_context(|| format!("bad key-value pair: {option}"))?;
|
||||
|
||||
match key {
|
||||
"shards" => shards = Some(value.parse()?),
|
||||
"permits" => permits = Some(value.parse()?),
|
||||
"epoch" => epoch = Some(humantime::parse_duration(value)?),
|
||||
"timeout" => timeout = Some(humantime::parse_duration(value)?),
|
||||
unknown => bail!("unknown key: {unknown}"),
|
||||
}
|
||||
}
|
||||
|
||||
// these dont matter if lock is disabled
|
||||
if let Some(0) = permits {
|
||||
timeout = Some(Duration::default());
|
||||
epoch = Some(Duration::default());
|
||||
shards = Some(2);
|
||||
}
|
||||
|
||||
let out = Self {
|
||||
shards: shards.context("missing `shards`")?,
|
||||
permits: permits.context("missing `permits`")?,
|
||||
epoch: epoch.context("missing `epoch`")?,
|
||||
timeout: timeout.context("missing `timeout`")?,
|
||||
};
|
||||
|
||||
ensure!(out.shards > 1, "shard count must be > 1");
|
||||
ensure!(
|
||||
out.shards.is_power_of_two(),
|
||||
"shard count must be a power of two"
|
||||
);
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
@@ -584,100 +590,6 @@ impl FromStr for WakeComputeLockOptions {
|
||||
}
|
||||
}
|
||||
|
||||
struct SimpleKVConfig<'a>(&'a str);
|
||||
struct SimpleKVConfigMapAccess<'a> {
|
||||
kv: std::str::Split<'a, char>,
|
||||
val: Option<&'a str>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SimpleKVConfigErr(String);
|
||||
|
||||
impl std::fmt::Display for SimpleKVConfigErr {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for SimpleKVConfigErr {}
|
||||
|
||||
impl serde::de::Error for SimpleKVConfigErr {
|
||||
fn custom<T>(msg: T) -> Self
|
||||
where
|
||||
T: std::fmt::Display,
|
||||
{
|
||||
Self(msg.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> MapAccess<'de> for SimpleKVConfigMapAccess<'de> {
|
||||
type Error = SimpleKVConfigErr;
|
||||
|
||||
fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>, Self::Error>
|
||||
where
|
||||
K: serde::de::DeserializeSeed<'de>,
|
||||
{
|
||||
let Some(kv) = self.kv.next() else {
|
||||
return Ok(None);
|
||||
};
|
||||
let (key, value) = kv
|
||||
.split_once('=')
|
||||
.ok_or_else(|| SimpleKVConfigErr("invalid kv pair".to_string()))?;
|
||||
self.val = Some(value);
|
||||
|
||||
seed.deserialize(BorrowedStrDeserializer::new(key))
|
||||
.map(Some)
|
||||
}
|
||||
|
||||
fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value, Self::Error>
|
||||
where
|
||||
V: serde::de::DeserializeSeed<'de>,
|
||||
{
|
||||
seed.deserialize(BorrowedStrDeserializer::new(self.val.take().unwrap()))
|
||||
}
|
||||
|
||||
fn next_entry_seed<K, V>(
|
||||
&mut self,
|
||||
kseed: K,
|
||||
vseed: V,
|
||||
) -> Result<Option<(K::Value, V::Value)>, Self::Error>
|
||||
where
|
||||
K: serde::de::DeserializeSeed<'de>,
|
||||
V: serde::de::DeserializeSeed<'de>,
|
||||
{
|
||||
let Some(kv) = self.kv.next() else {
|
||||
return Ok(None);
|
||||
};
|
||||
let (key, value) = kv
|
||||
.split_once('=')
|
||||
.ok_or_else(|| SimpleKVConfigErr("invalid kv pair".to_string()))?;
|
||||
|
||||
let key = kseed.deserialize(BorrowedStrDeserializer::new(key))?;
|
||||
let value = vseed.deserialize(BorrowedStrDeserializer::new(value))?;
|
||||
Ok(Some((key, value)))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserializer<'de> for SimpleKVConfig<'de> {
|
||||
type Error = SimpleKVConfigErr;
|
||||
|
||||
fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
|
||||
where
|
||||
V: serde::de::Visitor<'de>,
|
||||
{
|
||||
visitor.visit_map(SimpleKVConfigMapAccess {
|
||||
kv: self.0.split(','),
|
||||
val: None,
|
||||
})
|
||||
}
|
||||
|
||||
forward_to_deserialize_any! {
|
||||
bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
|
||||
bytes byte_buf option unit struct unit_struct newtype_struct seq tuple
|
||||
tuple_struct map enum identifier ignored_any
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -735,7 +647,7 @@ mod tests {
|
||||
} = "permits=0".parse()?;
|
||||
assert_eq!(epoch, Duration::ZERO);
|
||||
assert_eq!(timeout, Duration::ZERO);
|
||||
assert_eq!(shards, 0);
|
||||
assert_eq!(shards, 2);
|
||||
assert_eq!(permits, 0);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -17,7 +17,7 @@ use crate::{
|
||||
scram, EndpointCacheKey,
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
@@ -449,17 +449,13 @@ impl ApiCaches {
|
||||
/// Various caches for [`console`](super).
|
||||
pub struct ApiLocks {
|
||||
name: &'static str,
|
||||
inner: Option<ApiLocksInner>,
|
||||
node_locks: DashMap<EndpointCacheKey, Arc<Semaphore>>,
|
||||
permits: usize,
|
||||
timeout: Duration,
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
}
|
||||
|
||||
struct ApiLocksInner {
|
||||
permits: NonZeroUsize,
|
||||
node_locks: DashMap<EndpointCacheKey, Arc<Semaphore>>,
|
||||
}
|
||||
|
||||
impl ApiLocks {
|
||||
pub fn new(
|
||||
name: &'static str,
|
||||
@@ -469,14 +465,10 @@ impl ApiLocks {
|
||||
epoch: std::time::Duration,
|
||||
metrics: &'static ApiLockMetrics,
|
||||
) -> prometheus::Result<Self> {
|
||||
let inner = NonZeroUsize::new(permits).map(|permits| ApiLocksInner {
|
||||
permits,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
name,
|
||||
inner,
|
||||
node_locks: DashMap::with_shard_amount(shards),
|
||||
permits,
|
||||
timeout,
|
||||
epoch,
|
||||
metrics,
|
||||
@@ -487,21 +479,20 @@ impl ApiLocks {
|
||||
&self,
|
||||
key: &EndpointCacheKey,
|
||||
) -> Result<WakeComputePermit, errors::WakeComputeError> {
|
||||
let Some(inner) = &self.inner else {
|
||||
if self.permits == 0 {
|
||||
return Ok(WakeComputePermit { permit: None });
|
||||
};
|
||||
}
|
||||
let now = Instant::now();
|
||||
let semaphore = {
|
||||
// get fast path
|
||||
if let Some(semaphore) = inner.node_locks.get(key) {
|
||||
if let Some(semaphore) = self.node_locks.get(key) {
|
||||
semaphore.clone()
|
||||
} else {
|
||||
inner
|
||||
.node_locks
|
||||
self.node_locks
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| {
|
||||
self.metrics.semaphores_registered.inc();
|
||||
Arc::new(Semaphore::new(inner.permits.get()))
|
||||
Arc::new(Semaphore::new(self.permits))
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
@@ -518,13 +509,13 @@ impl ApiLocks {
|
||||
}
|
||||
|
||||
pub async fn garbage_collect_worker(&self) {
|
||||
let Some(inner) = &self.inner else {
|
||||
if self.permits == 0 {
|
||||
return;
|
||||
};
|
||||
}
|
||||
let mut interval =
|
||||
tokio::time::interval(self.epoch / (inner.node_locks.shards().len()) as u32);
|
||||
tokio::time::interval(self.epoch / (self.node_locks.shards().len()) as u32);
|
||||
loop {
|
||||
for (i, shard) in inner.node_locks.shards().iter().enumerate() {
|
||||
for (i, shard) in self.node_locks.shards().iter().enumerate() {
|
||||
interval.tick().await;
|
||||
// temporary lock a single shard and then clear any semaphores that aren't currently checked out
|
||||
// race conditions: if strong_count == 1, there's no way that it can increase while the shard is locked
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
use anyhow::{anyhow, bail};
|
||||
use camino::Utf8PathBuf;
|
||||
use camino_tempfile::Utf8TempDir;
|
||||
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
|
||||
use measured::{text::BufferedTextEncoder, MetricGroup};
|
||||
use metrics::NeonMetrics;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
ffi::CString,
|
||||
net::TcpListener,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
use tracing::{info, info_span};
|
||||
use tracing::{info, info_span, warn};
|
||||
use utils::http::{
|
||||
endpoint::{self, request_span},
|
||||
error::ApiError,
|
||||
@@ -21,18 +25,49 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(StatusCode::OK, "")
|
||||
}
|
||||
|
||||
async fn prof_dump(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
static PROF_MIB: Lazy<jemalloc::dump_mib> =
|
||||
Lazy::new(|| jemalloc::dump::mib().expect("could not create prof.dump MIB"));
|
||||
static PROF_DIR: Lazy<Utf8TempDir> =
|
||||
Lazy::new(|| camino_tempfile::tempdir().expect("could not create tempdir"));
|
||||
static PROF_FILE: Lazy<Utf8PathBuf> = Lazy::new(|| PROF_DIR.path().join("prof.dump"));
|
||||
static PROF_FILE0: Lazy<CString> = Lazy::new(|| CString::new(PROF_FILE.as_str()).unwrap());
|
||||
static DUMP_LOCK: Mutex<()> = Mutex::new(());
|
||||
|
||||
tokio::task::spawn_blocking(|| {
|
||||
let _guard = DUMP_LOCK.lock();
|
||||
PROF_MIB
|
||||
.write(&PROF_FILE0)
|
||||
.expect("could not trigger prof.dump");
|
||||
let prof_dump = std::fs::read_to_string(&*PROF_FILE).expect("could not open prof.dump");
|
||||
|
||||
Response::new(Body::from(prof_dump))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))
|
||||
}
|
||||
|
||||
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
|
||||
let state = Arc::new(Mutex::new(PrometheusHandler {
|
||||
encoder: BufferedTextEncoder::new(),
|
||||
metrics,
|
||||
}));
|
||||
|
||||
endpoint::make_router()
|
||||
let mut router = endpoint::make_router()
|
||||
.get("/metrics", move |r| {
|
||||
let state = state.clone();
|
||||
request_span(r, move |b| prometheus_metrics_handler(b, state))
|
||||
})
|
||||
.get("/v1/status", status_handler)
|
||||
.get("/v1/status", status_handler);
|
||||
|
||||
let prof_enabled = jemalloc::prof::read().unwrap_or_default();
|
||||
if prof_enabled {
|
||||
warn!("activating jemalloc profiling");
|
||||
jemalloc::active::write(true).unwrap();
|
||||
router = router.get("/v1/jemalloc/prof.dump", prof_dump);
|
||||
}
|
||||
|
||||
router
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::marker::PhantomData;
|
||||
use std::{ffi::CStr, marker::PhantomData};
|
||||
|
||||
use measured::{
|
||||
label::NoLabels,
|
||||
@@ -9,7 +9,9 @@ use measured::{
|
||||
text::TextEncoder,
|
||||
LabelGroup, MetricGroup,
|
||||
};
|
||||
use tikv_jemalloc_ctl::{config, epoch, epoch_mib, stats, version};
|
||||
use tikv_jemalloc_ctl::{
|
||||
config, epoch, epoch_mib, raw, stats, version, Access, AsName, MibStr, Name,
|
||||
};
|
||||
|
||||
pub struct MetricRecorder {
|
||||
epoch: epoch_mib,
|
||||
@@ -114,3 +116,59 @@ jemalloc_gauge!(mapped, mapped_mib);
|
||||
jemalloc_gauge!(metadata, metadata_mib);
|
||||
jemalloc_gauge!(resident, resident_mib);
|
||||
jemalloc_gauge!(retained, retained_mib);
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct dump;
|
||||
|
||||
impl dump {
|
||||
pub fn mib() -> tikv_jemalloc_ctl::Result<dump_mib> {
|
||||
Ok(dump_mib(b"prof.dump\0".as_slice().name().mib_str()?))
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(transparent)]
|
||||
#[derive(Copy, Clone)]
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct dump_mib(pub MibStr<[usize; 2]>);
|
||||
|
||||
impl dump_mib {
|
||||
pub fn write(self, value: &'static CStr) -> tikv_jemalloc_ctl::Result<()> {
|
||||
// No support for Access<CStr> yet.
|
||||
// self.0.write(value)
|
||||
let mib = [self.0[0], self.0[1]];
|
||||
raw::write_str_mib(&mib, value.to_bytes_with_nul())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct active;
|
||||
|
||||
impl active {
|
||||
pub fn name() -> &'static Name {
|
||||
b"prof.active\0".as_slice().name()
|
||||
}
|
||||
}
|
||||
|
||||
impl active {
|
||||
pub fn read() -> tikv_jemalloc_ctl::Result<bool> {
|
||||
Self::name().read()
|
||||
}
|
||||
pub fn write(value: bool) -> tikv_jemalloc_ctl::Result<()> {
|
||||
Self::name().write(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub struct prof;
|
||||
|
||||
impl prof {
|
||||
pub fn name() -> &'static Name {
|
||||
b"opt.prof\0".as_slice().name()
|
||||
}
|
||||
}
|
||||
|
||||
impl prof {
|
||||
pub fn read() -> tikv_jemalloc_ctl::Result<bool> {
|
||||
Self::name().read()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user