mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
proxy: some more parquet data (#6711)
## Summary of changes add auth_method and database to the parquet logs
This commit is contained in:
@@ -194,8 +194,7 @@ async fn auth_quirks(
|
||||
// We now expect to see a very specific payload in the place of password.
|
||||
let (info, unauthenticated_password) = match user_info.try_into() {
|
||||
Err(info) => {
|
||||
let res = hacks::password_hack_no_authentication(info, client, &mut ctx.latency_timer)
|
||||
.await?;
|
||||
let res = hacks::password_hack_no_authentication(ctx, info, client).await?;
|
||||
|
||||
ctx.set_endpoint_id(res.info.endpoint.clone());
|
||||
tracing::Span::current().record("ep", &tracing::field::display(&res.info.endpoint));
|
||||
@@ -276,11 +275,12 @@ async fn authenticate_with_secret(
|
||||
// Perform cleartext auth if we're allowed to do that.
|
||||
// Currently, we use it for websocket connections (latency).
|
||||
if allow_cleartext {
|
||||
return hacks::authenticate_cleartext(info, client, &mut ctx.latency_timer, secret).await;
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
return hacks::authenticate_cleartext(ctx, info, client, secret).await;
|
||||
}
|
||||
|
||||
// Finally, proceed with the main auth flow (SCRAM-based).
|
||||
classic::authenticate(info, client, config, &mut ctx.latency_timer, secret).await
|
||||
classic::authenticate(ctx, info, client, config, secret).await
|
||||
}
|
||||
|
||||
impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint> {
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
compute,
|
||||
config::AuthenticationConfig,
|
||||
console::AuthSecret,
|
||||
metrics::LatencyTimer,
|
||||
context::RequestMonitoring,
|
||||
sasl,
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
@@ -12,10 +12,10 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{info, warn};
|
||||
|
||||
pub(super) async fn authenticate(
|
||||
ctx: &mut RequestMonitoring,
|
||||
creds: ComputeUserInfo,
|
||||
client: &mut PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
config: &'static AuthenticationConfig,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
secret: AuthSecret,
|
||||
) -> auth::Result<ComputeCredentials<ComputeCredentialKeys>> {
|
||||
let flow = AuthFlow::new(client);
|
||||
@@ -27,13 +27,11 @@ pub(super) async fn authenticate(
|
||||
}
|
||||
AuthSecret::Scram(secret) => {
|
||||
info!("auth endpoint chooses SCRAM");
|
||||
let scram = auth::Scram(&secret);
|
||||
let scram = auth::Scram(&secret, &mut *ctx);
|
||||
|
||||
let auth_outcome = tokio::time::timeout(
|
||||
config.scram_protocol_timeout,
|
||||
async {
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = latency_timer.pause();
|
||||
|
||||
flow.begin(scram).await.map_err(|error| {
|
||||
warn!(?error, "error sending scram acknowledgement");
|
||||
|
||||
@@ -4,7 +4,7 @@ use super::{
|
||||
use crate::{
|
||||
auth::{self, AuthFlow},
|
||||
console::AuthSecret,
|
||||
metrics::LatencyTimer,
|
||||
context::RequestMonitoring,
|
||||
sasl,
|
||||
stream::{self, Stream},
|
||||
};
|
||||
@@ -16,15 +16,16 @@ use tracing::{info, warn};
|
||||
/// These properties are benefical for serverless JS workers, so we
|
||||
/// use this mechanism for websocket connections.
|
||||
pub async fn authenticate_cleartext(
|
||||
ctx: &mut RequestMonitoring,
|
||||
info: ComputeUserInfo,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
secret: AuthSecret,
|
||||
) -> auth::Result<ComputeCredentials<ComputeCredentialKeys>> {
|
||||
warn!("cleartext auth flow override is enabled, proceeding");
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = latency_timer.pause();
|
||||
let _paused = ctx.latency_timer.pause();
|
||||
|
||||
let auth_outcome = AuthFlow::new(client)
|
||||
.begin(auth::CleartextPassword(secret))
|
||||
@@ -47,14 +48,15 @@ pub async fn authenticate_cleartext(
|
||||
/// Similar to [`authenticate_cleartext`], but there's a specific password format,
|
||||
/// and passwords are not yet validated (we don't know how to validate them!)
|
||||
pub async fn password_hack_no_authentication(
|
||||
ctx: &mut RequestMonitoring,
|
||||
info: ComputeUserInfoNoEndpoint,
|
||||
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
|
||||
latency_timer: &mut LatencyTimer,
|
||||
) -> auth::Result<ComputeCredentials<Vec<u8>>> {
|
||||
warn!("project not specified, resorting to the password hack auth flow");
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = latency_timer.pause();
|
||||
let _paused = ctx.latency_timer.pause();
|
||||
|
||||
let payload = AuthFlow::new(client)
|
||||
.begin(auth::PasswordHack)
|
||||
|
||||
@@ -61,6 +61,8 @@ pub(super) async fn authenticate(
|
||||
link_uri: &reqwest::Url,
|
||||
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
|
||||
) -> auth::Result<NodeInfo> {
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Web);
|
||||
|
||||
// registering waiter can fail if we get unlucky with rng.
|
||||
// just try again.
|
||||
let (psql_session_id, waiter) = loop {
|
||||
|
||||
@@ -99,6 +99,9 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
// record the values if we have them
|
||||
ctx.set_application(params.get("application_name").map(SmolStr::from));
|
||||
ctx.set_user(user.clone());
|
||||
if let Some(dbname) = params.get("database") {
|
||||
ctx.set_dbname(dbname.into());
|
||||
}
|
||||
|
||||
// Project name might be passed via PG's command-line options.
|
||||
let endpoint_option = params
|
||||
|
||||
@@ -4,9 +4,11 @@ use super::{backend::ComputeCredentialKeys, AuthErrorImpl, PasswordHackPayload};
|
||||
use crate::{
|
||||
config::TlsServerEndPoint,
|
||||
console::AuthSecret,
|
||||
context::RequestMonitoring,
|
||||
sasl, scram,
|
||||
stream::{PqStream, Stream},
|
||||
};
|
||||
use postgres_protocol::authentication::sasl::{SCRAM_SHA_256, SCRAM_SHA_256_PLUS};
|
||||
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
|
||||
use std::io;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
@@ -23,7 +25,7 @@ pub trait AuthMethod {
|
||||
pub struct Begin;
|
||||
|
||||
/// Use [SCRAM](crate::scram)-based auth in [`AuthFlow`].
|
||||
pub struct Scram<'a>(pub &'a scram::ServerSecret);
|
||||
pub struct Scram<'a>(pub &'a scram::ServerSecret, pub &'a mut RequestMonitoring);
|
||||
|
||||
impl AuthMethod for Scram<'_> {
|
||||
#[inline(always)]
|
||||
@@ -138,6 +140,11 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, CleartextPassword> {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
|
||||
/// Perform user authentication. Raise an error in case authentication failed.
|
||||
pub async fn authenticate(self) -> super::Result<sasl::Outcome<scram::ScramKey>> {
|
||||
let Scram(secret, ctx) = self.state;
|
||||
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = ctx.latency_timer.pause();
|
||||
|
||||
// Initial client message contains the chosen auth method's name.
|
||||
let msg = self.stream.read_password_message().await?;
|
||||
let sasl = sasl::FirstMessage::parse(&msg)
|
||||
@@ -148,9 +155,15 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AuthFlow<'_, S, Scram<'_>> {
|
||||
return Err(super::AuthError::bad_auth_method(sasl.method));
|
||||
}
|
||||
|
||||
match sasl.method {
|
||||
SCRAM_SHA_256 => ctx.auth_method = Some(crate::context::AuthMethod::ScramSha256),
|
||||
SCRAM_SHA_256_PLUS => {
|
||||
ctx.auth_method = Some(crate::context::AuthMethod::ScramSha256Plus)
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
info!("client chooses {}", sasl.method);
|
||||
|
||||
let secret = self.state.0;
|
||||
let outcome = sasl::SaslStream::new(self.stream, sasl.message)
|
||||
.authenticate(scram::Exchange::new(
|
||||
secret,
|
||||
|
||||
@@ -11,7 +11,7 @@ use crate::{
|
||||
console::messages::MetricsAuxInfo,
|
||||
error::ErrorKind,
|
||||
metrics::{LatencyTimer, ENDPOINT_ERRORS_BY_KIND, ERROR_BY_KIND},
|
||||
BranchId, EndpointId, ProjectId, RoleName,
|
||||
BranchId, DbName, EndpointId, ProjectId, RoleName,
|
||||
};
|
||||
|
||||
pub mod parquet;
|
||||
@@ -34,9 +34,11 @@ pub struct RequestMonitoring {
|
||||
project: Option<ProjectId>,
|
||||
branch: Option<BranchId>,
|
||||
endpoint_id: Option<EndpointId>,
|
||||
dbname: Option<DbName>,
|
||||
user: Option<RoleName>,
|
||||
application: Option<SmolStr>,
|
||||
error_kind: Option<ErrorKind>,
|
||||
pub(crate) auth_method: Option<AuthMethod>,
|
||||
success: bool,
|
||||
|
||||
// extra
|
||||
@@ -45,6 +47,15 @@ pub struct RequestMonitoring {
|
||||
pub latency_timer: LatencyTimer,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum AuthMethod {
|
||||
// aka link aka passwordless
|
||||
Web,
|
||||
ScramSha256,
|
||||
ScramSha256Plus,
|
||||
Cleartext,
|
||||
}
|
||||
|
||||
impl RequestMonitoring {
|
||||
pub fn new(
|
||||
session_id: Uuid,
|
||||
@@ -62,9 +73,11 @@ impl RequestMonitoring {
|
||||
project: None,
|
||||
branch: None,
|
||||
endpoint_id: None,
|
||||
dbname: None,
|
||||
user: None,
|
||||
application: None,
|
||||
error_kind: None,
|
||||
auth_method: None,
|
||||
success: false,
|
||||
|
||||
sender: LOG_CHAN.get().and_then(|tx| tx.upgrade()),
|
||||
@@ -106,10 +119,18 @@ impl RequestMonitoring {
|
||||
self.application = app.or_else(|| self.application.clone());
|
||||
}
|
||||
|
||||
pub fn set_dbname(&mut self, dbname: DbName) {
|
||||
self.dbname = Some(dbname);
|
||||
}
|
||||
|
||||
pub fn set_user(&mut self, user: RoleName) {
|
||||
self.user = Some(user);
|
||||
}
|
||||
|
||||
pub fn set_auth_method(&mut self, auth_method: AuthMethod) {
|
||||
self.auth_method = Some(auth_method);
|
||||
}
|
||||
|
||||
pub fn set_error_kind(&mut self, kind: ErrorKind) {
|
||||
ERROR_BY_KIND
|
||||
.with_label_values(&[kind.to_metric_label()])
|
||||
|
||||
@@ -84,8 +84,10 @@ struct RequestData {
|
||||
username: Option<String>,
|
||||
application_name: Option<String>,
|
||||
endpoint_id: Option<String>,
|
||||
database: Option<String>,
|
||||
project: Option<String>,
|
||||
branch: Option<String>,
|
||||
auth_method: Option<&'static str>,
|
||||
error: Option<&'static str>,
|
||||
/// Success is counted if we form a HTTP response with sql rows inside
|
||||
/// Or if we make it to proxy_pass
|
||||
@@ -104,8 +106,15 @@ impl From<RequestMonitoring> for RequestData {
|
||||
username: value.user.as_deref().map(String::from),
|
||||
application_name: value.application.as_deref().map(String::from),
|
||||
endpoint_id: value.endpoint_id.as_deref().map(String::from),
|
||||
database: value.dbname.as_deref().map(String::from),
|
||||
project: value.project.as_deref().map(String::from),
|
||||
branch: value.branch.as_deref().map(String::from),
|
||||
auth_method: value.auth_method.as_ref().map(|x| match x {
|
||||
super::AuthMethod::Web => "web",
|
||||
super::AuthMethod::ScramSha256 => "scram_sha_256",
|
||||
super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
|
||||
super::AuthMethod::Cleartext => "cleartext",
|
||||
}),
|
||||
protocol: value.protocol,
|
||||
region: value.region,
|
||||
error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
|
||||
@@ -431,8 +440,10 @@ mod tests {
|
||||
application_name: Some("test".to_owned()),
|
||||
username: Some(hex::encode(rng.gen::<[u8; 4]>())),
|
||||
endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
database: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
project: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
|
||||
auth_method: None,
|
||||
protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
|
||||
region: "us-east-1",
|
||||
error: None,
|
||||
@@ -505,15 +516,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1087635, 3, 6000),
|
||||
(1087288, 3, 6000),
|
||||
(1087444, 3, 6000),
|
||||
(1087572, 3, 6000),
|
||||
(1087468, 3, 6000),
|
||||
(1087500, 3, 6000),
|
||||
(1087533, 3, 6000),
|
||||
(1087566, 3, 6000),
|
||||
(362671, 1, 2000)
|
||||
(1313727, 3, 6000),
|
||||
(1313720, 3, 6000),
|
||||
(1313780, 3, 6000),
|
||||
(1313737, 3, 6000),
|
||||
(1313867, 3, 6000),
|
||||
(1313709, 3, 6000),
|
||||
(1313501, 3, 6000),
|
||||
(1313737, 3, 6000),
|
||||
(438118, 1, 2000)
|
||||
],
|
||||
);
|
||||
|
||||
@@ -543,11 +554,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1028637, 5, 10000),
|
||||
(1031969, 5, 10000),
|
||||
(1019900, 5, 10000),
|
||||
(1020365, 5, 10000),
|
||||
(1025010, 5, 10000)
|
||||
(1219459, 5, 10000),
|
||||
(1225609, 5, 10000),
|
||||
(1227403, 5, 10000),
|
||||
(1226765, 5, 10000),
|
||||
(1218043, 5, 10000)
|
||||
],
|
||||
);
|
||||
|
||||
@@ -579,11 +590,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1210770, 6, 12000),
|
||||
(1211036, 6, 12000),
|
||||
(1210990, 6, 12000),
|
||||
(1210861, 6, 12000),
|
||||
(202073, 1, 2000)
|
||||
(1205106, 5, 10000),
|
||||
(1204837, 5, 10000),
|
||||
(1205130, 5, 10000),
|
||||
(1205118, 5, 10000),
|
||||
(1205373, 5, 10000)
|
||||
],
|
||||
);
|
||||
|
||||
@@ -608,15 +619,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1087635, 3, 6000),
|
||||
(1087288, 3, 6000),
|
||||
(1087444, 3, 6000),
|
||||
(1087572, 3, 6000),
|
||||
(1087468, 3, 6000),
|
||||
(1087500, 3, 6000),
|
||||
(1087533, 3, 6000),
|
||||
(1087566, 3, 6000),
|
||||
(362671, 1, 2000)
|
||||
(1313727, 3, 6000),
|
||||
(1313720, 3, 6000),
|
||||
(1313780, 3, 6000),
|
||||
(1313737, 3, 6000),
|
||||
(1313867, 3, 6000),
|
||||
(1313709, 3, 6000),
|
||||
(1313501, 3, 6000),
|
||||
(1313737, 3, 6000),
|
||||
(438118, 1, 2000)
|
||||
],
|
||||
);
|
||||
|
||||
@@ -653,7 +664,7 @@ mod tests {
|
||||
// files are smaller than the size threshold, but they took too long to fill so were flushed early
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[(545264, 2, 3001), (545025, 2, 3000), (544857, 2, 2999)],
|
||||
[(658383, 2, 3001), (658097, 2, 3000), (657893, 2, 2999)],
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
|
||||
@@ -144,7 +144,7 @@ impl TestAuth for Scram {
|
||||
stream: &mut PqStream<Stream<S>>,
|
||||
) -> anyhow::Result<()> {
|
||||
let outcome = auth::AuthFlow::new(stream)
|
||||
.begin(auth::Scram(&self.0))
|
||||
.begin(auth::Scram(&self.0, &mut RequestMonitoring::test()))
|
||||
.await?
|
||||
.authenticate()
|
||||
.await?;
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::error::ReportableError;
|
||||
use crate::metrics::HTTP_CONTENT_LENGTH;
|
||||
use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::DbName;
|
||||
use crate::RoleName;
|
||||
|
||||
use super::backend::PoolingBackend;
|
||||
@@ -117,6 +118,9 @@ fn get_conn_info(
|
||||
headers: &HeaderMap,
|
||||
tls: &TlsConfig,
|
||||
) -> Result<ConnInfo, ConnInfoError> {
|
||||
// HTTP only uses cleartext (for now and likely always)
|
||||
ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
|
||||
|
||||
let connection_string = headers
|
||||
.get("Neon-Connection-String")
|
||||
.ok_or(ConnInfoError::InvalidHeader("Neon-Connection-String"))?
|
||||
@@ -134,7 +138,8 @@ fn get_conn_info(
|
||||
.path_segments()
|
||||
.ok_or(ConnInfoError::MissingDbName)?;
|
||||
|
||||
let dbname = url_path.next().ok_or(ConnInfoError::InvalidDbName)?;
|
||||
let dbname: DbName = url_path.next().ok_or(ConnInfoError::InvalidDbName)?.into();
|
||||
ctx.set_dbname(dbname.clone());
|
||||
|
||||
let username = RoleName::from(urlencoding::decode(connection_url.username())?);
|
||||
if username.is_empty() {
|
||||
@@ -174,7 +179,7 @@ fn get_conn_info(
|
||||
|
||||
Ok(ConnInfo {
|
||||
user_info,
|
||||
dbname: dbname.into(),
|
||||
dbname,
|
||||
password: match password {
|
||||
std::borrow::Cow::Borrowed(b) => b.into(),
|
||||
std::borrow::Cow::Owned(b) => b.into(),
|
||||
|
||||
Reference in New Issue
Block a user