Compare commits

..

7 Commits

Author SHA1 Message Date
Conrad Ludgate
7d2709f4a1 back to published versions 2024-04-22 10:28:29 +01:00
Conrad Ludgate
6e2c04bc48 custom jemalloc opts 2024-04-22 10:26:49 +01:00
Conrad Ludgate
76ae735a24 bump 2024-04-22 06:55:43 +01:00
Conrad Ludgate
7be445f627 jemalloc profiling 2024-04-22 06:48:03 +01:00
Em Sharnoff
35e9fb360b Bump vm-builder v0.23.2 -> v0.28.1 (#7433)
Only one relevant change, from v0.28.0:

- neondatabase/autoscaling#887

Double-checked with `git log neonvm/tools/vm-builder`.
2024-04-21 17:35:01 -07:00
Heikki Linnakangas
0d21187322 update rustls
## Problem

`cargo deny check` is complaining about our rustls versions, causing
CI to fail:

```
error[vulnerability]: `rustls::ConnectionCommon::complete_io` could fall into an infinite loop based on network input
    ┌─ /__w/neon/neon/Cargo.lock:395:1
    │
395 │ rustls 0.21.9 registry+https://github.com/rust-lang/crates.io-index
    │ ------------------------------------------------------------------- security vulnerability detected
    │
    = ID: RUSTSEC-2024-0336
    = Advisory: https://rustsec.org/advisories/RUSTSEC-2024-0336
    = If a `close_notify` alert is received during a handshake, `complete_io`
      does not terminate.

      Callers which do not call `complete_io` are not affected.

      `rustls-tokio` and `rustls-ffi` do not call `complete_io`
      and are not affected.

      `rustls::Stream` and `rustls::StreamOwned` types use
      `complete_io` and are affected.
    = Announcement: https://github.com/rustls/rustls/security/advisories/GHSA-6g7w-8wpp-frhj
    = Solution: Upgrade to >=0.23.5 OR >=0.22.4, <0.23.0 OR >=0.21.11, <0.22.0 (try `cargo update -p rustls`)

error[vulnerability]: `rustls::ConnectionCommon::complete_io` could fall into an infinite loop based on network input
    ┌─ /__w/neon/neon/Cargo.lock:396:1
    │
396 │ rustls 0.22.2 registry+https://github.com/rust-lang/crates.io-index
    │ ------------------------------------------------------------------- security vulnerability detected
    │
    = ID: RUSTSEC-2024-0336
    = Advisory: https://rustsec.org/advisories/RUSTSEC-2024-0336
    = If a `close_notify` alert is received during a handshake, `complete_io`
      does not terminate.

      Callers which do not call `complete_io` are not affected.

      `rustls-tokio` and `rustls-ffi` do not call `complete_io`
      and are not affected.

      `rustls::Stream` and `rustls::StreamOwned` types use
      `complete_io` and are affected.
    = Announcement: https://github.com/rustls/rustls/security/advisories/GHSA-6g7w-8wpp-frhj
    = Solution: Upgrade to >=0.23.5 OR >=0.22.4, <0.23.0 OR >=0.21.11, <0.22.0 (try `cargo update -p rustls`)
```

## Summary of changes

`cargo update -p rustls@0.21.9 -p rustls@0.22.2`
2024-04-21 21:10:05 +01:00
Alexander Bayandin
e8a98adcd0 CI: downgrade docker/setup-buildx-action to v2
- Cleanup part for `docker/setup-buildx-action` started to fail with the following error (for no obvious reason):
```
/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175
            throw new Error(`Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.`);
^
Error: Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.
    at Object.rejected (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:175:1)
    at Generator.next (<anonymous>)
    at fulfilled (/nvme/actions-runner/_work/_actions/docker/setup-buildx-action/v3/webpack:/docker-setup-buildx/node_modules/@actions/cache/lib/cache.js:29:1)
```

- Downgrade `docker/setup-buildx-action` from v3 to v2
2024-04-21 21:10:05 +01:00
27 changed files with 345 additions and 442 deletions

View File

@@ -2,7 +2,6 @@
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
rustdocflags = ["-Arustdoc::private_intra_doc_links"]
rustflags = ["--cfg", "tokio_unstable"]
[alias]
build_testing = ["build", "--features", "testing"]

View File

@@ -214,7 +214,6 @@ jobs:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
RUSTFLAGS: "--cfg=tokio_unstable"
steps:
- name: Fix git ownership
@@ -736,7 +735,7 @@ jobs:
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
- uses: docker/setup-buildx-action@v2
- uses: docker/login-action@v3
with:
@@ -793,7 +792,7 @@ jobs:
run: |
mkdir -p .docker-custom
echo DOCKER_CONFIG=$(pwd)/.docker-custom >> $GITHUB_ENV
- uses: docker/setup-buildx-action@v3
- uses: docker/setup-buildx-action@v2
with:
# Disable parallelism for docker buildkit.
# As we already build everything with `make -j$(nproc)`, running it in additional level of parallelisam blows up the Runner.
@@ -866,7 +865,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.23.2
VM_BUILDER_VERSION: v0.28.1
steps:
- name: Checkout

172
Cargo.lock generated
View File

@@ -599,7 +599,7 @@ dependencies = [
"once_cell",
"pin-project-lite",
"pin-utils",
"rustls 0.21.9",
"rustls 0.21.11",
"tokio",
"tracing",
]
@@ -1240,43 +1240,6 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787"
dependencies = [
"futures-core",
"prost 0.12.4",
"prost-types 0.12.4",
"tonic 0.10.2",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"prost-types 0.12.4",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.10.2",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.5"
@@ -2556,7 +2519,7 @@ dependencies = [
"http 0.2.9",
"hyper 0.14.26",
"log",
"rustls 0.21.9",
"rustls 0.21.11",
"rustls-native-certs 0.6.2",
"tokio",
"tokio-rustls 0.24.0",
@@ -3445,11 +3408,11 @@ dependencies = [
"opentelemetry-semantic-conventions",
"opentelemetry_api",
"opentelemetry_sdk",
"prost 0.11.9",
"prost",
"reqwest",
"thiserror",
"tokio",
"tonic 0.9.2",
"tonic",
]
[[package]]
@@ -3460,8 +3423,8 @@ checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
"prost 0.11.9",
"tonic 0.9.2",
"prost",
"tonic",
]
[[package]]
@@ -4046,6 +4009,17 @@ dependencies = [
"tokio-postgres",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
@@ -4085,7 +4059,7 @@ dependencies = [
"futures",
"once_cell",
"pq_proto",
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-pemfile 2.1.1",
"serde",
"thiserror",
@@ -4260,17 +4234,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive 0.11.9",
]
[[package]]
name = "prost"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922"
dependencies = [
"bytes",
"prost-derive 0.12.4",
"prost-derive",
]
[[package]]
@@ -4287,8 +4251,8 @@ dependencies = [
"multimap",
"petgraph",
"prettyplease 0.1.25",
"prost 0.11.9",
"prost-types 0.11.9",
"prost",
"prost-types",
"regex",
"syn 1.0.109",
"tempfile",
@@ -4308,35 +4272,13 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "prost-derive"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost 0.11.9",
]
[[package]]
name = "prost-types"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe"
dependencies = [
"prost 0.12.4",
"prost",
]
[[package]]
@@ -4358,7 +4300,6 @@ dependencies = [
"camino-tempfile",
"chrono",
"clap",
"console-subscriber",
"consumption_metrics",
"dashmap",
"env_logger",
@@ -4383,6 +4324,7 @@ dependencies = [
"md5",
"measured",
"metrics",
"native-tls",
"once_cell",
"opentelemetry",
"parking_lot 0.12.1",
@@ -4390,6 +4332,7 @@ dependencies = [
"parquet_derive",
"pbkdf2",
"pin-project-lite",
"postgres-native-tls",
"postgres-protocol",
"postgres_backend",
"pq_proto",
@@ -4407,8 +4350,7 @@ dependencies = [
"routerify",
"rstest",
"rustc-hash",
"rustls 0.22.2",
"rustls-native-certs 0.7.0",
"rustls 0.22.4",
"rustls-pemfile 2.1.1",
"scopeguard",
"serde",
@@ -4600,7 +4542,7 @@ dependencies = [
"itoa",
"percent-encoding",
"pin-project-lite",
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"rustls-pki-types",
@@ -4754,7 +4696,7 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.9",
"rustls 0.21.11",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
@@ -5014,9 +4956,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.9"
version = "0.21.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9"
checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4"
dependencies = [
"log",
"ring 0.17.6",
@@ -5026,9 +4968,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.22.2"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432"
dependencies = [
"log",
"ring 0.17.6",
@@ -5340,7 +5282,7 @@ checksum = "2e95efd0cefa32028cdb9766c96de71d96671072f9fb494dc9fb84c0ef93e52b"
dependencies = [
"httpdate",
"reqwest",
"rustls 0.21.9",
"rustls 0.21.11",
"sentry-backtrace",
"sentry-contexts",
"sentry-core",
@@ -5775,10 +5717,10 @@ dependencies = [
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost 0.11.9",
"prost",
"tokio",
"tokio-stream",
"tonic 0.9.2",
"tonic",
"tonic-build",
"tracing",
"utils",
@@ -6170,7 +6112,6 @@ dependencies = [
"signal-hook-registry",
"socket2 0.5.5",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@@ -6252,7 +6193,7 @@ checksum = "0ea13f22eda7127c827983bdaf0d7fff9df21c8817bab02815ac277a21143677"
dependencies = [
"futures",
"ring 0.17.6",
"rustls 0.22.2",
"rustls 0.22.4",
"tokio",
"tokio-postgres",
"tokio-rustls 0.25.0",
@@ -6265,7 +6206,7 @@ version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5"
dependencies = [
"rustls 0.21.9",
"rustls 0.21.11",
"tokio",
]
@@ -6275,7 +6216,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls 0.22.2",
"rustls 0.22.4",
"rustls-pki-types",
"tokio",
]
@@ -6401,7 +6342,7 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.11.9",
"prost",
"rustls-native-certs 0.6.2",
"rustls-pemfile 1.0.2",
"tokio",
@@ -6413,33 +6354,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.1",
"bytes",
"h2 0.3.26",
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.12.4",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
@@ -6763,7 +6677,7 @@ dependencies = [
"base64 0.21.1",
"log",
"once_cell",
"rustls 0.21.9",
"rustls 0.21.11",
"rustls-webpki 0.100.2",
"url",
"webpki-roots 0.23.1",
@@ -7420,7 +7334,6 @@ dependencies = [
"futures-util",
"getrandom 0.2.11",
"hashbrown 0.14.0",
"hdrhistogram",
"hex",
"hmac",
"hyper 0.14.26",
@@ -7435,13 +7348,13 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"prost 0.11.9",
"prost",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"reqwest",
"rustls 0.21.9",
"rustls 0.21.11",
"scopeguard",
"serde",
"serde_json",
@@ -7454,11 +7367,10 @@ dependencies = [
"time-macros",
"tokio",
"tokio-rustls 0.24.0",
"tokio-stream",
"tokio-util",
"toml_datetime",
"toml_edit",
"tonic 0.9.2",
"tonic",
"tower",
"tracing",
"tracing-core",

View File

@@ -252,7 +252,7 @@ debug = true
# disable debug symbols for all packages except this one to decrease binaries size
[profile.release.package."*"]
debug = false
debug = true
[profile.release-line-debug]
inherits = "release"

View File

@@ -44,10 +44,11 @@ COPY --from=pg-build /home/nonroot/pg_install/v15/include/postgresql/server pg_i
COPY --from=pg-build /home/nonroot/pg_install/v16/include/postgresql/server pg_install/v16/include/postgresql/server
COPY --chown=nonroot . .
ENV _RJEM_MALLOC_CONF="prof:true"
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment --cfg=tokio_unstable" cargo build \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -21,6 +21,7 @@ base64.workspace = true
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
camino-tempfile.workspace = true
chrono.workspace = true
clap.workspace = true
consumption_metrics.workspace = true
@@ -67,7 +68,6 @@ routerify.workspace = true
rustc-hash.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
rustls-native-certs = "0.7.0"
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
@@ -79,13 +79,12 @@ subtle.workspace = true
sync_wrapper.workspace = true
task-local-extensions.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tikv-jemallocator = { workspace = true, features = ["profiling"] }
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal", "tracing"] }
tokio = { workspace = true, features = ["signal"] }
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
@@ -96,17 +95,17 @@ utils.workspace = true
uuid.workspace = true
webpki-roots.workspace = true
x509-parser.workspace = true
native-tls.workspace = true
postgres-native-tls.workspace = true
postgres-protocol.workspace = true
redis.workspace = true
console-subscriber = "0.2.0"
workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
fallible-iterator.workspace = true
rcgen.workspace = true
rstest.workspace = true
tokio-postgres-rustls.workspace = true
walkdir.workspace = true
rand_distr = "0.4"

View File

@@ -121,5 +121,6 @@ pub(super) async fn authenticate(
Ok(NodeInfo {
config,
aux: db_info.aux,
allow_self_signed_compute: false, // caller may override
})
}

View File

@@ -35,8 +35,6 @@ use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
@@ -122,6 +120,9 @@ struct ProxyCliArgs {
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
wake_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
@@ -235,21 +236,8 @@ struct SqlOverHttpArgs {
sql_over_http_pool_shards: usize,
}
fn main() -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("worker-{}", id)
})
.build()
.unwrap();
rt.block_on(main2())
}
async fn main2() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
@@ -364,16 +352,12 @@ async fn main2() -> anyhow::Result<()> {
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
client_tasks
.build_task()
.name("tcp main")
.spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
))
.unwrap();
client_tasks.spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
));
// TODO: rename the argument to something like serverless.
// It now covers more than just websockets, it also covers SQL over HTTP.
@@ -382,98 +366,58 @@ async fn main2() -> anyhow::Result<()> {
info!("Starting wss on {serverless_address}");
let serverless_listener = TcpListener::bind(serverless_address).await?;
client_tasks
.build_task()
.name("serverless main")
.spawn(serverless::task_main(
config,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
))
.unwrap();
client_tasks.spawn(serverless::task_main(
config,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
));
}
client_tasks
.build_task()
.name("parquet worker")
.spawn(proxy::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
))
.unwrap();
client_tasks.spawn(proxy::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
));
// maintenance tasks. these never return unless there's an error
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks
.build_task()
.name("signal handler")
.spawn(proxy::handle_signals(cancellation_token.clone()))
.unwrap();
maintenance_tasks
.build_task()
.name("health server")
.spawn(http::health_server::task_main(
http_listener,
AppMetrics {
jemalloc,
neon_metrics,
proxy: proxy::metrics::Metrics::get(),
},
))
.unwrap();
maintenance_tasks
.build_task()
.name("mangement main")
.spawn(console::mgmt::task_main(mgmt_listener))
.unwrap();
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone()));
maintenance_tasks.spawn(http::health_server::task_main(
http_listener,
AppMetrics {
jemalloc,
neon_metrics,
proxy: proxy::metrics::Metrics::get(),
},
));
maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener));
if let Some(metrics_config) = &config.metric_collection {
// TODO: Add gc regardles of the metric collection being enabled.
maintenance_tasks
.build_task()
.name("")
.spawn(usage_metrics::task_main(metrics_config))
.unwrap();
client_tasks
.build_task()
.name("")
.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token,
))
.unwrap();
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
client_tasks.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token,
));
}
if let auth::BackendType::Console(api, _) = &config.auth_backend {
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
if let Some(redis_notifications_client) = redis_notifications_client {
let cache = api.caches.project_info.clone();
maintenance_tasks
.build_task()
.name("redis notifications")
.spawn(notifications::task_main(
redis_notifications_client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
))
.unwrap();
maintenance_tasks
.build_task()
.name("proj info cache gc")
.spawn(async move { cache.clone().gc_worker().await })
.unwrap();
maintenance_tasks.spawn(notifications::task_main(
redis_notifications_client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks
.build_task()
.name("redis endpoints cache read")
.spawn(async move { cache.do_read(con).await }.instrument(span))
.unwrap();
maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span));
}
}
}
@@ -514,6 +458,9 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
if args.allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: remote_storage_from_toml(
@@ -578,10 +525,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
)
.unwrap(),
));
tokio::task::Builder::new()
.name("wake compute lock gc")
.spawn(locks.garbage_collect_worker())
.unwrap();
tokio::spawn(locks.garbage_collect_worker());
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
@@ -631,6 +575,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
require_client_ip: args.require_client_ip,

