mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 14:02:55 +00:00
proxy: Improve logging (#7405)
## Problem It's unclear from logs what's going on with the regional redis. ## Summary of changes Make logs better.
This commit is contained in:
@@ -42,6 +42,7 @@ use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use tracing::Instrument;
|
||||
use utils::{project_build_tag, project_git_version, sentry_init::init_sentry};
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
@@ -418,7 +419,8 @@ async fn main() -> anyhow::Result<()> {
|
||||
if let Some(regional_redis_client) = regional_redis_client {
|
||||
let cache = api.caches.endpoints_cache.clone();
|
||||
let con = regional_redis_client;
|
||||
maintenance_tasks.spawn(async move { cache.do_read(con).await });
|
||||
let span = tracing::info_span!("endpoints_cache");
|
||||
maintenance_tasks.spawn(async move { cache.do_read(con).await }.instrument(span));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
9
proxy/src/cache/endpoints.rs
vendored
9
proxy/src/cache/endpoints.rs
vendored
@@ -13,6 +13,7 @@ use redis::{
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
config::EndpointCacheConfig,
|
||||
@@ -71,7 +72,9 @@ impl EndpointsCache {
|
||||
}
|
||||
// If cache is disabled, just collect the metrics and return.
|
||||
if self.config.disable_cache {
|
||||
ctx.set_rejected(self.should_reject(endpoint));
|
||||
let rejected = self.should_reject(endpoint);
|
||||
ctx.set_rejected(rejected);
|
||||
info!(?rejected, "check endpoint is valid, disabled cache");
|
||||
return true;
|
||||
}
|
||||
// If the limiter allows, we don't need to check the cache.
|
||||
@@ -79,6 +82,7 @@ impl EndpointsCache {
|
||||
return true;
|
||||
}
|
||||
let rejected = self.should_reject(endpoint);
|
||||
info!(?rejected, "check endpoint is valid, enabled cache");
|
||||
ctx.set_rejected(rejected);
|
||||
!rejected
|
||||
}
|
||||
@@ -171,6 +175,9 @@ impl EndpointsCache {
|
||||
|
||||
if res.keys.is_empty() {
|
||||
if return_when_finish {
|
||||
if total != 0 {
|
||||
break;
|
||||
}
|
||||
anyhow::bail!(
|
||||
"Redis stream {} is empty, cannot be used to filter endpoints",
|
||||
self.config.stream_name
|
||||
|
||||
@@ -5,7 +5,7 @@ use once_cell::sync::OnceCell;
|
||||
use smol_str::SmolStr;
|
||||
use std::net::IpAddr;
|
||||
use tokio::sync::mpsc;
|
||||
use tracing::{field::display, info_span, Span};
|
||||
use tracing::{field::display, info, info_span, Span};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
@@ -198,12 +198,25 @@ impl Drop for RequestMonitoring {
|
||||
} else {
|
||||
ConnectOutcome::Failed
|
||||
};
|
||||
let rejected = self.rejected;
|
||||
let ep = self
|
||||
.endpoint_id
|
||||
.as_ref()
|
||||
.map(|x| x.as_str())
|
||||
.unwrap_or_default();
|
||||
// This makes sense only if cache is disabled
|
||||
info!(
|
||||
?ep,
|
||||
?outcome,
|
||||
?rejected,
|
||||
"check endpoint is valid with outcome"
|
||||
);
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.invalid_endpoints_total
|
||||
.inc(InvalidEndpointsGroup {
|
||||
protocol: self.protocol,
|
||||
rejected: self.rejected.into(),
|
||||
rejected: rejected.into(),
|
||||
outcome,
|
||||
});
|
||||
if let Some(tx) = self.sender.take() {
|
||||
|
||||
@@ -77,10 +77,14 @@ impl ConnectionWithCredentialsProvider {
|
||||
}
|
||||
}
|
||||
|
||||
async fn ping(con: &mut MultiplexedConnection) -> RedisResult<()> {
|
||||
redis::cmd("PING").query_async(con).await
|
||||
}
|
||||
|
||||
pub async fn connect(&mut self) -> anyhow::Result<()> {
|
||||
let _guard = self.mutex.lock().await;
|
||||
if let Some(con) = self.con.as_mut() {
|
||||
match redis::cmd("PING").query_async(con).await {
|
||||
match Self::ping(con).await {
|
||||
Ok(()) => {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -96,7 +100,7 @@ impl ConnectionWithCredentialsProvider {
|
||||
if let Some(f) = self.refresh_token_task.take() {
|
||||
f.abort()
|
||||
}
|
||||
let con = self
|
||||
let mut con = self
|
||||
.get_client()
|
||||
.await?
|
||||
.get_multiplexed_tokio_connection()
|
||||
@@ -109,6 +113,14 @@ impl ConnectionWithCredentialsProvider {
|
||||
});
|
||||
self.refresh_token_task = Some(f);
|
||||
}
|
||||
match Self::ping(&mut con).await {
|
||||
Ok(()) => {
|
||||
info!("Connection succesfully established");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Connection is broken. Error during PING: {e:?}");
|
||||
}
|
||||
}
|
||||
self.con = Some(con);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user