Propagate information about the protocol to console (#6102)

## Problem

In snowflake logs currently there is no information about the protocol,
that the client uses.

## Summary of changes

Propagate the information about the protocol together with the app_name.
In format: `{app_name}/{sql_over_http/tcp/ws}`.

This will give to @stepashka more observability on what our clients are
using.
This commit is contained in:
Anna Khanova
2023-12-12 12:42:51 +01:00
committed by GitHub
parent fead836f26
commit 9e071e4458
7 changed files with 36 additions and 33 deletions

View File

@@ -166,7 +166,7 @@ impl TryFrom<ClientCredentials> for ComputeUserInfo {
/// All authentication flows will emit an AuthenticationOk message if successful.
async fn auth_quirks(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: ClientCredentials,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
allow_cleartext: bool,
@@ -235,7 +235,7 @@ async fn auth_quirks(
/// only if authentication was successfuly.
async fn auth_and_wake_compute(
api: &impl console::Api,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: ClientCredentials,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
allow_cleartext: bool,
@@ -314,7 +314,7 @@ impl<'a> BackendType<'a, ClientCredentials> {
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
pub async fn authenticate(
self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
client: &mut stream::PqStream<Stream<impl AsyncRead + AsyncWrite + Unpin>>,
allow_cleartext: bool,
config: &'static AuthenticationConfig,
@@ -387,7 +387,7 @@ impl<'a> BackendType<'a, ClientCredentials> {
impl BackendType<'_, ComputeUserInfo> {
pub async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
use BackendType::*;
match self {
@@ -404,7 +404,7 @@ impl BackendType<'_, ComputeUserInfo> {
/// The link auth flow doesn't support this, so we return [`None`] in that case.
pub async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*;

View File

@@ -196,15 +196,15 @@ pub mod errors {
}
/// Extra query params we'd like to pass to the console.
pub struct ConsoleReqExtra<'a> {
pub struct ConsoleReqExtra {
/// A unique identifier for a connection.
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
pub application_name: String,
pub options: Vec<(String, String)>,
}
impl<'a> ConsoleReqExtra<'a> {
impl ConsoleReqExtra {
// https://swagger.io/docs/specification/serialization/ DeepObject format
// paramName[prop1]=value1&paramName[prop2]=value2&....
pub fn options_as_deep_object(&self) -> Vec<(String, String)> {
@@ -259,20 +259,20 @@ pub trait Api {
/// Get the client's auth secret for authentication.
async fn get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<AuthInfo, errors::GetAuthInfoError>;
async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}

View File

@@ -144,7 +144,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_auth_info(
&self,
_extra: &ConsoleReqExtra<'_>,
_extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
self.do_get_auth_info(creds).await
@@ -152,7 +152,7 @@ impl super::Api for Api {
async fn get_allowed_ips(
&self,
_extra: &ConsoleReqExtra<'_>,
_extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips))
@@ -161,7 +161,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
_extra: &ConsoleReqExtra<'_>,
_extra: &ConsoleReqExtra,
_creds: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute()

View File

@@ -48,7 +48,7 @@ impl Api {
async fn do_get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
let request_id = uuid::Uuid::new_v4().to_string();
@@ -60,9 +60,9 @@ impl Api {
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", extra.session_id)])
.query(&[
("application_name", extra.application_name),
("project", Some(&creds.endpoint)),
("role", Some(&creds.inner.user)),
("application_name", extra.application_name.as_str()),
("project", creds.endpoint.as_str()),
("role", creds.inner.user.as_str()),
])
.build()?;
@@ -101,7 +101,7 @@ impl Api {
async fn do_wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
@@ -113,8 +113,8 @@ impl Api {
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", extra.session_id)])
.query(&[
("application_name", extra.application_name),
("project", Some(&creds.endpoint)),
("application_name", extra.application_name.as_str()),
("project", creds.endpoint.as_str()),
]);
request_builder = if extra.options.is_empty() {
@@ -161,7 +161,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn get_auth_info(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
self.do_get_auth_info(extra, creds).await
@@ -169,7 +169,7 @@ impl super::Api for Api {
async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
let key: &str = &creds.endpoint;
@@ -192,7 +192,7 @@ impl super::Api for Api {
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
extra: &ConsoleReqExtra,
creds: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key: &str = &creds.inner.cache_key;

View File

@@ -671,7 +671,7 @@ fn report_error(e: &WakeComputeError, retry: bool) {
pub async fn connect_to_compute<M: ConnectMechanism>(
mechanism: &M,
mut node_info: console::CachedNodeInfo,
extra: &console::ConsoleReqExtra<'_>,
extra: &console::ConsoleReqExtra,
creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>,
mut latency_timer: LatencyTimer,
) -> Result<M::Connection, M::Error>
@@ -968,13 +968,17 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
allow_self_signed_compute,
} = self;
let proto = mode.protocol_label();
let extra = console::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
application_name: format!(
"{}/{}",
params.get("application_name").unwrap_or_default(),
proto
),
options: neon_options(params),
};
let mut latency_timer = LatencyTimer::new(mode.protocol_label());
let mut latency_timer = LatencyTimer::new(proto);
let user = creds.get_user().to_owned();
let auth_result = match creds
@@ -1012,7 +1016,6 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
.or_else(|e| stream.throw_error(e))
.await?;
let proto = mode.protocol_label();
NUM_DB_CONNECTIONS_OPENED_COUNTER
.with_label_values(&[proto])
.inc();

View File

@@ -484,13 +484,13 @@ fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> (
CachedNodeInfo,
console::ConsoleReqExtra<'static>,
console::ConsoleReqExtra,
auth::BackendType<'_, ComputeUserInfo>,
) {
let cache = helper_create_cached_node_info();
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
application_name: "TEST".into(),
options: vec![],
};
let creds = auth::BackendType::Test(mechanism);

View File

@@ -37,7 +37,7 @@ use crate::proxy::ConnectMechanism;
use tracing::{error, warn, Span};
use tracing::{info, info_span, Instrument};
pub const APP_NAME: &str = "sql_over_http";
pub const APP_NAME: &str = "/sql_over_http";
const MAX_CONNS_PER_ENDPOINT: usize = 20;
#[derive(Debug, Clone)]
@@ -432,7 +432,7 @@ async fn connect_to_compute(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
application_name: APP_NAME.to_string(),
options: console_options,
};
// TODO(anna): this is a bit hacky way, consider using console notification listener.