View File

@@ -10,12 +10,7 @@ use crate::{
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{
io,
net::SocketAddr,
sync::{Arc, OnceLock},
time::Duration,
};
use std::{io, net::SocketAddr, time::Duration};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
@@ -33,6 +28,9 @@ pub enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] native_tls::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
WakeComputeError(#[from] WakeComputeError),
}
@@ -71,6 +69,7 @@ impl ReportableError for ConnectionError {
}
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
}
}
@@ -240,7 +239,7 @@ pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
tokio::net::TcpStream,
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
>,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
@@ -252,39 +251,22 @@ pub struct PostgresConnection {
_guage: NumDbConnectionsGuard<'static>,
}
static ROOT_STORE: OnceLock<Arc<rustls::RootCertStore>> = OnceLock::new();
impl ConnCfg {
/// Connect to a corresponding compute node.
pub async fn connect(
&self,
ctx: &mut RequestMonitoring,
allow_self_signed_compute: bool,
aux: MetricsAuxInfo,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
let root_store = ROOT_STORE.get_or_init(|| {
let mut roots = rustls::RootCertStore::empty();
let certs = match rustls_native_certs::load_native_certs() {
Ok(certs) => certs,
Err(e) => {
error!("could not load native ssl certs: {e:?}");
return Arc::new(roots);
}
};
let (added, ignored) = roots.add_parsable_certificates(certs);
info!(added, ignored, "loaded native ssl certifications");
Arc::new(roots)
});
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store.clone())
.with_no_client_auth();
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(allow_self_signed_compute)
.build()
.unwrap();
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
// connect_raw() will not use TLS if sslmode is "disable"

View File

@@ -24,6 +24,7 @@ pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<'static, (), ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,

View File

@@ -40,31 +40,28 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
let span = info_span!("mgmt", peer = %peer_addr);
tokio::task::Builder::new()
.name("mgmt handler")
.spawn(
async move {
info!("serving a new console management API connection");
tokio::task::spawn(
async move {
info!("serving a new console management API connection");
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
}
.instrument(span),
)
.unwrap();
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
.instrument(span),
);
}
}

