remove some legacy from the early days of our redis support.

This commit is contained in:
Conrad Ludgate
2025-06-07 19:46:46 +01:00
parent b0c712f63f
commit 640500aa6d

View File

@@ -15,7 +15,7 @@ use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, warn};
use tracing::{Instrument, info};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
@@ -167,25 +167,25 @@ struct ProxyCliArgs {
/// cache for `role_secret` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
role_secret_cache: String,
/// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
/// redis url for plain authentication
#[clap(long, alias("redis-notifications"))]
redis_plain: Option<String>,
/// what from the available authentications type to use for redis. Supported are "irsa" and "plain".
#[clap(long)]
redis_notifications: Option<String>,
/// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain".
#[clap(long, default_value = "irsa")]
redis_auth_type: String,
/// redis host for streaming connections (might be different from the notifications host)
redis_auth_type: Option<String>,
/// redis host for irsa authentication
#[clap(long)]
redis_host: Option<String>,
/// redis port for streaming connections (might be different from the notifications host)
/// redis port for irsa authentication
#[clap(long)]
redis_port: Option<u16>,
/// redis cluster name, used in aws elasticache
/// redis cluster name for irsa authentication
#[clap(long)]
redis_cluster_name: Option<String>,
/// redis user_id, used in aws elasticache
/// redis user_id for irsa authentication
#[clap(long)]
redis_user_id: Option<String>,
/// aws region to retrieve credentials
/// aws region for irsa authentication
#[clap(long, default_value_t = String::new())]
aws_region: String,
/// cache for `project_info` (use `size=0` to disable)
@@ -328,7 +328,7 @@ pub async fn run() -> anyhow::Result<()> {
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
}
info!("Using region: {}", args.aws_region);
let (regional_redis_client, redis_notifications_client) = configure_redis(&args).await?;
let redis_client = configure_redis(&args).await?;
// Check that we can bind to address before further initialization
info!("Starting http on {}", args.http);
@@ -386,10 +386,6 @@ pub async fn run() -> anyhow::Result<()> {
let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone());
RateBucketInfo::validate(redis_rps_limit)?;
let redis_kv_client = regional_redis_client
.as_ref()
.map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit));
// channel size should be higher than redis client limit to avoid blocking
let cancel_ch_size = args.cancellation_ch_size;
let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size);
@@ -499,21 +495,14 @@ pub async fn run() -> anyhow::Result<()> {
#[cfg_attr(not(any(test, feature = "testing")), expect(irrefutable_let_patterns))]
if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend {
if let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(client, cache.clone()));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(client, cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
if let Some(client) = redis_client {
// project info cache and invalidation of that cache.
let cache = api.caches.project_info.clone();
maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
if let Some(mut redis_kv_client) = redis_kv_client {
// cancellation key management
let mut redis_kv_client = RedisKVClient::new(client.clone(), redis_rps_limit);
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(
@@ -530,14 +519,12 @@ pub async fn run() -> anyhow::Result<()> {
// so let's wait forever instead.
std::future::pending().await
});
}
if let Some(regional_redis_client) = regional_redis_client {
// listen for notifications of new projects/endpoints/branches
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(con, cancellation_token.clone()).await }
async move { cache.do_read(client, cancellation_token.clone()).await }
.instrument(span),
);
}
@@ -827,21 +814,39 @@ fn build_auth_backend(
async fn configure_redis(
args: &ProxyCliArgs,
) -> anyhow::Result<(
Option<ConnectionWithCredentialsProvider>,
Option<ConnectionWithCredentialsProvider>,
)> {
) -> anyhow::Result<Option<ConnectionWithCredentialsProvider>> {
// For some reason, we have two redis'.
// Why?
// In the past, we used to have a single global redis instance,
// as redis was only used for console<->cplane communication.
//
// After proxy started using redis, this needed fixing so we added the regional
// redis instances after.
//
//
// regional_redis is used for:
// 1. Stream of new endpoints/projects/branches.
// 2. KV for cancellation keys
//
// redis_notifications is used for:
// 1. Stream of new endpoints/projects/branches.
//
// In AWS, these are different[citation needed]
// In Azure, these are the same[citation needed]
//
// I think we can get rid of the notifications junk by now. To confirm.
// TODO: untangle the config args
let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
("plain", redis_url) => match redis_url {
let redis_client = match args.redis_auth_type.as_deref() {
Some("plain") => match &args.redis_plain {
None => {
bail!("plain auth requires redis_notifications to be set");
bail!("plain auth requires redis_plain to be set");
}
Some(url) => {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
}
},
("irsa", _) => match (&args.redis_host, args.redis_port) {
Some("irsa") => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.clone(),
@@ -854,29 +859,30 @@ async fn configure_redis(
.await,
),
),
(None, None) => {
// todo: upgrade to error?
warn!(
"irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
);
None
}
// (None, None) => {
// // todo: upgrade to error?
// warn!(
// "irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
// );
// None
// }
_ => {
bail!("redis-host and redis-port must be specified together");
bail!("redis-host and redis-port must be specified together")
}
},
_ => {
bail!("unknown auth type given");
Some(auth_type) => {
bail!("unknown auth type {auth_type:?} given")
}
None => None,
};
let redis_notifications_client = if let Some(url) = &args.redis_notifications {
Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
} else {
regional_redis_client.clone()
};
// let redis_notifications_client = if let Some(url) = &args.redis_notifications {
// Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
// } else {
// regional_redis_client.clone()
// };
Ok((regional_redis_client, redis_notifications_client))
Ok(redis_client)
}
#[cfg(test)]