From 9e1eb69d5543aef874152c6af32889e3b806850a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 6 Oct 2022 18:42:05 +0300 Subject: [PATCH 1/3] Increase default compaction_period setting to 20 s. The previous default of 1 s caused excessive CPU usage when there were a lot of projects. Polling every timeline once a second was too aggressive so let's reduce it. Fixes https://github.com/neondatabase/neon/issues/2542, but we probably also want do to something so that we don't poll timelines that have received no new WAL or layers since last check. --- pageserver/src/tenant_config.rs | 2 +- test_runner/regress/test_tenant_conf.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pageserver/src/tenant_config.rs b/pageserver/src/tenant_config.rs index 4c5d5cc3f3..dc1b9353a6 100644 --- a/pageserver/src/tenant_config.rs +++ b/pageserver/src/tenant_config.rs @@ -24,7 +24,7 @@ pub mod defaults { // This parameter determines L1 layer file size. pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024; - pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s"; + pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s"; pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10; pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py index c6cf416d12..46a945a58b 100644 --- a/test_runner/regress/test_tenant_conf.py +++ b/test_runner/regress/test_tenant_conf.py @@ -54,7 +54,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" for i in { "checkpoint_distance": 10000, "compaction_target_size": 1048576, - "compaction_period": 1, + "compaction_period": 20, "compaction_threshold": 10, "gc_horizon": 67108864, "gc_period": 100, @@ -74,7 +74,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" for i in { "checkpoint_distance": 20000, "compaction_target_size": 1048576, - "compaction_period": 1, + "compaction_period": 20, "compaction_threshold": 10, "gc_horizon": 67108864, "gc_period": 30, @@ -102,7 +102,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" for i in { "checkpoint_distance": 15000, "compaction_target_size": 1048576, - "compaction_period": 1, + "compaction_period": 20, "compaction_threshold": 10, "gc_horizon": 67108864, "gc_period": 80, @@ -125,7 +125,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}""" for i in { "checkpoint_distance": 15000, "compaction_target_size": 1048576, - "compaction_period": 1, + "compaction_period": 20, "compaction_threshold": 10, "gc_horizon": 67108864, "gc_period": 80, From 8e51c27e1ad522aab5c7ab64208fdaf7bb0d181d Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 7 Oct 2022 13:58:31 +0300 Subject: [PATCH 2/3] Restore artifact versions (#2578) Context: https://github.com/neondatabase/neon/pull/2128/files#r989489965 Co-authored-by: Rory de Zoete --- .github/workflows/build_and_test.yml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 6556fb6c9b..7cc8715526 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -494,7 +494,7 @@ jobs: run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json - name: Kaniko build neon - run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID + run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID compute-tools-image: runs-on: dev @@ -508,7 +508,7 @@ jobs: run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json - name: Kaniko build compute tools - run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID + run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID compute-node-image: runs-on: dev @@ -527,7 +527,7 @@ jobs: # cloud repo depends on this image name, thus duplicating it # remove compute-node when cloud repo is updated - name: Kaniko build compute node with extensions v14 (compatibility) - run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID + run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID compute-node-image-v14: runs-on: dev @@ -543,7 +543,7 @@ jobs: run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json - name: Kaniko build compute node with extensions v14 - run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID + run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID compute-node-image-v15: @@ -560,7 +560,7 @@ jobs: run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json - name: Kaniko build compute node with extensions v15 - run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID + run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID promote-images: runs-on: dev From e516c376d66aa8d5363a35f32a4b0267c454ad4b Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Fri, 7 Oct 2022 14:34:57 +0300 Subject: [PATCH 3/3] [proxy] Improve logging (#2554) * [proxy] Use `tracing::*` instead of `println!` for logging * Fix a minor misnomer * Log more stuff --- Cargo.lock | 3 ++ libs/utils/src/pq_proto.rs | 14 ++++++++ proxy/Cargo.toml | 5 ++- proxy/src/auth/backend.rs | 23 ++++++++++++-- proxy/src/auth/backend/console.rs | 18 ++++++----- proxy/src/auth/backend/link.rs | 7 ++-- proxy/src/auth/credentials.rs | 8 +++++ proxy/src/cancellation.rs | 11 +++++-- proxy/src/compute.rs | 12 +++++-- proxy/src/http/server.rs | 5 +-- proxy/src/main.rs | 20 ++++++++---- proxy/src/mgmt.rs | 11 ++++--- proxy/src/proxy.rs | 53 +++++++++++++++++++++---------- 13 files changed, 140 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab508c7109..8488fc4f9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2452,6 +2452,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "atty", "base64", "bstr", "bytes", @@ -2485,6 +2486,8 @@ dependencies = [ "tokio-postgres", "tokio-postgres-rustls", "tokio-rustls", + "tracing", + "tracing-subscriber", "url", "utils", "uuid", diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs index 21952ab87e..8c4e297f82 100644 --- a/libs/utils/src/pq_proto.rs +++ b/libs/utils/src/pq_proto.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; use std::{ borrow::Cow, collections::HashMap, + fmt, future::Future, io::{self, Cursor}, str, @@ -124,6 +125,19 @@ pub struct CancelKeyData { pub cancel_key: i32, } +impl fmt::Display for CancelKeyData { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let hi = (self.backend_pid as u64) << 32; + let lo = self.cancel_key as u64; + let id = hi | lo; + + // This format is more compact and might work better for logs. + f.debug_tuple("CancelKeyData") + .field(&format_args!("{:x}", id)) + .finish() + } +} + use rand::distributions::{Distribution, Standard}; impl Distribution for Standard { fn sample(&self, rng: &mut R) -> CancelKeyData { diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 7d0449cd1a..8049737989 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0" -async-trait = "0.1" +atty = "0.2.14" base64 = "0.13.0" bstr = "0.2.17" bytes = { version = "1.0.1", features = ['serde'] } @@ -35,6 +35,8 @@ thiserror = "1.0.30" tokio = { version = "1.17", features = ["macros"] } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tokio-rustls = "0.23.0" +tracing = "0.1.36" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.2.2" uuid = { version = "0.8.2", features = ["v4", "serde"]} x509-parser = "0.13.2" @@ -44,6 +46,7 @@ metrics = { path = "../libs/metrics" } workspace_hack = { version = "0.1", path = "../workspace_hack" } [dev-dependencies] +async-trait = "0.1" rcgen = "0.8.14" rstest = "0.12" tokio-postgres-rustls = "0.9.0" diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 7e93a32950..bb919770c1 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -15,6 +15,7 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{info, warn}; static CPLANE_WAITERS: Lazy> = Lazy::new(Default::default); @@ -171,6 +172,8 @@ impl BackendType<'_, ClientCredentials<'_>> { // support SNI or other means of passing the project name. // We now expect to see a very specific payload in the place of password. if creds.project().is_none() { + warn!("project name not specified, resorting to the password hack auth flow"); + let payload = AuthFlow::new(client) .begin(auth::PasswordHack) .await? @@ -179,6 +182,7 @@ impl BackendType<'_, ClientCredentials<'_>> { // Finally we may finish the initialization of `creds`. // TODO: add missing type safety to ClientCredentials. + info!(project = &payload.project, "received missing parameter"); creds.project = Some(payload.project.into()); let mut config = match &self { @@ -196,6 +200,7 @@ impl BackendType<'_, ClientCredentials<'_>> { // We should use a password from payload as well. config.password(payload.password); + info!("user successfully authenticated (using the password hack)"); return Ok(compute::NodeInfo { reported_auth_ok: false, config, @@ -203,19 +208,31 @@ impl BackendType<'_, ClientCredentials<'_>> { } } - match self { + let res = match self { Console(endpoint, creds) => { + info!( + user = creds.user, + project = creds.project(), + "performing authentication using the console" + ); console::Api::new(&endpoint, extra, &creds) .handle_user(client) .await } Postgres(endpoint, creds) => { + info!("performing mock authentication using a local postgres instance"); postgres::Api::new(&endpoint, &creds) .handle_user(client) .await } // NOTE: this auth backend doesn't use client credentials. - Link(url) => link::handle_user(&url, client).await, - } + Link(url) => { + info!("performing link authentication"); + link::handle_user(&url, client).await + } + }?; + + info!("user successfully authenticated"); + Ok(res) } } diff --git a/proxy/src/auth/backend/console.rs b/proxy/src/auth/backend/console.rs index a351b82c6a..7dbb173b88 100644 --- a/proxy/src/auth/backend/console.rs +++ b/proxy/src/auth/backend/console.rs @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize}; use std::future::Future; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::info; const REQUEST_FAILED: &str = "Console request failed"; @@ -148,10 +149,11 @@ impl<'a> Api<'a> { } async fn get_auth_info(&self) -> Result { + let request_id = uuid::Uuid::new_v4().to_string(); let req = self .endpoint .get("proxy_get_role_secret") - .header("X-Request-ID", uuid::Uuid::new_v4().to_string()) + .header("X-Request-ID", &request_id) .query(&[("session_id", self.extra.session_id)]) .query(&[ ("application_name", self.extra.application_name), @@ -160,9 +162,7 @@ impl<'a> Api<'a> { ]) .build()?; - // TODO: use a proper logger - println!("cplane request: {}", req.url()); - + info!(id = request_id, url = req.url().as_str(), "request"); let resp = self.endpoint.execute(req).await?; if !resp.status().is_success() { return Err(TransportError::HttpStatus(resp.status()).into()); @@ -177,10 +177,11 @@ impl<'a> Api<'a> { /// Wake up the compute node and return the corresponding connection info. pub(super) async fn wake_compute(&self) -> Result { + let request_id = uuid::Uuid::new_v4().to_string(); let req = self .endpoint .get("proxy_wake_compute") - .header("X-Request-ID", uuid::Uuid::new_v4().to_string()) + .header("X-Request-ID", &request_id) .query(&[("session_id", self.extra.session_id)]) .query(&[ ("application_name", self.extra.application_name), @@ -188,9 +189,7 @@ impl<'a> Api<'a> { ]) .build()?; - // TODO: use a proper logger - println!("cplane request: {}", req.url()); - + info!(id = request_id, url = req.url().as_str(), "request"); let resp = self.endpoint.execute(req).await?; if !resp.status().is_success() { return Err(TransportError::HttpStatus(resp.status()).into()); @@ -227,15 +226,18 @@ where GetAuthInfo: Future>, WakeCompute: Future>, { + info!("fetching user's authentication info"); let auth_info = get_auth_info(endpoint).await?; let flow = AuthFlow::new(client); let scram_keys = match auth_info { AuthInfo::Md5(_) => { // TODO: decide if we should support MD5 in api v2 + info!("auth endpoint chooses MD5"); return Err(auth::AuthError::bad_auth_method("MD5")); } AuthInfo::Scram(secret) => { + info!("auth endpoint chooses SCRAM"); let scram = auth::Scram(&secret); Some(compute::ScramKeys { client_key: flow.begin(scram).await?.authenticate().await?.as_bytes(), diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index eefa246eba..863ed53645 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -1,6 +1,7 @@ use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::info; use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage}; #[derive(Debug, Error)] @@ -53,14 +54,16 @@ pub async fn handle_user( let greeting = hello_message(link_uri, &psql_session_id); let db_info = super::with_waiter(psql_session_id, |waiter| async { - // Give user a URL to spawn a new database + // Give user a URL to spawn a new database. + info!("sending the auth URL to the user"); client .write_message_noflush(&Be::AuthenticationOk)? .write_message_noflush(&BeParameterStatusMessage::encoding())? .write_message(&Be::NoticeResponse(&greeting)) .await?; - // Wait for web console response (see `mgmt`) + // Wait for web console response (see `mgmt`). + info!("waiting for console's reply..."); waiter.await?.map_err(LinkAuthError::AuthFailed) }) .await?; diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index e43bcf8791..57128a61f5 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -3,6 +3,7 @@ use crate::error::UserFacingError; use std::borrow::Cow; use thiserror::Error; +use tracing::info; use utils::pq_proto::StartupMessageParams; #[derive(Debug, Error, PartialEq, Eq, Clone)] @@ -82,6 +83,13 @@ impl<'a> ClientCredentials<'a> { } .transpose()?; + info!( + user = user, + dbname = dbname, + project = project.as_deref(), + "credentials" + ); + Ok(Self { user, dbname, diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index eb9312e6bb..404533ad42 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -4,6 +4,7 @@ use parking_lot::Mutex; use std::net::SocketAddr; use tokio::net::TcpStream; use tokio_postgres::{CancelToken, NoTls}; +use tracing::info; use utils::pq_proto::CancelKeyData; /// Enables serving `CancelRequest`s. @@ -18,8 +19,9 @@ impl CancelMap { .lock() .get(&key) .and_then(|x| x.clone()) - .with_context(|| format!("unknown session: {:?}", key))?; + .with_context(|| format!("query cancellation key not found: {key}"))?; + info!("cancelling query per user's request using key {key}"); cancel_closure.try_cancel_query().await } @@ -41,14 +43,16 @@ impl CancelMap { self.0 .lock() .try_insert(key, None) - .map_err(|_| anyhow!("session already exists: {:?}", key))?; + .map_err(|_| anyhow!("query cancellation key already exists: {key}"))?; // This will guarantee that the session gets dropped // as soon as the future is finished. scopeguard::defer! { self.0.lock().remove(&key); + info!("dropped query cancellation key {key}"); } + info!("registered new query cancellation key {key}"); let session = Session::new(key, self); f(session).await } @@ -102,10 +106,13 @@ impl<'a> Session<'a> { fn new(key: CancelKeyData, cancel_map: &'a CancelMap) -> Self { Self { key, cancel_map } } +} +impl Session<'_> { /// Store the cancel token for the given session. /// This enables query cancellation in [`crate::proxy::handshake`]. pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData { + info!("enabling query cancellation for this session"); self.cancel_map .0 .lock() diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 4ae44ded57..8e4caf6eeb 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -5,6 +5,7 @@ use std::{io, net::SocketAddr}; use thiserror::Error; use tokio::net::TcpStream; use tokio_postgres::NoTls; +use tracing::{error, info}; use utils::pq_proto::StartupMessageParams; #[derive(Debug, Error)] @@ -54,6 +55,7 @@ impl NodeInfo { use tokio_postgres::config::Host; let connect_once = |host, port| { + info!("trying to connect to a compute node at {host}:{port}"); TcpStream::connect((host, port)).and_then(|socket| async { let socket_addr = socket.peer_addr()?; // This prevents load balancer from severing the connection. @@ -72,7 +74,11 @@ impl NodeInfo { if ports.len() > 1 && ports.len() != hosts.len() { return Err(io::Error::new( io::ErrorKind::Other, - format!("couldn't connect: bad compute config, ports and hosts entries' count does not match: {:?}", self.config), + format!( + "couldn't connect: bad compute config, \ + ports and hosts entries' count does not match: {:?}", + self.config + ), )); } @@ -88,7 +94,7 @@ impl NodeInfo { Ok(socket) => return Ok(socket), Err(err) => { // We can't throw an error here, as there might be more hosts to try. - println!("failed to connect to compute `{host}:{port}`: {err}"); + error!("failed to connect to a compute node at {host}:{port}: {err}"); connection_error = Some(err); } } @@ -160,8 +166,8 @@ impl NodeInfo { .ok_or(ConnectionError::FailedToFetchPgVersion)? .into(); + info!("connected to user's compute node at {socket_addr}"); let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token()); - let db = PostgresConnection { stream, version }; Ok((db, cancel_closure)) diff --git a/proxy/src/http/server.rs b/proxy/src/http/server.rs index 5a75718742..05f6feb307 100644 --- a/proxy/src/http/server.rs +++ b/proxy/src/http/server.rs @@ -1,6 +1,7 @@ use anyhow::anyhow; use hyper::{Body, Request, Response, StatusCode}; use std::net::TcpListener; +use tracing::info; use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService}; async fn status_handler(_: Request) -> Result, ApiError> { @@ -12,9 +13,9 @@ fn make_router() -> RouterBuilder { router.get("/v1/status", status_handler) } -pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> { +pub async fn task_main(http_listener: TcpListener) -> anyhow::Result<()> { scopeguard::defer! { - println!("http has shut down"); + info!("http has shut down"); } let service = || RouterService::new(make_router().build()?); diff --git a/proxy/src/main.rs b/proxy/src/main.rs index f2dc7425ba..2e6c365d32 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -25,6 +25,7 @@ use config::ProxyConfig; use futures::FutureExt; use std::{borrow::Cow, future::Future, net::SocketAddr}; use tokio::{net::TcpListener, task::JoinError}; +use tracing::info; use utils::project_git_version; project_git_version!(GIT_VERSION); @@ -38,6 +39,11 @@ async fn flatten_err( #[tokio::main] async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_ansi(atty::is(atty::Stream::Stdout)) + .with_target(false) + .init(); + let arg_matches = clap::App::new("Neon proxy/router") .version(GIT_VERSION) .arg( @@ -140,22 +146,22 @@ async fn main() -> anyhow::Result<()> { auth_backend, })); - println!("Version: {GIT_VERSION}"); - println!("Authentication backend: {}", config.auth_backend); + info!("Version: {GIT_VERSION}"); + info!("Authentication backend: {}", config.auth_backend); // Check that we can bind to address before further initialization - println!("Starting http on {}", http_address); + info!("Starting http on {http_address}"); let http_listener = TcpListener::bind(http_address).await?.into_std()?; - println!("Starting mgmt on {}", mgmt_address); + info!("Starting mgmt on {mgmt_address}"); let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?; - println!("Starting proxy on {}", proxy_address); + info!("Starting proxy on {proxy_address}"); let proxy_listener = TcpListener::bind(proxy_address).await?; let tasks = [ - tokio::spawn(http::server::thread_main(http_listener)), - tokio::spawn(proxy::thread_main(config, proxy_listener)), + tokio::spawn(http::server::task_main(http_listener)), + tokio::spawn(proxy::task_main(config, proxy_listener)), tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)), ] .map(flatten_err); diff --git a/proxy/src/mgmt.rs b/proxy/src/mgmt.rs index 8737d170b1..67693b1fb0 100644 --- a/proxy/src/mgmt.rs +++ b/proxy/src/mgmt.rs @@ -5,6 +5,7 @@ use std::{ net::{TcpListener, TcpStream}, thread, }; +use tracing::{error, info}; use utils::{ postgres_backend::{self, AuthType, PostgresBackend}, pq_proto::{BeMessage, SINGLE_COL_ROWDESC}, @@ -19,7 +20,7 @@ use utils::{ /// pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> { scopeguard::defer! { - println!("mgmt has shut down"); + info!("mgmt has shut down"); } listener @@ -27,14 +28,14 @@ pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> { .context("failed to set listener to blocking")?; loop { let (socket, peer_addr) = listener.accept().context("failed to accept a new client")?; - println!("accepted connection from {}", peer_addr); + info!("accepted connection from {peer_addr}"); socket .set_nodelay(true) .context("failed to set client socket option")?; thread::spawn(move || { if let Err(err) = handle_connection(socket) { - println!("error: {}", err); + error!("{err}"); } }); } @@ -102,14 +103,14 @@ impl postgres_backend::Handler for MgmtHandler { let res = try_process_query(pgb, query_string); // intercept and log error message if res.is_err() { - println!("Mgmt query failed: #{:?}", res); + error!("mgmt query failed: {res:?}"); } res } } fn try_process_query(pgb: &mut PostgresBackend, query_string: &str) -> anyhow::Result<()> { - println!("Got mgmt query [redacted]"); // Content contains password, don't print it + info!("got mgmt query [redacted]"); // Content contains password, don't print it let resp: PsqlSessionResponse = serde_json::from_str(query_string)?; diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index efb1b6f358..5dcaa000cf 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -8,6 +8,7 @@ use metrics::{register_int_counter, IntCounter}; use once_cell::sync::Lazy; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; +use tracing::{error, info, info_span, Instrument}; use utils::pq_proto::{BeMessage as Be, *}; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; @@ -43,17 +44,17 @@ where F: std::future::Future>, { future.await.map_err(|err| { - println!("error: {}", err); + error!("{err}"); err }) } -pub async fn thread_main( +pub async fn task_main( config: &'static ProxyConfig, listener: tokio::net::TcpListener, ) -> anyhow::Result<()> { scopeguard::defer! { - println!("proxy has shut down"); + info!("proxy has shut down"); } // When set for the server socket, the keepalive setting @@ -63,22 +64,29 @@ pub async fn thread_main( let cancel_map = Arc::new(CancelMap::default()); loop { let (socket, peer_addr) = listener.accept().await?; - println!("accepted connection from {}", peer_addr); + info!("accepted connection from {peer_addr}"); + let session_id = uuid::Uuid::new_v4(); let cancel_map = Arc::clone(&cancel_map); - tokio::spawn(log_error(async move { - socket - .set_nodelay(true) - .context("failed to set socket option")?; + tokio::spawn( + log_error(async move { + info!("spawned a task for {peer_addr}"); - handle_client(config, &cancel_map, socket).await - })); + socket + .set_nodelay(true) + .context("failed to set socket option")?; + + handle_client(config, &cancel_map, session_id, socket).await + }) + .instrument(info_span!("client", session = format_args!("{session_id}"))), + ); } } async fn handle_client( config: &ProxyConfig, cancel_map: &CancelMap, + session_id: uuid::Uuid, stream: impl AsyncRead + AsyncWrite + Unpin + Send, ) -> anyhow::Result<()> { // The `closed` counter will increase when this future is destroyed. @@ -88,7 +96,8 @@ async fn handle_client( } let tls = config.tls_config.as_ref(); - let (mut stream, params) = match handshake(stream, tls, cancel_map).await? { + let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake")); + let (mut stream, params) = match do_handshake.await? { Some(x) => x, None => return Ok(()), // it's a cancellation request }; @@ -106,7 +115,7 @@ async fn handle_client( async { result }.or_else(|e| stream.throw_error(e)).await? }; - let client = Client::new(stream, creds, ¶ms); + let client = Client::new(stream, creds, ¶ms, session_id); cancel_map .with_session(|session| client.connect_to_db(session)) .await @@ -127,7 +136,7 @@ async fn handshake( let mut stream = PqStream::new(Stream::from_raw(stream)); loop { let msg = stream.read_startup_packet().await?; - println!("got message: {:?}", msg); + info!("received {msg:?}"); use FeStartupPacket::*; match msg { @@ -164,11 +173,13 @@ async fn handshake( stream.throw_error_str(ERR_INSECURE_CONNECTION).await?; } + info!(session_type = "normal", "successful handshake"); break Ok(Some((stream, params))); } CancelRequest(cancel_key_data) => { cancel_map.cancel_session(cancel_key_data).await?; + info!(session_type = "cancellation", "successful handshake"); break Ok(None); } } @@ -183,6 +194,8 @@ struct Client<'a, S> { creds: auth::BackendType<'a, auth::ClientCredentials<'a>>, /// KV-dictionary with PostgreSQL connection params. params: &'a StartupMessageParams, + /// Unique connection ID. + session_id: uuid::Uuid, } impl<'a, S> Client<'a, S> { @@ -191,11 +204,13 @@ impl<'a, S> Client<'a, S> { stream: PqStream, creds: auth::BackendType<'a, auth::ClientCredentials<'a>>, params: &'a StartupMessageParams, + session_id: uuid::Uuid, ) -> Self { Self { stream, creds, params, + session_id, } } } @@ -207,17 +222,20 @@ impl Client<'_, S> { mut stream, creds, params, + session_id, } = self; let extra = auth::ConsoleReqExtra { - // Currently it's OK to generate a new UUID **here**, but - // it might be better to move this to `cancellation::Session`. - session_id: uuid::Uuid::new_v4(), + session_id, // aka this connection's id application_name: params.get("application_name"), }; // Authenticate and connect to a compute node. - let auth = creds.authenticate(&extra, &mut stream).await; + let auth = creds + .authenticate(&extra, &mut stream) + .instrument(info_span!("auth")) + .await; + let node = async { auth }.or_else(|e| stream.throw_error(e)).await?; let reported_auth_ok = node.reported_auth_ok; @@ -251,6 +269,7 @@ impl Client<'_, S> { } // Starting from here we only proxy the client's traffic. + info!("performing the proxy pass..."); let mut db = MetricsStream::new(db.stream, inc_proxied); let mut client = MetricsStream::new(stream.into_inner(), inc_proxied); let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;