diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 8cd30e2ccf..6d30d24432 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -15,7 +15,7 @@ use remote_storage::RemoteStorageConfig; use tokio::net::TcpListener; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{Instrument, info, warn}; +use tracing::{Instrument, info}; use utils::sentry_init::init_sentry; use utils::{project_build_tag, project_git_version}; @@ -167,25 +167,25 @@ struct ProxyCliArgs { /// cache for `role_secret` (use `size=0` to disable) #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)] role_secret_cache: String, - /// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections) + /// redis url for plain authentication + #[clap(long, alias("redis-notifications"))] + redis_plain: Option, + /// what from the available authentications type to use for redis. Supported are "irsa" and "plain". #[clap(long)] - redis_notifications: Option, - /// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain". - #[clap(long, default_value = "irsa")] - redis_auth_type: String, - /// redis host for streaming connections (might be different from the notifications host) + redis_auth_type: Option, + /// redis host for irsa authentication #[clap(long)] redis_host: Option, - /// redis port for streaming connections (might be different from the notifications host) + /// redis port for irsa authentication #[clap(long)] redis_port: Option, - /// redis cluster name, used in aws elasticache + /// redis cluster name for irsa authentication #[clap(long)] redis_cluster_name: Option, - /// redis user_id, used in aws elasticache + /// redis user_id for irsa authentication #[clap(long)] redis_user_id: Option, - /// aws region to retrieve credentials + /// aws region for irsa authentication #[clap(long, default_value_t = String::new())] aws_region: String, /// cache for `project_info` (use `size=0` to disable) @@ -328,7 +328,7 @@ pub async fn run() -> anyhow::Result<()> { Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"), } info!("Using region: {}", args.aws_region); - let (regional_redis_client, redis_notifications_client) = configure_redis(&args).await?; + let redis_client = configure_redis(&args).await?; // Check that we can bind to address before further initialization info!("Starting http on {}", args.http); @@ -386,10 +386,6 @@ pub async fn run() -> anyhow::Result<()> { let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone()); RateBucketInfo::validate(redis_rps_limit)?; - let redis_kv_client = regional_redis_client - .as_ref() - .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit)); - // channel size should be higher than redis client limit to avoid blocking let cancel_ch_size = args.cancellation_ch_size; let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size); @@ -499,21 +495,14 @@ pub async fn run() -> anyhow::Result<()> { #[cfg_attr(not(any(test, feature = "testing")), expect(irrefutable_let_patterns))] if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend { if let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api { - match (redis_notifications_client, regional_redis_client.clone()) { - (None, None) => {} - (client1, client2) => { - let cache = api.caches.project_info.clone(); - if let Some(client) = client1 { - maintenance_tasks.spawn(notifications::task_main(client, cache.clone())); - } - if let Some(client) = client2 { - maintenance_tasks.spawn(notifications::task_main(client, cache.clone())); - } - maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); - } - } + if let Some(client) = redis_client { + // project info cache and invalidation of that cache. + let cache = api.caches.project_info.clone(); + maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone())); + maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); - if let Some(mut redis_kv_client) = redis_kv_client { + // cancellation key management + let mut redis_kv_client = RedisKVClient::new(client.clone(), redis_rps_limit); maintenance_tasks.spawn(async move { redis_kv_client.try_connect().await?; handle_cancel_messages( @@ -530,14 +519,12 @@ pub async fn run() -> anyhow::Result<()> { // so let's wait forever instead. std::future::pending().await }); - } - if let Some(regional_redis_client) = regional_redis_client { + // listen for notifications of new projects/endpoints/branches let cache = api.caches.endpoints_cache.clone(); - let con = regional_redis_client; let span = tracing::info_span!("endpoints_cache"); maintenance_tasks.spawn( - async move { cache.do_read(con, cancellation_token.clone()).await } + async move { cache.do_read(client, cancellation_token.clone()).await } .instrument(span), ); } @@ -827,21 +814,39 @@ fn build_auth_backend( async fn configure_redis( args: &ProxyCliArgs, -) -> anyhow::Result<( - Option, - Option, -)> { +) -> anyhow::Result> { + // For some reason, we have two redis'. + // Why? + // In the past, we used to have a single global redis instance, + // as redis was only used for console<->cplane communication. + // + // After proxy started using redis, this needed fixing so we added the regional + // redis instances after. + // + // + // regional_redis is used for: + // 1. Stream of new endpoints/projects/branches. + // 2. KV for cancellation keys + // + // redis_notifications is used for: + // 1. Stream of new endpoints/projects/branches. + // + // In AWS, these are different[citation needed] + // In Azure, these are the same[citation needed] + // + // I think we can get rid of the notifications junk by now. To confirm. + // TODO: untangle the config args - let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) { - ("plain", redis_url) => match redis_url { + let redis_client = match args.redis_auth_type.as_deref() { + Some("plain") => match &args.redis_plain { None => { - bail!("plain auth requires redis_notifications to be set"); + bail!("plain auth requires redis_plain to be set"); } Some(url) => { Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone())) } }, - ("irsa", _) => match (&args.redis_host, args.redis_port) { + Some("irsa") => match (&args.redis_host, args.redis_port) { (Some(host), Some(port)) => Some( ConnectionWithCredentialsProvider::new_with_credentials_provider( host.clone(), @@ -854,29 +859,30 @@ async fn configure_redis( .await, ), ), - (None, None) => { - // todo: upgrade to error? - warn!( - "irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client" - ); - None - } + // (None, None) => { + // // todo: upgrade to error? + // warn!( + // "irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client" + // ); + // None + // } _ => { - bail!("redis-host and redis-port must be specified together"); + bail!("redis-host and redis-port must be specified together") } }, - _ => { - bail!("unknown auth type given"); + Some(auth_type) => { + bail!("unknown auth type {auth_type:?} given") } + None => None, }; - let redis_notifications_client = if let Some(url) = &args.redis_notifications { - Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url)) - } else { - regional_redis_client.clone() - }; + // let redis_notifications_client = if let Some(url) = &args.redis_notifications { + // Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url)) + // } else { + // regional_redis_client.clone() + // }; - Ok((regional_redis_client, redis_notifications_client)) + Ok(redis_client) } #[cfg(test)]