Compare commits

...

6 Commits

Author SHA1 Message Date
Anna Khanova
bbdcfb7984 Log role 2024-04-17 13:39:14 +02:00
Anna Khanova
f9627729e4 Fmt 2024-04-17 12:59:58 +02:00
Anna Khanova
142ed18254 Review 2024-04-17 12:59:53 +02:00
Anna Khanova
33d1041d58 More logging 2024-04-17 12:48:55 +02:00
Anna Khanova
e0a266942c Confirm that connection was succesfully esablished 2024-04-17 12:39:29 +02:00
Anna Khanova
684d733ce8 proxy: Improve logging 2024-04-17 12:24:12 +02:00
4 changed files with 42 additions and 6 deletions

View File

@@ -42,6 +42,7 @@ use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::info; use tracing::info;
use tracing::warn; use tracing::warn;
use tracing::Instrument;
use utils::{project_build_tag, project_git_version, sentry_init::init_sentry}; use utils::{project_build_tag, project_git_version, sentry_init::init_sentry};
project_git_version!(GIT_VERSION); project_git_version!(GIT_VERSION);
@@ -418,7 +419,8 @@ async fn main() -> anyhow::Result<()> {
if let Some(regional_redis_client) = regional_redis_client { if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone(); let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client; let con = regional_redis_client;
maintenance_tasks.spawn(async move { cache.do_read(con).await }); let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span));
} }
} }
} }

View File

@@ -13,6 +13,7 @@ use redis::{
}; };
use serde::Deserialize; use serde::Deserialize;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::info;
use crate::{ use crate::{
config::EndpointCacheConfig, config::EndpointCacheConfig,
@@ -71,7 +72,9 @@ impl EndpointsCache {
} }
// If cache is disabled, just collect the metrics and return. // If cache is disabled, just collect the metrics and return.
if self.config.disable_cache { if self.config.disable_cache {
ctx.set_rejected(self.should_reject(endpoint)); let rejected = self.should_reject(endpoint);
ctx.set_rejected(rejected);
info!(?rejected, "check endpoint is valid, disabled cache");
return true; return true;
} }
// If the limiter allows, we don't need to check the cache. // If the limiter allows, we don't need to check the cache.
@@ -79,6 +82,7 @@ impl EndpointsCache {
return true; return true;
} }
let rejected = self.should_reject(endpoint); let rejected = self.should_reject(endpoint);
info!(?rejected, "check endpoint is valid, enabled cache");
ctx.set_rejected(rejected); ctx.set_rejected(rejected);
!rejected !rejected
} }
@@ -171,6 +175,9 @@ impl EndpointsCache {
if res.keys.is_empty() { if res.keys.is_empty() {
if return_when_finish { if return_when_finish {
if total != 0 {
break;
}
anyhow::bail!( anyhow::bail!(
"Redis stream {} is empty, cannot be used to filter endpoints", "Redis stream {} is empty, cannot be used to filter endpoints",
self.config.stream_name self.config.stream_name

View File

@@ -5,7 +5,7 @@ use once_cell::sync::OnceCell;
use smol_str::SmolStr; use smol_str::SmolStr;
use std::net::IpAddr; use std::net::IpAddr;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::{field::display, info_span, Span}; use tracing::{field::display, info, info_span, Span};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
@@ -76,6 +76,7 @@ impl RequestMonitoring {
?session_id, ?session_id,
%peer_addr, %peer_addr,
ep = tracing::field::Empty, ep = tracing::field::Empty,
role = tracing::field::Empty,
); );
Self { Self {
@@ -157,6 +158,7 @@ impl RequestMonitoring {
} }
pub fn set_user(&mut self, user: RoleName) { pub fn set_user(&mut self, user: RoleName) {
self.span.record("role", display(&user));
self.user = Some(user); self.user = Some(user);
} }
@@ -198,12 +200,25 @@ impl Drop for RequestMonitoring {
} else { } else {
ConnectOutcome::Failed ConnectOutcome::Failed
}; };
let rejected = self.rejected;
let ep = self
.endpoint_id
.as_ref()
.map(|x| x.as_str())
.unwrap_or_default();
// This makes sense only if cache is disabled
info!(
?ep,
?outcome,
?rejected,
"check endpoint is valid with outcome"
);
Metrics::get() Metrics::get()
.proxy .proxy
.invalid_endpoints_total .invalid_endpoints_total
.inc(InvalidEndpointsGroup { .inc(InvalidEndpointsGroup {
protocol: self.protocol, protocol: self.protocol,
rejected: self.rejected.into(), rejected: rejected.into(),
outcome, outcome,
}); });
if let Some(tx) = self.sender.take() { if let Some(tx) = self.sender.take() {

View File

@@ -77,10 +77,14 @@ impl ConnectionWithCredentialsProvider {
} }
} }
async fn ping(con: &mut MultiplexedConnection) -> RedisResult<()> {
redis::cmd("PING").query_async(con).await
}
pub async fn connect(&mut self) -> anyhow::Result<()> { pub async fn connect(&mut self) -> anyhow::Result<()> {
let _guard = self.mutex.lock().await; let _guard = self.mutex.lock().await;
if let Some(con) = self.con.as_mut() { if let Some(con) = self.con.as_mut() {
match redis::cmd("PING").query_async(con).await { match Self::ping(con).await {
Ok(()) => { Ok(()) => {
return Ok(()); return Ok(());
} }
@@ -96,7 +100,7 @@ impl ConnectionWithCredentialsProvider {
if let Some(f) = self.refresh_token_task.take() { if let Some(f) = self.refresh_token_task.take() {
f.abort() f.abort()
} }
let con = self let mut con = self
.get_client() .get_client()
.await? .await?
.get_multiplexed_tokio_connection() .get_multiplexed_tokio_connection()
@@ -109,6 +113,14 @@ impl ConnectionWithCredentialsProvider {
}); });
self.refresh_token_task = Some(f); self.refresh_token_task = Some(f);
} }
match Self::ping(&mut con).await {
Ok(()) => {
info!("Connection succesfully established");
}
Err(e) => {
error!("Connection is broken. Error during PING: {e:?}");
}
}
self.con = Some(con); self.con = Some(con);
Ok(()) Ok(())
} }