Compare commits

...

7 Commits

Author SHA1 Message Date
Conrad Ludgate 30148035c9 more task names 2024-04-19 16:04:34 +01:00
Conrad Ludgate f0fa688ad7 docker 2024-04-19 15:31:23 +01:00
Conrad Ludgate 39345e3f57 add task names 2024-04-19 15:11:43 +01:00
Conrad Ludgate 4d1b5992eb custom runtime threads 2024-04-19 15:02:52 +01:00
Conrad Ludgate e8a5e0b0ed add tokio-console 2024-04-19 14:54:22 +01:00
Conrad Ludgate 1a979cd27e add more logs to metrics output 2024-04-19 14:11:45 +01:00
Conrad Ludgate 278ba8f8b5 proxy: simplify compute ssl setup 2024-04-19 13:55:51 +01:00
25 changed files with 415 additions and 224 deletions

View File

@@ -2,6 +2,7 @@
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
rustdocflags = ["-Arustdoc::private_intra_doc_links"]
rustflags = ["--cfg", "tokio_unstable"]
[alias]
build_testing = ["build", "--features", "testing"]
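
Note on the rustflags line above: tokio only compiles its `task::Builder` API and the runtime instrumentation that tokio-console relies on when `--cfg tokio_unstable` is set, so the same flag is mirrored in the CI workflow and the Dockerfile below (a RUSTFLAGS env var overrides this local config). A minimal sketch of keeping code buildable either way; the fallback branch is an illustration, not part of this change:

// Only one branch exists per build, so `fut` is moved exactly once.
// Builder::spawn must be called inside a runtime and returns io::Result.
fn spawn_named(fut: impl std::future::Future<Output = ()> + Send + 'static) {
    #[cfg(tokio_unstable)]
    let _ = tokio::task::Builder::new()
        .name("example")
        .spawn(fut)
        .unwrap();
    #[cfg(not(tokio_unstable))]
    let _ = tokio::task::spawn(fut);
}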

View File

@@ -214,6 +214,7 @@ jobs:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.event.pull_request.head.sha || github.sha }}
BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
RUSTFLAGS: "--cfg=tokio_unstable"
steps:
- name: Fix git ownership

Cargo.lock generated
View File

@@ -1240,6 +1240,43 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console-api"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787"
dependencies = [
"futures-core",
"prost 0.12.4",
"prost-types 0.12.4",
"tonic 0.10.2",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"prost-types 0.12.4",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic 0.10.2",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.5"
@@ -3408,11 +3445,11 @@ dependencies = [
"opentelemetry-semantic-conventions",
"opentelemetry_api",
"opentelemetry_sdk",
"prost",
"prost 0.11.9",
"reqwest",
"thiserror",
"tokio",
"tonic",
"tonic 0.9.2",
]
[[package]]
@@ -3423,8 +3460,8 @@ checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb"
dependencies = [
"opentelemetry_api",
"opentelemetry_sdk",
"prost",
"tonic",
"prost 0.11.9",
"tonic 0.9.2",
]
[[package]]
@@ -4009,17 +4046,6 @@ dependencies = [
"tokio-postgres",
]
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"native-tls",
"tokio",
"tokio-native-tls",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.4"
@@ -4234,7 +4260,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.11.9",
]
[[package]]
name = "prost"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922"
dependencies = [
"bytes",
"prost-derive 0.12.4",
]
[[package]]
@@ -4251,8 +4287,8 @@ dependencies = [
"multimap",
"petgraph",
"prettyplease 0.1.25",
"prost",
"prost-types",
"prost 0.11.9",
"prost-types 0.11.9",
"regex",
"syn 1.0.109",
"tempfile",
@@ -4272,13 +4308,35 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "prost-derive"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost",
"prost 0.11.9",
]
[[package]]
name = "prost-types"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe"
dependencies = [
"prost 0.12.4",
]
[[package]]
@@ -4300,6 +4358,7 @@ dependencies = [
"camino-tempfile",
"chrono",
"clap",
"console-subscriber",
"consumption_metrics",
"dashmap",
"env_logger",
@@ -4324,7 +4383,6 @@ dependencies = [
"md5",
"measured",
"metrics",
"native-tls",
"once_cell",
"opentelemetry",
"parking_lot 0.12.1",
@@ -4332,7 +4390,6 @@ dependencies = [
"parquet_derive",
"pbkdf2",
"pin-project-lite",
"postgres-native-tls",
"postgres-protocol",
"postgres_backend",
"pq_proto",
@@ -4351,6 +4408,7 @@ dependencies = [
"rstest",
"rustc-hash",
"rustls 0.22.2",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"scopeguard",
"serde",
@@ -5717,10 +5775,10 @@ dependencies = [
"metrics",
"once_cell",
"parking_lot 0.12.1",
"prost",
"prost 0.11.9",
"tokio",
"tokio-stream",
"tonic",
"tonic 0.9.2",
"tonic-build",
"tracing",
"utils",
@@ -6112,6 +6170,7 @@ dependencies = [
"signal-hook-registry",
"socket2 0.5.5",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
@@ -6342,7 +6401,7 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost 0.11.9",
"rustls-native-certs 0.6.2",
"rustls-pemfile 1.0.2",
"tokio",
@@ -6354,6 +6413,33 @@ dependencies = [
"tracing",
]
[[package]]
name = "tonic"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.1",
"bytes",
"h2 0.3.26",
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost 0.12.4",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic-build"
version = "0.9.2"
@@ -7334,6 +7420,7 @@ dependencies = [
"futures-util",
"getrandom 0.2.11",
"hashbrown 0.14.0",
"hdrhistogram",
"hex",
"hmac",
"hyper 0.14.26",
@@ -7348,7 +7435,7 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"prost",
"prost 0.11.9",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
@@ -7367,10 +7454,11 @@ dependencies = [
"time-macros",
"tokio",
"tokio-rustls 0.24.0",
"tokio-stream",
"tokio-util",
"toml_datetime",
"toml_edit",
"tonic",
"tonic 0.9.2",
"tower",
"tracing",
"tracing-core",

View File

@@ -47,7 +47,7 @@ COPY --chown=nonroot . .
# Show build caching stats to check if it was used in the end.
# Has to be the part of the same RUN since cachepot daemon is killed in the end of this RUN, losing the compilation stats.
RUN set -e \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment" cargo build \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment --cfg=tokio_unstable" cargo build \
--bin pg_sni_router \
--bin pageserver \
--bin pagectl \

View File

@@ -67,6 +67,7 @@ routerify.workspace = true
rustc-hash.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
rustls-native-certs = "0.7.0"
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
@@ -81,9 +82,10 @@ thiserror.workspace = true
tikv-jemallocator.workspace = true
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
tokio = { workspace = true, features = ["signal", "tracing"] }
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
tracing-utils.workspace = true
@@ -94,11 +96,11 @@ utils.workspace = true
uuid.workspace = true
webpki-roots.workspace = true
x509-parser.workspace = true
native-tls.workspace = true
postgres-native-tls.workspace = true
postgres-protocol.workspace = true
redis.workspace = true
console-subscriber = "0.2.0"
workspace_hack.workspace = true
[dev-dependencies]
@@ -106,6 +108,5 @@ camino-tempfile.workspace = true
fallible-iterator.workspace = true
rcgen.workspace = true
rstest.workspace = true
tokio-postgres-rustls.workspace = true
walkdir.workspace = true
rand_distr = "0.4"

View File

@@ -121,6 +121,5 @@ pub(super) async fn authenticate(
Ok(NodeInfo {
config,
aux: db_info.aux,
allow_self_signed_compute: false, // caller may override
})
}

View File

@@ -35,6 +35,8 @@ use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
@@ -120,9 +122,6 @@ struct ProxyCliArgs {
/// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
#[clap(long, default_value = config::WakeComputeLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
wake_compute_lock: String,
/// Allow self-signed certificates for compute nodes (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
allow_self_signed_compute: bool,
#[clap(flatten)]
sql_over_http: SqlOverHttpArgs,
/// timeout for scram authentication protocol
@@ -236,8 +235,21 @@ struct SqlOverHttpArgs {
sql_over_http_pool_shards: usize,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("worker-{}", id)
})
.build()
.unwrap();
rt.block_on(main2())
}
async fn main2() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
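
The `#[tokio::main]` attribute is dropped here because the macro exposes no hook for naming worker threads; it expands to roughly the builder call below, so hand-rolling it is behavior-preserving apart from the added `thread_name_fn`:

// Approximate expansion of #[tokio::main]; spelling it out is what makes
// room for customizing the builder before block_on.
fn main() -> anyhow::Result<()> {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(main2())
}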
@@ -352,12 +364,16 @@ async fn main() -> anyhow::Result<()> {
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
client_tasks.spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
));
client_tasks
.build_task()
.name("tcp main")
.spawn(proxy::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
))
.unwrap();
// TODO: rename the argument to something like serverless.
// It now covers more than just websockets, it also covers SQL over HTTP.
@@ -366,58 +382,98 @@ async fn main() -> anyhow::Result<()> {
info!("Starting wss on {serverless_address}");
let serverless_listener = TcpListener::bind(serverless_address).await?;
client_tasks.spawn(serverless::task_main(
config,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
));
client_tasks
.build_task()
.name("serverless main")
.spawn(serverless::task_main(
config,
serverless_listener,
cancellation_token.clone(),
cancellation_handler.clone(),
))
.unwrap();
}
client_tasks.spawn(proxy::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
));
client_tasks
.build_task()
.name("parquet worker")
.spawn(proxy::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
))
.unwrap();
// maintenance tasks. these never return unless there's an error
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone()));
maintenance_tasks.spawn(http::health_server::task_main(
http_listener,
AppMetrics {
jemalloc,
neon_metrics,
proxy: proxy::metrics::Metrics::get(),
},
));
maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener));
maintenance_tasks
.build_task()
.name("signal handler")
.spawn(proxy::handle_signals(cancellation_token.clone()))
.unwrap();
maintenance_tasks
.build_task()
.name("health server")
.spawn(http::health_server::task_main(
http_listener,
AppMetrics {
jemalloc,
neon_metrics,
proxy: proxy::metrics::Metrics::get(),
},
))
.unwrap();
maintenance_tasks
.build_task()
.name("mangement main")
.spawn(console::mgmt::task_main(mgmt_listener))
.unwrap();
if let Some(metrics_config) = &config.metric_collection {
// TODO: Add gc regardless of the metric collection being enabled.
maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
client_tasks.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token,
));
maintenance_tasks
.build_task()
.name("")
.spawn(usage_metrics::task_main(metrics_config))
.unwrap();
client_tasks
.build_task()
.name("")
.spawn(usage_metrics::task_backup(
&metrics_config.backup_metric_collection_config,
cancellation_token,
))
.unwrap();
}
if let auth::BackendType::Console(api, _) = &config.auth_backend {
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
if let Some(redis_notifications_client) = redis_notifications_client {
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(
redis_notifications_client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
maintenance_tasks
.build_task()
.name("redis notifications")
.spawn(notifications::task_main(
redis_notifications_client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
))
.unwrap();
maintenance_tasks
.build_task()
.name("proj info cache gc")
.spawn(async move { cache.clone().gc_worker().await })
.unwrap();
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span));
maintenance_tasks
.build_task()
.name("redis endpoints cache read")
.spawn(async move { cache.do_read(con).await }.instrument(span))
.unwrap();
}
}
}
@@ -458,9 +514,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
if args.allow_self_signed_compute {
warn!("allowing self-signed compute certificates");
}
let backup_metric_collection_config = config::MetricBackupCollectionConfig {
interval: args.metric_backup_collection_interval,
remote_storage_config: remote_storage_from_toml(
@@ -525,7 +578,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
)
.unwrap(),
));
tokio::spawn(locks.garbage_collect_worker());
tokio::task::Builder::new()
.name("wake compute lock gc")
.spawn(locks.garbage_collect_worker())
.unwrap();
let url = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
@@ -575,7 +631,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
tls_config,
auth_backend,
metric_collection,
allow_self_signed_compute: args.allow_self_signed_compute,
http_config,
authentication_config,
require_client_ip: args.require_client_ip,
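
The `client_tasks`/`maintenance_tasks` changes above all follow one pattern: `JoinSet::build_task()` is the JoinSet counterpart of `tokio::task::Builder`, only available under `tokio_unstable`, and its `spawn` returns a `Result` (hence the `.unwrap()` at every call site). The pattern in isolation, as a standalone sketch:

// The name attached here is what tokio-console lists for each task.
use tokio::task::JoinSet;

async fn run_workers() {
    let mut tasks: JoinSet<()> = JoinSet::new();
    for i in 0..4 {
        tasks
            .build_task()
            .name(&format!("worker {i}"))
            .spawn(async move { /* task body */ })
            .unwrap();
    }
    while tasks.join_next().await.is_some() {}
}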

View File

@@ -10,7 +10,12 @@ use crate::{
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr, time::Duration};
use std::{
io,
net::SocketAddr,
sync::{Arc, OnceLock},
time::Duration,
};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::tls::MakeTlsConnect;
@@ -28,9 +33,6 @@ pub enum ConnectionError {
#[error("{COULD_NOT_CONNECT}: {0}")]
CouldNotConnect(#[from] io::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
TlsError(#[from] native_tls::Error),
#[error("{COULD_NOT_CONNECT}: {0}")]
WakeComputeError(#[from] WakeComputeError),
}
@@ -69,7 +71,6 @@ impl ReportableError for ConnectionError {
}
ConnectionError::Postgres(_) => crate::error::ErrorKind::Compute,
ConnectionError::CouldNotConnect(_) => crate::error::ErrorKind::Compute,
ConnectionError::TlsError(_) => crate::error::ErrorKind::Compute,
ConnectionError::WakeComputeError(e) => e.get_error_kind(),
}
}
@@ -239,7 +240,7 @@ pub struct PostgresConnection {
/// Socket connected to a compute node.
pub stream: tokio_postgres::maybe_tls_stream::MaybeTlsStream<
tokio::net::TcpStream,
postgres_native_tls::TlsStream<tokio::net::TcpStream>,
tokio_postgres_rustls::RustlsStream<tokio::net::TcpStream>,
>,
/// PostgreSQL connection parameters.
pub params: std::collections::HashMap<String, String>,
@@ -251,22 +252,39 @@ pub struct PostgresConnection {
_guage: NumDbConnectionsGuard<'static>,
}
static ROOT_STORE: OnceLock<Arc<rustls::RootCertStore>> = OnceLock::new();
impl ConnCfg {
/// Connect to a corresponding compute node.
pub async fn connect(
&self,
ctx: &mut RequestMonitoring,
allow_self_signed_compute: bool,
aux: MetricsAuxInfo,
timeout: Duration,
) -> Result<PostgresConnection, ConnectionError> {
let (socket_addr, stream, host) = self.connect_raw(timeout).await?;
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(allow_self_signed_compute)
.build()
.unwrap();
let mut mk_tls = postgres_native_tls::MakeTlsConnector::new(tls_connector);
let root_store = ROOT_STORE.get_or_init(|| {
let mut roots = rustls::RootCertStore::empty();
let certs = match rustls_native_certs::load_native_certs() {
Ok(certs) => certs,
Err(e) => {
error!("could not load native ssl certs: {e:?}");
return Arc::new(roots);
}
};
let (added, ignored) = roots.add_parsable_certificates(certs);
info!(added, ignored, "loaded native ssl certificates");
Arc::new(roots)
});
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store.clone())
.with_no_client_auth();
let mut mk_tls = tokio_postgres_rustls::MakeRustlsConnect::new(client_config);
let tls = MakeTlsConnect::<tokio::net::TcpStream>::make_tls_connect(&mut mk_tls, host)?;
// connect_raw() will not use TLS if sslmode is "disable"
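
With native-tls gone, trust now comes from the OS certificate store, loaded once into a shared `RootCertStore` behind a `OnceLock`. Note that `danger_accept_invalid_certs` has no direct rustls equivalent short of a custom certificate verifier, which is why `allow_self_signed_compute` is removed throughout this PR rather than ported. For comparison, a hedged sketch of the same connector built on the compiled-in webpki roots (the workspace already depends on webpki-roots; assumes rustls 0.22):

// Alternative trust store: Mozilla's bundled roots instead of the OS store.
fn make_tls() -> tokio_postgres_rustls::MakeRustlsConnect {
    let mut roots = rustls::RootCertStore::empty();
    roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
    let config = rustls::ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    tokio_postgres_rustls::MakeRustlsConnect::new(config)
}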

View File

@@ -24,7 +24,6 @@ pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
pub auth_backend: auth::BackendType<'static, (), ()>,
pub metric_collection: Option<MetricCollectionConfig>,
pub allow_self_signed_compute: bool,
pub http_config: HttpConfig,
pub authentication_config: AuthenticationConfig,
pub require_client_ip: bool,

View File

@@ -40,28 +40,31 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
let span = info_span!("mgmt", peer = %peer_addr);
tokio::task::spawn(
async move {
info!("serving a new console management API connection");
tokio::task::Builder::new()
.name("mgmt handler")
.spawn(
async move {
info!("serving a new console management API connection");
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
// these might be long running connections, have a separate logging for cancelling
// on shutdown and other ways of stopping.
let cancelled = scopeguard::guard(tracing::Span::current(), |span| {
let _e = span.entered();
info!("console management API task cancelled");
});
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
if let Err(e) = handle_connection(socket).await {
error!("serving failed with an error: {e}");
} else {
info!("serving completed");
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
// we can no longer get dropped
scopeguard::ScopeGuard::into_inner(cancelled);
}
.instrument(span),
);
.instrument(span),
)
.unwrap();
}
}

View File

@@ -293,9 +293,6 @@ pub struct NodeInfo {
/// Labels for proxy's metrics.
pub aux: MetricsAuxInfo,
/// Whether we should accept self-signed certificates (for testing)
pub allow_self_signed_compute: bool,
}
impl NodeInfo {
@@ -304,17 +301,9 @@ impl NodeInfo {
ctx: &mut RequestMonitoring,
timeout: Duration,
) -> Result<compute::PostgresConnection, compute::ConnectionError> {
self.config
.connect(
ctx,
self.allow_self_signed_compute,
self.aux.clone(),
timeout,
)
.await
self.config.connect(ctx, self.aux.clone(), timeout).await
}
pub fn reuse_settings(&mut self, other: Self) {
self.allow_self_signed_compute = other.allow_self_signed_compute;
self.config.reuse_password(other.config);
}

View File

@@ -63,7 +63,10 @@ impl Api {
let (client, connection) =
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
tokio::spawn(connection);
tokio::task::Builder::new()
.name("mock conn")
.spawn(connection)
.unwrap();
let secret = match get_execute_postgres_query(
&client,
"select rolpassword from pg_catalog.pg_authid where rolname = $1",
@@ -126,7 +129,6 @@ impl Api {
branch_id: (&BranchId::from("branch")).into(),
cold_start_info: crate::console::messages::ColdStartInfo::Warm,
},
allow_self_signed_compute: false,
};
Ok(node)

View File

@@ -175,7 +175,6 @@ impl Api {
let node = NodeInfo {
config,
aux: body.aux,
allow_self_signed_compute: false,
};
Ok(node)

View File

@@ -141,12 +141,15 @@ pub async fn worker(
LOG_CHAN.set(tx.downgrade()).unwrap();
// setup row stream that will close on cancellation
tokio::spawn(async move {
cancellation_token.cancelled().await;
// dropping this sender will cause the channel to close only once
// all the remaining inflight requests have been completed.
drop(tx);
});
tokio::task::Builder::new()
.name("drop parquet conn")
.spawn(async move {
cancellation_token.cancelled().await;
// dropping this sender will cause the channel to close only once
// all the remaining inflight requests have been completed.
drop(tx);
})
.unwrap();
let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
let rx = rx.map(RequestData::from);

View File

@@ -75,6 +75,10 @@ async fn prometheus_metrics_handler(
let span = info_span!("blocking");
let body = tokio::task::spawn_blocking(move || {
// there are situations where we lose scraped metrics under load; try to gather some clues
// since all nodes are queried for this, keep the message count low.
let spawned_at = std::time::Instant::now();
let _span = span.entered();
let mut state = state.lock().unwrap();
@@ -84,11 +88,19 @@ async fn prometheus_metrics_handler(
.collect_group_into(&mut *encoder)
.unwrap_or_else(|infallible| match infallible {});
let encoded_at = std::time::Instant::now();
let body = encoder.finish();
let spawned_in = spawned_at - started_at;
let encoded_in = encoded_at - spawned_at;
let total = encoded_at - started_at;
tracing::info!(
bytes = body.len(),
elapsed_ms = started_at.elapsed().as_millis(),
total_ms = total.as_millis(),
spawning_ms = spawned_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
"responded /metrics"
);
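
The new fields split the old `elapsed_ms` into queue time and work time; the two phases partition the total by construction:

// spawning_ms: how long the closure waited in spawn_blocking's queue.
// encoding_ms: the gather-and-encode work itself.
use std::time::Instant;

fn breakdown(started_at: Instant, spawned_at: Instant, encoded_at: Instant) {
    let spawning = spawned_at - started_at;
    let encoding = encoded_at - spawned_at;
    assert_eq!(spawning + encoding, encoded_at - started_at);
}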

View File

@@ -26,7 +26,12 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
.await
.map(OpenTelemetryLayer::new);
// spawn the console server in the background,
// returning a `Layer`:
let console_layer = console_subscriber::spawn();
tracing_subscriber::registry()
.with(console_layer)
.with(env_filter)
.with(otlp_layer)
.with(fmt_layer)
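
`console_subscriber::spawn()` starts the console's gRPC instrument server on its default address (127.0.0.1:6669) and returns the layer added to the registry above. Inside a container the explicit builder form is likely what you want instead, sketched here assuming console-subscriber 0.2:

// Bind on all interfaces so `tokio-console http://<host>:6669` can connect
// from outside the container.
use std::net::SocketAddr;

fn console_layer() -> console_subscriber::ConsoleLayer {
    console_subscriber::ConsoleLayer::builder()
        .server_addr("0.0.0.0:6669".parse::<SocketAddr>().unwrap())
        .spawn()
}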

View File

@@ -87,7 +87,7 @@ pub async fn task_main(
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
connections.spawn(async move {
tokio::task::Builder::new().name("tcp client connection").spawn(connections.track_future(async move {
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr.ip();
match socket.wait_for_addr().await {
@@ -152,7 +152,7 @@ pub async fn task_main(
}
}
}
});
})).unwrap();
}
connections.close();
@@ -178,13 +178,6 @@ impl ClientMode {
}
}
pub fn allow_self_signed_compute(&self, config: &ProxyConfig) -> bool {
match self {
ClientMode::Tcp => config.allow_self_signed_compute,
ClientMode::Websockets { .. } => false,
}
}
fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
match self {
ClientMode::Tcp => s.sni_hostname(),
@@ -303,14 +296,9 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
}
};
let mut node = connect_to_compute(
ctx,
&TcpMechanism { params: &params },
&user_info,
mode.allow_self_signed_compute(config),
)
.or_else(|e| stream.throw_error(e))
.await?;
let mut node = connect_to_compute(ctx, &TcpMechanism { params: &params }, &user_info)
.or_else(|e| stream.throw_error(e))
.await?;
let session = cancellation_handler.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;

View File

@@ -92,7 +92,6 @@ pub async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
ctx: &mut RequestMonitoring,
mechanism: &M,
user_info: &B,
allow_self_signed_compute: bool,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: ShouldRetry + std::fmt::Debug,
@@ -103,8 +102,6 @@ where
if let Some(keys) = user_info.get_keys() {
node_info.set_keys(keys);
}
node_info.allow_self_signed_compute = allow_self_signed_compute;
// let mut node_info = credentials.get_node_info(ctx, user_info).await?;
mechanism.update_connect_config(&mut node_info.config);
// try once

View File

@@ -519,7 +519,6 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
branch_id: (&BranchId::from("branch")).into(),
cold_start_info: crate::console::messages::ColdStartInfo::Warm,
},
allow_self_signed_compute: false,
};
let (_, node) = cache.insert("key".into(), node);
node
@@ -549,7 +548,7 @@ async fn connect_to_compute_success() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap();
mechanism.verify();
@@ -562,7 +561,7 @@ async fn connect_to_compute_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap();
mechanism.verify();
@@ -576,7 +575,7 @@ async fn connect_to_compute_non_retry_1() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap_err();
mechanism.verify();
@@ -590,7 +589,7 @@ async fn connect_to_compute_non_retry_2() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap();
mechanism.verify();
@@ -608,7 +607,7 @@ async fn connect_to_compute_non_retry_3() {
Retry, Retry, Retry, Retry, Retry, /* the 17th time */ Retry,
]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap_err();
mechanism.verify();
@@ -622,7 +621,7 @@ async fn wake_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap();
mechanism.verify();
@@ -636,7 +635,7 @@ async fn wake_non_retry() {
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
connect_to_compute(&mut ctx, &mechanism, &user_info)
.await
.unwrap_err();
mechanism.verify();

View File

@@ -108,10 +108,12 @@ impl ConnectionWithCredentialsProvider {
if let Credentials::Dynamic(credentials_provider, _) = &self.credentials {
let credentials_provider = credentials_provider.clone();
let con2 = con.clone();
let f = tokio::spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f);
let f = tokio::task::Builder::new()
.name("redis keep connection")
.spawn(async move {
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f.unwrap());
}
match Self::ping(&mut con).await {
Ok(()) => {

View File

@@ -142,10 +142,13 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
let cache = self.cache.clone();
tokio::spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
});
tokio::task::Builder::new()
.name("invalidate cache lazy")
.spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
})
.unwrap();
}
}

View File

@@ -61,22 +61,28 @@ pub async fn task_main(
let conn_pool = conn_pool::GlobalConnPool::new(&config.http_config);
{
let conn_pool = Arc::clone(&conn_pool);
tokio::spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
});
tokio::task::Builder::new()
.name("serverless pool gc")
.spawn(async move {
conn_pool.gc_worker(StdRng::from_entropy()).await;
})
.unwrap();
}
// shutdown the connection pool
tokio::spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
});
tokio::task::Builder::new()
.name("serverless pool shutdown")
.spawn({
let cancellation_token = cancellation_token.clone();
let conn_pool = conn_pool.clone();
async move {
cancellation_token.cancelled().await;
tokio::task::spawn_blocking(move || conn_pool.shutdown())
.await
.unwrap();
}
})
.unwrap();
let backend = Arc::new(PoolingBackend {
pool: Arc::clone(&conn_pool),
@@ -109,20 +115,25 @@ pub async fn task_main(
let conn_id = uuid::Uuid::new_v4();
let http_conn_span = tracing::info_span!("http_conn", ?conn_id);
connections.spawn(
connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
tokio::task::Builder::new()
.name("serverless conn handler")
.spawn(
connections.track_future(
connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span),
),
)
.instrument(http_conn_span),
);
.unwrap();
}
connections.wait().await;
@@ -218,20 +229,25 @@ async fn connection_handler(
// `request_handler` is not cancel safe. It expects to be cancelled only at specific times.
// By spawning the future, we ensure it never gets cancelled until it decides to.
let handler = connections.spawn(
request_handler(
req,
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
session_id,
peer_addr,
http_request_token,
let handler = tokio::task::Builder::new()
.name("serverless request handler")
.spawn(
connections.track_future(
request_handler(
req,
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
session_id,
peer_addr,
http_request_token,
)
.in_current_span()
.map_ok_or_else(api_error_into_response, |r| r),
),
)
.in_current_span()
.map_ok_or_else(api_error_into_response, |r| r),
);
.unwrap();
async move {
let res = handler.await;
@@ -290,17 +306,27 @@ async fn request_handler(
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
.map_err(|e| ApiError::BadRequest(e.into()))?;
ws_connections.spawn(
async move {
if let Err(e) =
websocket::serve_websocket(config, ctx, websocket, cancellation_handler, host)
tokio::task::Builder::new()
.name("websocket client conn")
.spawn(
ws_connections.track_future(
async move {
if let Err(e) = websocket::serve_websocket(
config,
ctx,
websocket,
cancellation_handler,
host,
)
.await
{
error!("error in websocket connection: {e:#}");
}
}
.instrument(span),
);
{
error!("error in websocket connection: {e:#}");
}
}
.instrument(span),
),
)
.unwrap();
// Return the response so the spawned future can continue.
Ok(response)

View File

@@ -107,7 +107,6 @@ impl PoolingBackend {
pool: self.pool.clone(),
},
&backend,
false, // do not allow self signed compute for http flow
)
.await
}

View File

@@ -492,7 +492,7 @@ pub fn poll_client<C: ClientInnerExt>(
let cancel = CancellationToken::new();
let cancelled = cancel.clone().cancelled_owned();
tokio::spawn(
tokio::task::Builder::new().name("pooled conn").spawn(
async move {
let _conn_gauge = conn_gauge;
let mut idle_timeout = pin!(tokio::time::sleep(idle));
@@ -565,7 +565,7 @@ pub fn poll_client<C: ClientInnerExt>(
}).await;
}
.instrument(span));
.instrument(span)).unwrap();
let inner = ClientInner {
inner: client,
session: tx,

View File

@@ -38,6 +38,7 @@ futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
hdrhistogram = { version = "7" }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
hyper = { version = "0.14", features = ["full"] }
@@ -66,8 +67,9 @@ sha2 = { version = "0.10", features = ["asm"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
subtle = { version = "2" }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util", "tracing"] }
tokio-rustls = { version = "0.24" }
tokio-stream = { version = "0.1", features = ["net"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }