Merge pull request #2581 from neondatabase/main

Release 2022-10-07
This commit is contained in:
Stas Kelvich
2022-10-07 16:50:55 +03:00
committed by GitHub
16 changed files with 150 additions and 60 deletions

View File

@@ -494,7 +494,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build neon
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID
compute-tools-image:
runs-on: dev
@@ -508,7 +508,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute tools
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
run: /kaniko/executor --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-tools --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID
compute-node-image:
runs-on: dev
@@ -527,7 +527,7 @@ jobs:
# cloud repo depends on this image name, thus duplicating it
# remove compute-node when cloud repo is updated
- name: Kaniko build compute node with extensions v14 (compatibility)
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --snapshotMode=redo --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID
compute-node-image-v14:
runs-on: dev
@@ -543,7 +543,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v14
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v14 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v14:$GITHUB_RUN_ID
compute-node-image-v15:
@@ -560,7 +560,7 @@ jobs:
run: echo "{\"credsStore\":\"ecr-login\"}" > /kaniko/.docker/config.json
- name: Kaniko build compute node with extensions v15
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
run: /kaniko/executor --skip-unused-stages --snapshotMode=redo --cache=true --cache-repo 369495373322.dkr.ecr.eu-central-1.amazonaws.com/cache --context . --build-arg GIT_VERSION=${{ github.sha }} --dockerfile Dockerfile.compute-node-v15 --destination 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-v15:$GITHUB_RUN_ID
promote-images:
runs-on: dev

3
Cargo.lock generated
View File

@@ -2452,6 +2452,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"atty",
"base64",
"bstr",
"bytes",
@@ -2485,6 +2486,8 @@ dependencies = [
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls",
"tracing",
"tracing-subscriber",
"url",
"utils",
"uuid",

View File

@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
collections::HashMap,
fmt,
future::Future,
io::{self, Cursor},
str,
@@ -124,6 +125,19 @@ pub struct CancelKeyData {
pub cancel_key: i32,
}
impl fmt::Display for CancelKeyData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let hi = (self.backend_pid as u64) << 32;
let lo = self.cancel_key as u64;
let id = hi | lo;
// This format is more compact and might work better for logs.
f.debug_tuple("CancelKeyData")
.field(&format_args!("{:x}", id))
.finish()
}
}
use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {

View File

@@ -24,7 +24,7 @@ pub mod defaults {
// This parameter determines L1 layer file size.
pub const DEFAULT_COMPACTION_TARGET_SIZE: u64 = 128 * 1024 * 1024;
pub const DEFAULT_COMPACTION_PERIOD: &str = "1 s";
pub const DEFAULT_COMPACTION_PERIOD: &str = "20 s";
pub const DEFAULT_COMPACTION_THRESHOLD: usize = 10;
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;

View File

@@ -5,7 +5,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
atty = "0.2.14"
base64 = "0.13.0"
bstr = "0.2.17"
bytes = { version = "1.0.1", features = ['serde'] }
@@ -35,6 +35,8 @@ thiserror = "1.0.30"
tokio = { version = "1.17", features = ["macros"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-rustls = "0.23.0"
tracing = "0.1.36"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.2.2"
uuid = { version = "0.8.2", features = ["v4", "serde"]}
x509-parser = "0.13.2"
@@ -44,6 +46,7 @@ metrics = { path = "../libs/metrics" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]
async-trait = "0.1"
rcgen = "0.8.14"
rstest = "0.12"
tokio-postgres-rustls = "0.9.0"

View File

@@ -15,6 +15,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
static CPLANE_WAITERS: Lazy<Waiters<mgmt::ComputeReady>> = Lazy::new(Default::default);
@@ -171,6 +172,8 @@ impl BackendType<'_, ClientCredentials<'_>> {
// support SNI or other means of passing the project name.
// We now expect to see a very specific payload in the place of password.
if creds.project().is_none() {
warn!("project name not specified, resorting to the password hack auth flow");
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)
.await?
@@ -179,6 +182,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
// Finally we may finish the initialization of `creds`.
// TODO: add missing type safety to ClientCredentials.
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project.into());
let mut config = match &self {
@@ -196,6 +200,7 @@ impl BackendType<'_, ClientCredentials<'_>> {
// We should use a password from payload as well.
config.password(payload.password);
info!("user successfully authenticated (using the password hack)");
return Ok(compute::NodeInfo {
reported_auth_ok: false,
config,
@@ -203,19 +208,31 @@ impl BackendType<'_, ClientCredentials<'_>> {
}
}
match self {
let res = match self {
Console(endpoint, creds) => {
info!(
user = creds.user,
project = creds.project(),
"performing authentication using the console"
);
console::Api::new(&endpoint, extra, &creds)
.handle_user(client)
.await
}
Postgres(endpoint, creds) => {
info!("performing mock authentication using a local postgres instance");
postgres::Api::new(&endpoint, &creds)
.handle_user(client)
.await
}
// NOTE: this auth backend doesn't use client credentials.
Link(url) => link::handle_user(&url, client).await,
}
Link(url) => {
info!("performing link authentication");
link::handle_user(&url, client).await
}
}?;
info!("user successfully authenticated");
Ok(res)
}
}

View File

@@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
use std::future::Future;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
const REQUEST_FAILED: &str = "Console request failed";
@@ -148,10 +149,11 @@ impl<'a> Api<'a> {
}
async fn get_auth_info(&self) -> Result<AuthInfo, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
let req = self
.endpoint
.get("proxy_get_role_secret")
.header("X-Request-ID", uuid::Uuid::new_v4().to_string())
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
@@ -160,9 +162,7 @@ impl<'a> Api<'a> {
])
.build()?;
// TODO: use a proper logger
println!("cplane request: {}", req.url());
info!(id = request_id, url = req.url().as_str(), "request");
let resp = self.endpoint.execute(req).await?;
if !resp.status().is_success() {
return Err(TransportError::HttpStatus(resp.status()).into());
@@ -177,10 +177,11 @@ impl<'a> Api<'a> {
/// Wake up the compute node and return the corresponding connection info.
pub(super) async fn wake_compute(&self) -> Result<ComputeConnCfg, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
let req = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", uuid::Uuid::new_v4().to_string())
.header("X-Request-ID", &request_id)
.query(&[("session_id", self.extra.session_id)])
.query(&[
("application_name", self.extra.application_name),
@@ -188,9 +189,7 @@ impl<'a> Api<'a> {
])
.build()?;
// TODO: use a proper logger
println!("cplane request: {}", req.url());
info!(id = request_id, url = req.url().as_str(), "request");
let resp = self.endpoint.execute(req).await?;
if !resp.status().is_success() {
return Err(TransportError::HttpStatus(resp.status()).into());
@@ -227,15 +226,18 @@ where
GetAuthInfo: Future<Output = Result<AuthInfo, GetAuthInfoError>>,
WakeCompute: Future<Output = Result<ComputeConnCfg, WakeComputeError>>,
{
info!("fetching user's authentication info");
let auth_info = get_auth_info(endpoint).await?;
let flow = AuthFlow::new(client);
let scram_keys = match auth_info {
AuthInfo::Md5(_) => {
// TODO: decide if we should support MD5 in api v2
info!("auth endpoint chooses MD5");
return Err(auth::AuthError::bad_auth_method("MD5"));
}
AuthInfo::Scram(secret) => {
info!("auth endpoint chooses SCRAM");
let scram = auth::Scram(&secret);
Some(compute::ScramKeys {
client_key: flow.begin(scram).await?.authenticate().await?.as_bytes(),

View File

@@ -1,6 +1,7 @@
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
#[derive(Debug, Error)]
@@ -53,14 +54,16 @@ pub async fn handle_user(
let greeting = hello_message(link_uri, &psql_session_id);
let db_info = super::with_waiter(psql_session_id, |waiter| async {
// Give user a URL to spawn a new database
// Give user a URL to spawn a new database.
info!("sending the auth URL to the user");
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?
.write_message(&Be::NoticeResponse(&greeting))
.await?;
// Wait for web console response (see `mgmt`)
// Wait for web console response (see `mgmt`).
info!("waiting for console's reply...");
waiter.await?.map_err(LinkAuthError::AuthFailed)
})
.await?;

View File

@@ -3,6 +3,7 @@
use crate::error::UserFacingError;
use std::borrow::Cow;
use thiserror::Error;
use tracing::info;
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error, PartialEq, Eq, Clone)]
@@ -82,6 +83,13 @@ impl<'a> ClientCredentials<'a> {
}
.transpose()?;
info!(
user = user,
dbname = dbname,
project = project.as_deref(),
"credentials"
);
Ok(Self {
user,
dbname,

View File

@@ -4,6 +4,7 @@ use parking_lot::Mutex;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_postgres::{CancelToken, NoTls};
use tracing::info;
use utils::pq_proto::CancelKeyData;
/// Enables serving `CancelRequest`s.
@@ -18,8 +19,9 @@ impl CancelMap {
.lock()
.get(&key)
.and_then(|x| x.clone())
.with_context(|| format!("unknown session: {:?}", key))?;
.with_context(|| format!("query cancellation key not found: {key}"))?;
info!("cancelling query per user's request using key {key}");
cancel_closure.try_cancel_query().await
}
@@ -41,14 +43,16 @@ impl CancelMap {
self.0
.lock()
.try_insert(key, None)
.map_err(|_| anyhow!("session already exists: {:?}", key))?;
.map_err(|_| anyhow!("query cancellation key already exists: {key}"))?;
// This will guarantee that the session gets dropped
// as soon as the future is finished.
scopeguard::defer! {
self.0.lock().remove(&key);
info!("dropped query cancellation key {key}");
}
info!("registered new query cancellation key {key}");
let session = Session::new(key, self);
f(session).await
}
@@ -102,10 +106,13 @@ impl<'a> Session<'a> {
fn new(key: CancelKeyData, cancel_map: &'a CancelMap) -> Self {
Self { key, cancel_map }
}
}
impl Session<'_> {
/// Store the cancel token for the given session.
/// This enables query cancellation in [`crate::proxy::handshake`].
pub fn enable_query_cancellation(self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancel_map
.0
.lock()

View File

@@ -5,6 +5,7 @@ use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tracing::{error, info};
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error)]
@@ -54,6 +55,7 @@ impl NodeInfo {
use tokio_postgres::config::Host;
let connect_once = |host, port| {
info!("trying to connect to a compute node at {host}:{port}");
TcpStream::connect((host, port)).and_then(|socket| async {
let socket_addr = socket.peer_addr()?;
// This prevents load balancer from severing the connection.
@@ -72,7 +74,11 @@ impl NodeInfo {
if ports.len() > 1 && ports.len() != hosts.len() {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("couldn't connect: bad compute config, ports and hosts entries' count does not match: {:?}", self.config),
format!(
"couldn't connect: bad compute config, \
ports and hosts entries' count does not match: {:?}",
self.config
),
));
}
@@ -88,7 +94,7 @@ impl NodeInfo {
Ok(socket) => return Ok(socket),
Err(err) => {
// We can't throw an error here, as there might be more hosts to try.
println!("failed to connect to compute `{host}:{port}`: {err}");
error!("failed to connect to a compute node at {host}:{port}: {err}");
connection_error = Some(err);
}
}
@@ -160,8 +166,8 @@ impl NodeInfo {
.ok_or(ConnectionError::FailedToFetchPgVersion)?
.into();
info!("connected to user's compute node at {socket_addr}");
let cancel_closure = CancelClosure::new(socket_addr, client.cancel_token());
let db = PostgresConnection { stream, version };
Ok((db, cancel_closure))

View File

@@ -1,6 +1,7 @@
use anyhow::anyhow;
use hyper::{Body, Request, Response, StatusCode};
use std::net::TcpListener;
use tracing::info;
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
@@ -12,9 +13,9 @@ fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
router.get("/v1/status", status_handler)
}
pub async fn thread_main(http_listener: TcpListener) -> anyhow::Result<()> {
pub async fn task_main(http_listener: TcpListener) -> anyhow::Result<()> {
scopeguard::defer! {
println!("http has shut down");
info!("http has shut down");
}
let service = || RouterService::new(make_router().build()?);

View File

@@ -25,6 +25,7 @@ use config::ProxyConfig;
use futures::FutureExt;
use std::{borrow::Cow, future::Future, net::SocketAddr};
use tokio::{net::TcpListener, task::JoinError};
use tracing::info;
use utils::project_git_version;
project_git_version!(GIT_VERSION);
@@ -38,6 +39,11 @@ async fn flatten_err(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_ansi(atty::is(atty::Stream::Stdout))
.with_target(false)
.init();
let arg_matches = clap::App::new("Neon proxy/router")
.version(GIT_VERSION)
.arg(
@@ -140,22 +146,22 @@ async fn main() -> anyhow::Result<()> {
auth_backend,
}));
println!("Version: {GIT_VERSION}");
println!("Authentication backend: {}", config.auth_backend);
info!("Version: {GIT_VERSION}");
info!("Authentication backend: {}", config.auth_backend);
// Check that we can bind to address before further initialization
println!("Starting http on {}", http_address);
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
println!("Starting mgmt on {}", mgmt_address);
info!("Starting mgmt on {mgmt_address}");
let mgmt_listener = TcpListener::bind(mgmt_address).await?.into_std()?;
println!("Starting proxy on {}", proxy_address);
info!("Starting proxy on {proxy_address}");
let proxy_listener = TcpListener::bind(proxy_address).await?;
let tasks = [
tokio::spawn(http::server::thread_main(http_listener)),
tokio::spawn(proxy::thread_main(config, proxy_listener)),
tokio::spawn(http::server::task_main(http_listener)),
tokio::spawn(proxy::task_main(config, proxy_listener)),
tokio::task::spawn_blocking(move || mgmt::thread_main(mgmt_listener)),
]
.map(flatten_err);

View File

@@ -5,6 +5,7 @@ use std::{
net::{TcpListener, TcpStream},
thread,
};
use tracing::{error, info};
use utils::{
postgres_backend::{self, AuthType, PostgresBackend},
pq_proto::{BeMessage, SINGLE_COL_ROWDESC},
@@ -19,7 +20,7 @@ use utils::{
///
pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> {
scopeguard::defer! {
println!("mgmt has shut down");
info!("mgmt has shut down");
}
listener
@@ -27,14 +28,14 @@ pub fn thread_main(listener: TcpListener) -> anyhow::Result<()> {
.context("failed to set listener to blocking")?;
loop {
let (socket, peer_addr) = listener.accept().context("failed to accept a new client")?;
println!("accepted connection from {}", peer_addr);
info!("accepted connection from {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set client socket option")?;
thread::spawn(move || {
if let Err(err) = handle_connection(socket) {
println!("error: {}", err);
error!("{err}");
}
});
}
@@ -102,14 +103,14 @@ impl postgres_backend::Handler for MgmtHandler {
let res = try_process_query(pgb, query_string);
// intercept and log error message
if res.is_err() {
println!("Mgmt query failed: #{:?}", res);
error!("mgmt query failed: {res:?}");
}
res
}
}
fn try_process_query(pgb: &mut PostgresBackend, query_string: &str) -> anyhow::Result<()> {
println!("Got mgmt query [redacted]"); // Content contains password, don't print it
info!("got mgmt query [redacted]"); // Content contains password, don't print it
let resp: PsqlSessionResponse = serde_json::from_str(query_string)?;

View File

@@ -8,6 +8,7 @@ use metrics::{register_int_counter, IntCounter};
use once_cell::sync::Lazy;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, Instrument};
use utils::pq_proto::{BeMessage as Be, *};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
@@ -43,17 +44,17 @@ where
F: std::future::Future<Output = anyhow::Result<R>>,
{
future.await.map_err(|err| {
println!("error: {}", err);
error!("{err}");
err
})
}
pub async fn thread_main(
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
) -> anyhow::Result<()> {
scopeguard::defer! {
println!("proxy has shut down");
info!("proxy has shut down");
}
// When set for the server socket, the keepalive setting
@@ -63,22 +64,29 @@ pub async fn thread_main(
let cancel_map = Arc::new(CancelMap::default());
loop {
let (socket, peer_addr) = listener.accept().await?;
println!("accepted connection from {}", peer_addr);
info!("accepted connection from {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
tokio::spawn(log_error(async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
tokio::spawn(
log_error(async move {
info!("spawned a task for {peer_addr}");
handle_client(config, &cancel_map, socket).await
}));
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket).await
})
.instrument(info_span!("client", session = format_args!("{session_id}"))),
);
}
}
async fn handle_client(
config: &ProxyConfig,
cancel_map: &CancelMap,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin + Send,
) -> anyhow::Result<()> {
// The `closed` counter will increase when this future is destroyed.
@@ -88,7 +96,8 @@ async fn handle_client(
}
let tls = config.tls_config.as_ref();
let (mut stream, params) = match handshake(stream, tls, cancel_map).await? {
let do_handshake = handshake(stream, tls, cancel_map).instrument(info_span!("handshake"));
let (mut stream, params) = match do_handshake.await? {
Some(x) => x,
None => return Ok(()), // it's a cancellation request
};
@@ -106,7 +115,7 @@ async fn handle_client(
async { result }.or_else(|e| stream.throw_error(e)).await?
};
let client = Client::new(stream, creds, &params);
let client = Client::new(stream, creds, &params, session_id);
cancel_map
.with_session(|session| client.connect_to_db(session))
.await
@@ -127,7 +136,7 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
println!("got message: {:?}", msg);
info!("received {msg:?}");
use FeStartupPacket::*;
match msg {
@@ -164,11 +173,13 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream.throw_error_str(ERR_INSECURE_CONNECTION).await?;
}
info!(session_type = "normal", "successful handshake");
break Ok(Some((stream, params)));
}
CancelRequest(cancel_key_data) => {
cancel_map.cancel_session(cancel_key_data).await?;
info!(session_type = "cancellation", "successful handshake");
break Ok(None);
}
}
@@ -183,6 +194,8 @@ struct Client<'a, S> {
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
/// KV-dictionary with PostgreSQL connection params.
params: &'a StartupMessageParams,
/// Unique connection ID.
session_id: uuid::Uuid,
}
impl<'a, S> Client<'a, S> {
@@ -191,11 +204,13 @@ impl<'a, S> Client<'a, S> {
stream: PqStream<S>,
creds: auth::BackendType<'a, auth::ClientCredentials<'a>>,
params: &'a StartupMessageParams,
session_id: uuid::Uuid,
) -> Self {
Self {
stream,
creds,
params,
session_id,
}
}
}
@@ -207,17 +222,20 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
mut stream,
creds,
params,
session_id,
} = self;
let extra = auth::ConsoleReqExtra {
// Currently it's OK to generate a new UUID **here**, but
// it might be better to move this to `cancellation::Session`.
session_id: uuid::Uuid::new_v4(),
session_id, // aka this connection's id
application_name: params.get("application_name"),
};
// Authenticate and connect to a compute node.
let auth = creds.authenticate(&extra, &mut stream).await;
let auth = creds
.authenticate(&extra, &mut stream)
.instrument(info_span!("auth"))
.await;
let node = async { auth }.or_else(|e| stream.throw_error(e)).await?;
let reported_auth_ok = node.reported_auth_ok;
@@ -251,6 +269,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin + Send> Client<'_, S> {
}
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
let mut db = MetricsStream::new(db.stream, inc_proxied);
let mut client = MetricsStream::new(stream.into_inner(), inc_proxied);
let _ = tokio::io::copy_bidirectional(&mut client, &mut db).await?;

View File

@@ -54,7 +54,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 10000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 100,
@@ -74,7 +74,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 20000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 30,
@@ -102,7 +102,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 15000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 80,
@@ -125,7 +125,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}"""
for i in {
"checkpoint_distance": 15000,
"compaction_target_size": 1048576,
"compaction_period": 1,
"compaction_period": 20,
"compaction_threshold": 10,
"gc_horizon": 67108864,
"gc_period": 80,