View File

@@ -293,6 +293,9 @@ pub struct NodeInfo {
/// Labels for proxy's metrics.
pub aux: MetricsAuxInfo,
/// Whether we should accept self-signed certificates (for testing)
pub allow_self_signed_compute: bool,
}
impl NodeInfo {
@@ -301,9 +304,17 @@ impl NodeInfo {
ctx: &mut RequestMonitoring,
timeout: Duration,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config.connect(ctx, self.aux.clone(), timeout).await
self.config
.connect(
ctx,
self.allow_self_signed_compute,
self.aux.clone(),
timeout,
)
.await
}
pub fn reuse_settings(&mut self, other: Self) {
self.allow_self_signed_compute = other.allow_self_signed_compute;
self.config.reuse_password(other.config);
}

View File

@@ -63,10 +63,7 @@ impl Api {
let (client, connection) =
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
tokio::task::Builder::new()
.name("mock conn")
.spawn(connection)
.unwrap();
tokio::spawn(connection);
let secret = match get_execute_postgres_query(
&client,
"select rolpassword from pg_catalog.pg_authid where rolname = $1",
@@ -129,6 +126,7 @@ impl Api {
branch_id: (&BranchId::from("branch")).into(),
cold_start_info: crate::console::messages::ColdStartInfo::Warm,
},
allow_self_signed_compute: false,
};
Ok(node)

View File

@@ -175,6 +175,7 @@ impl Api {
let node = NodeInfo {
config,
aux: body.aux,
allow_self_signed_compute: false,
};
Ok(node)

View File

@@ -141,15 +141,12 @@ pub async fn worker(
LOG_CHAN.set(tx.downgrade()).unwrap();
// setup row stream that will close on cancellation
tokio::task::Builder::new()
.name("drop parquet conn")
.spawn(async move {
cancellation_token.cancelled().await;
// dropping this sender will cause the channel to close only once
// all the remaining inflight requests have been completed.
drop(tx);
})
.unwrap();
tokio::spawn(async move {
cancellation_token.cancelled().await;
// dropping this sender will cause the channel to close only once
// all the remaining inflight requests have been completed.
drop(tx);
});
let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
let rx = rx.map(RequestData::from);

View File

@@ -1,13 +1,17 @@
use anyhow::{anyhow, bail};
use camino::Utf8PathBuf;
use camino_tempfile::Utf8TempDir;
use hyper::{header::CONTENT_TYPE, Body, Request, Response, StatusCode};
use measured::{text::BufferedTextEncoder, MetricGroup};
use metrics::NeonMetrics;
use once_cell::sync::Lazy;
use std::{
convert::Infallible,
ffi::CString,
net::TcpListener,
sync::{Arc, Mutex},
};
use tracing::{info, info_span};
use tracing::{info, info_span, warn};
use utils::http::{
endpoint::{self, request_span},
error::ApiError,
@@ -21,18 +25,49 @@ async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, "")
}
async fn prof_dump(_: Request<Body>) -> Result<Response<Body>, ApiError> {
static PROF_MIB: Lazy<jemalloc::dump_mib> =
Lazy::new(|| jemalloc::dump::mib().expect("could not create prof.dump MIB"));
static PROF_DIR: Lazy<Utf8TempDir> =
Lazy::new(|| camino_tempfile::tempdir().expect("could not create tempdir"));
static PROF_FILE: Lazy<Utf8PathBuf> = Lazy::new(|| PROF_DIR.path().join("prof.dump"));
static PROF_FILE0: Lazy<CString> = Lazy::new(|| CString::new(PROF_FILE.as_str()).unwrap());
static DUMP_LOCK: Mutex<()> = Mutex::new(());
tokio::task::spawn_blocking(|| {
let _guard = DUMP_LOCK.lock();
PROF_MIB
.write(&PROF_FILE0)
.expect("could not trigger prof.dump");
let prof_dump = std::fs::read_to_string(&*PROF_FILE).expect("could not open prof.dump");
Response::new(Body::from(prof_dump))
})
.await
.map_err(|e| ApiError::InternalServerError(e.into()))
}
fn make_router(metrics: AppMetrics) -> RouterBuilder<hyper::Body, ApiError> {
let state = Arc::new(Mutex::new(PrometheusHandler {
encoder: BufferedTextEncoder::new(),
metrics,
}));
endpoint::make_router()
let mut router = endpoint::make_router()
.get("/metrics", move |r| {
let state = state.clone();
request_span(r, move |b| prometheus_metrics_handler(b, state))
})
.get("/v1/status", status_handler)
.get("/v1/status", status_handler);
let prof_enabled = jemalloc::prof::read().unwrap_or_default();
if prof_enabled {
warn!("activating jemalloc profiling");
jemalloc::active::write(true).unwrap();
router = router.get("/v1/jemalloc/prof.dump", prof_dump);
}
router
}
pub async fn task_main(
@@ -75,10 +110,6 @@ async fn prometheus_metrics_handler(
let span = info_span!("blocking");
let body = tokio::task::spawn_blocking(move || {
// there are situations where we lose scraped metrics under load, try to gather some clues
// since all nodes are queried this, keep the message count low.
let spawned_at = std::time::Instant::now();
let _span = span.entered();
let mut state = state.lock().unwrap();
@@ -88,19 +119,11 @@ async fn prometheus_metrics_handler(
.collect_group_into(&mut *encoder)
.unwrap_or_else(|infallible| match infallible {});
let encoded_at = std::time::Instant::now();
let body = encoder.finish();
let spawned_in = spawned_at - started_at;
let encoded_in = encoded_at - spawned_at;
let total = encoded_at - started_at;
tracing::info!(
bytes = body.len(),
total_ms = total.as_millis(),
spawning_ms = spawned_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
elapsed_ms = started_at.elapsed().as_millis(),
"responded /metrics"
);

View File

@@ -1,4 +1,4 @@
use std::marker::PhantomData;
use std::{ffi::CStr, marker::PhantomData};
use measured::{
label::NoLabels,
@@ -9,7 +9,9 @@ use measured::{
text::TextEncoder,
LabelGroup, MetricGroup,
};
use tikv_jemalloc_ctl::{config, epoch, epoch_mib, stats, version};
use tikv_jemalloc_ctl::{
config, epoch, epoch_mib, raw, stats, version, Access, AsName, MibStr, Name,
};
pub struct MetricRecorder {
epoch: epoch_mib,
@@ -114,3 +116,59 @@ jemalloc_gauge!(mapped, mapped_mib);
jemalloc_gauge!(metadata, metadata_mib);
jemalloc_gauge!(resident, resident_mib);
jemalloc_gauge!(retained, retained_mib);
#[allow(non_camel_case_types)]
pub struct dump;
impl dump {
pub fn mib() -> tikv_jemalloc_ctl::Result<dump_mib> {
Ok(dump_mib(b"prof.dump\0".as_slice().name().mib_str()?))
}
}
#[repr(transparent)]
#[derive(Copy, Clone)]
#[allow(non_camel_case_types)]
pub struct dump_mib(pub MibStr<[usize; 2]>);
impl dump_mib {
pub fn write(self, value: &'static CStr) -> tikv_jemalloc_ctl::Result<()> {
// No support for Access<CStr> yet.
// self.0.write(value)
let mib = [self.0[0], self.0[1]];
raw::write_str_mib(&mib, value.to_bytes_with_nul())
}
}
#[allow(non_camel_case_types)]
pub struct active;
impl active {
pub fn name() -> &'static Name {
b"prof.active\0".as_slice().name()
}
}
impl active {
pub fn read() -> tikv_jemalloc_ctl::Result<bool> {
Self::name().read()
}
pub fn write(value: bool) -> tikv_jemalloc_ctl::Result<()> {
Self::name().write(value)
}
}
#[allow(non_camel_case_types)]
pub struct prof;
impl prof {
pub fn name() -> &'static Name {
b"opt.prof\0".as_slice().name()
}
}
impl prof {
pub fn read() -> tikv_jemalloc_ctl::Result<bool> {
Self::name().read()
}
}

View File

@@ -26,12 +26,7 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.await
.map(OpenTelemetryLayer::new);
// spawn the console server in the background,
// returning a `Layer`:
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)

View File

@@ -87,7 +87,7 @@ pub async fn task_main(
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
tokio::task::Builder::new().name("tcp client connection").spawn(connections.track_future(async move {
connections.spawn(async move {
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr.ip();
match socket.wait_for_addr().await {
@@ -152,7 +152,7 @@ pub async fn task_main(
}
}
}
})).unwrap();
});
}
connections.close();
@@ -178,6 +178,13 @@ impl ClientMode {
}
}
pub fn allow_self_signed_compute(&self, config: &ProxyConfig) -> bool {
match self {
ClientMode::Tcp => config.allow_self_signed_compute,
ClientMode::Websockets { .. } => false,
}
}
fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
match self {
ClientMode::Tcp => s.sni_hostname(),
@@ -296,9 +303,14 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
}
};
let mut node = connect_to_compute(ctx, &TcpMechanism { params: &params }, &user_info)
.or_else(|e| stream.throw_error(e))
.await?;
let mut node = connect_to_compute(
ctx,
&TcpMechanism { params: &params },
&user_info,
mode.allow_self_signed_compute(config),
)
.or_else(|e| stream.throw_error(e))
.await?;
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;

View File

@@ -92,6 +92,7 @@ pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
ctx: &mut RequestMonitoring,
mechanism: &M,
user_info: &B,
allow_self_signed_compute: bool,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
@@ -102,6 +103,8 @@ where
if let Some(keys) = user_info.get_keys() {
node_info.set_keys(keys);
}
node_info.allow_self_signed_compute = allow_self_signed_compute;
// let mut node_info = credentials.get_node_info(ctx, user_info).await?;
mechanism.update_connect_config(&mut node_info.config);
// try once

View File

@@ -519,6 +519,7 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
branch_id: (&BranchId::from("branch")).into(),
cold_start_info: crate::console::messages::ColdStartInfo::Warm,
},
allow_self_signed_compute: false,
};
let (_, node) = cache.insert("key".into(), node);
node
@@ -548,7 +549,7 @@ async fn connect_to_compute_success() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap();
mechanism.verify();
@@ -561,7 +562,7 @@ async fn connect_to_compute_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap();
mechanism.verify();
@@ -575,7 +576,7 @@ async fn connect_to_compute_non_retry_1() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap_err();
mechanism.verify();
@@ -589,7 +590,7 @@ async fn connect_to_compute_non_retry_2() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap();
mechanism.verify();
@@ -607,7 +608,7 @@ async fn connect_to_compute_non_retry_3() {
Retry, Retry, Retry, Retry, Retry, /* the 17th time */ Retry,
]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap_err();
mechanism.verify();
@@ -621,7 +622,7 @@ async fn wake_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap();
mechanism.verify();
@@ -635,7 +636,7 @@ async fn wake_non_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info)
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap_err();
mechanism.verify();

View File

@@ -108,12 +108,10 @@ impl ConnectionWithCredentialsProvider {
if let Credentials::Dynamic(credentials_provider, _) = &self.credentials {
let credentials_provider = credentials_provider.clone();
let con2 = con.clone();
let f = tokio::task::Builder::new()
.name("redis keep connection")
.spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f.unwrap());
let f = tokio::spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f);
}
match Self::ping(&mut con).await {
Ok(()) => {

View File

@@ -142,13 +142,10 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
let cache = self.cache.clone();
tokio::task::Builder::new()
.name("invalidate cache lazy")
.spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
})
.unwrap();
tokio::spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
});
}
}

View File

@@ -61,28 +61,22 @@ pub async fn task_main(
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
tokio::task::Builder::new()
.name("serverless pool gc")
.spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
})
.unwrap();
tokio::spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
});
}
// shutdown the connection pool
tokio::task::Builder::new()
.name("serverless pool shutdown")
.spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
})
.unwrap();
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
});
let backend = Arc::new(PoolingBackend {
pool: Arc::clone(&conn_pool),
@@ -115,25 +109,20 @@ pub async fn task_main(
let conn_id = uuid::Uuid::new_v4();
let http_conn_span = tracing::info_span!("http_conn", ?conn_id);
tokio::task::Builder::new()
.name("serverless conn handler")
.spawn(
connections.track_future(
connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span),
),
connections.spawn(
connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.unwrap();
.instrument(http_conn_span),
);
}
connections.wait().await;
@@ -229,25 +218,20 @@ async fn connection_handler(
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
// By spawning the future, we ensure it never gets cancelled until it decides to.
let handler = tokio::task::Builder::new()
.name("serverless request handler")
.spawn(
connections.track_future(
request_handler(
req,
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
session_id,
peer_addr,
http_request_token,
)
.in_current_span()
.map_ok_or_else(api_error_into_response, |r| r),
),
let handler = connections.spawn(
request_handler(
req,
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
session_id,
peer_addr,
http_request_token,
)
.unwrap();
.in_current_span()
.map_ok_or_else(api_error_into_response, |r| r),
);
async move {
let res = handler.await;
@@ -306,27 +290,17 @@ async fn request_handler(
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
.map_err(|e| ApiError::BadRequest(e.into()))?;
tokio::task::Builder::new()
.name("websocket client conn")
.spawn(
ws_connections.track_future(
async move {
if let Err(e) = websocket::serve_websocket(
config,
ctx,
websocket,
cancellation_handler,
host,
)
ws_connections.spawn(
async move {
if let Err(e) =
websocket::serve_websocket(config, ctx, websocket, cancellation_handler, host)
.await
{
error!("error in websocket connection: {e:#}");
}
}
.instrument(span),
),
)
.unwrap();
{
error!("error in websocket connection: {e:#}");
}
}
.instrument(span),
);
// Return the response so the spawned future can continue.
Ok(response)

View File

@@ -107,6 +107,7 @@ impl PoolingBackend {
pool: self.pool.clone(),
},
&backend,
false, // do not allow self signed compute for http flow
)
.await
}

View File

@@ -492,7 +492,7 @@ pub fn poll_client<C: ClientInnerExt>(
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::task::Builder::new().name("pooled conn").spawn(
tokio::spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
@@ -565,7 +565,7 @@ pub fn poll_client<C: ClientInnerExt>(
}).await;
}
.instrument(span)).unwrap();
.instrument(span));
let inner = ClientInner {
inner: client,
session: tx,

View File

@@ -38,7 +38,6 @@ futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
hdrhistogram = { version = "7" }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper = { version = "0.14", features = ["full"] }
@@ -67,9 +66,8 @@ sha2 = { version = "0.10", features = ["asm"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
subtle = { version = "2" }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util", "tracing"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }