Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-22 12:52:55 +00:00

Compare commits: min_prefet... -> conrad/pro... (8 commits)
| SHA1 |
|---|
| 4ada80d915 |
| fd263a0c23 |
| 26dc39053e |
| 1f62ee5f5c |
| e78254657a |
| 640500aa6d |
| b0c712f63f |
| f84e73c323 |
Cargo.lock (generated)
@@ -5273,6 +5273,7 @@ dependencies = [
 "tokio-rustls 0.26.2",
 "tokio-tungstenite 0.21.0",
 "tokio-util",
+"toml",
 "tracing",
 "tracing-log",
 "tracing-opentelemetry",
@@ -89,6 +89,7 @@ tokio-postgres = { workspace = true, optional = true }
 tokio-rustls.workspace = true
 tokio-util.workspace = true
 tokio = { workspace = true, features = ["signal"] }
+toml.workspace = true
 tracing-subscriber.workspace = true
 tracing-utils.workspace = true
 tracing.workspace = true
@@ -279,7 +279,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
     },
     proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
     handshake_timeout: Duration::from_secs(10),
-    region: "local".into(),
     wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
     connect_compute_locks,
     connect_to_compute: compute_config,
@@ -237,7 +237,6 @@ pub(super) async fn task_main(
         extra: None,
     },
     crate::metrics::Protocol::SniRouter,
-    "sni",
 );
 handle_client(ctx, dest_suffix, tls_config, compute_tls_config, socket).await
}
@@ -8,14 +8,15 @@ use std::time::Duration;

 #[cfg(any(test, feature = "testing"))]
 use anyhow::Context;
-use anyhow::{bail, ensure};
+use anyhow::{bail, anyhow};
 use arc_swap::ArcSwapOption;
 use futures::future::Either;
 use remote_storage::RemoteStorageConfig;
 use serde::Deserialize;
 use tokio::net::TcpListener;
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, info, warn};
+use tracing::{Instrument, info};
 use utils::sentry_init::init_sentry;
 use utils::{project_build_tag, project_git_version};
@@ -39,7 +40,7 @@ use crate::serverless::cancel_set::CancelSet;
 use crate::tls::client_config::compute_client_config_with_root_certs;
 #[cfg(any(test, feature = "testing"))]
 use crate::url::ApiUrl;
-use crate::{auth, control_plane, http, serverless, usage_metrics};
+use crate::{auth, control_plane, http, pglb, serverless, usage_metrics};

 project_git_version!(GIT_VERSION);
 project_build_tag!(BUILD_TAG);
@@ -59,6 +60,262 @@ enum AuthBackendType {
    Postgres,
}

#[derive(Deserialize)]
struct Root {
    #[serde(flatten)]
    legacy: LegacyModes,
    introspection: Introspection,
}

#[derive(Deserialize)]
#[serde(untagged)]
enum LegacyModes {
    Proxy {
        pglb: Pglb,
        neonkeeper: NeonKeeper,
        http: Option<Http>,
        pg_sni_router: Option<PgSniRouter>,
    },
    AuthBroker {
        neonkeeper: NeonKeeper,
        http: Http,
    },
    ConsoleRedirect {
        console_redirect: ConsoleRedirect,
    },
}
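Because `LegacyModes` is `#[serde(untagged)]`, the mode is never spelled out in the config file: serde tries the variants in declaration order and picks the first one whose fields match, so the presence of a `pglb`, `neonkeeper`, or `console_redirect` table decides the mode. A minimal, self-contained sketch of that behavior (the `Mode` enum here is hypothetical, not the real config types):

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug)]
#[serde(untagged)]
enum Mode {
    Proxy { pglb: String },
    ConsoleRedirect { console_redirect: String },
}

fn main() {
    // The `pglb` key only fits the first variant, so serde selects `Proxy`.
    let m: Mode = toml::from_str("pglb = \"0.0.0.0:5432\"").unwrap();
    println!("{m:?}");

    // `console_redirect` only fits the second variant.
    let m: Mode = toml::from_str("console_redirect = \"https://example\"").unwrap();
    println!("{m:?}");
}
```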
#[derive(Deserialize)]
struct Pglb {
    listener: Listener,
}

#[derive(Deserialize)]
struct Listener {
    /// Address to bind to.
    addr: SocketAddr,
    /// Which header to expect on this socket
    /// from the load balancer.
    header: Option<ProxyHeader>,

    /// Certificates used for TLS.
    /// The first cert is the default.
    /// TLS is not used if no certs are provided.
    certs: Vec<KeyPair>,

    /// Timeout to use for the TLS handshake.
    timeout: Option<Duration>,
}

#[derive(Deserialize)]
enum ProxyHeader {
    /// Accept the PROXY protocol V2.
    ProxyProtocolV2(ProxyProtocolV2Kind),
}

#[derive(Deserialize)]
enum ProxyProtocolV2Kind {
    /// Expect AWS TLVs in the header.
    Aws,
    /// Expect Azure TLVs in the header.
    Azure,
}

#[derive(Deserialize)]
struct KeyPair {
    key: PathBuf,
    cert: PathBuf,
}

/// The service that authenticates all incoming connection attempts,
/// provides monitoring and also wakes computes.
#[derive(Deserialize)]
struct NeonKeeper {
    cplane: ControlPlaneBackend,
    redis: Option<Redis>,
    auth: Vec<AuthMechanism>,

    /// Map of endpoint -> compute info.
    compute: Cache,
    /// Cache for GetEndpointAccessControls.
    project_info_cache: config::ProjectInfoCacheOptions,
    /// Cache for all valid endpoints.
    endpoint_cache_config: config::EndpointCacheConfig,

    request_log_export: Option<RequestLogExport>,
    data_transfer_export: Option<DataTransferExport>,
}

#[derive(Deserialize)]
struct Redis {
    /// Cancellation channel size (max queue size for the redis kv client).
    cancellation_ch_size: usize,
    /// Cancellation ops batch size for redis.
    cancellation_batch_size: usize,

    auth: RedisAuthentication,
}

#[derive(Deserialize)]
enum RedisAuthentication {
    /// IRSA: IAM Roles for Service Accounts.
    Irsa {
        host: String,
        port: u16,
        cluster_name: Option<String>,
        user_id: Option<String>,
        aws_region: String,
    },
    Basic {
        url: url::Url,
    },
}
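`RedisAuthentication` keeps serde's default externally tagged representation, so the variant name appears as a key in the document. A hedged sketch of the two accepted shapes, assuming `Redis` and `RedisAuthentication` from the diff are in scope (all values are placeholders):

```rust
fn parse_redis_examples() -> anyhow::Result<()> {
    // Struct-like variant as a nested table.
    let _irsa: Redis = toml::from_str(
        r#"
            cancellation_ch_size = 1024
            cancellation_batch_size = 8

            [auth.Irsa]
            host = "redis.example.internal"
            port = 6379
            aws_region = "us-east-1"
        "#,
    )?;

    // Same enum, written as an inline table.
    let _basic: Redis = toml::from_str(
        r#"
            cancellation_ch_size = 1024
            cancellation_batch_size = 8
            auth = { Basic = { url = "redis://127.0.0.1:6379" } }
        "#,
    )?;
    Ok(())
}
```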
#[derive(Deserialize)]
struct PgSniRouter {
    /// The listener to use to proxy connections to compute,
    /// assuming the compute does not support TLS.
    listener: Listener,

    /// The listener to use to proxy connections to compute,
    /// assuming the compute requires TLS.
    listener_tls: Listener,

    /// Append this domain zone to the SNI hostname to get the destination address.
    dest: String,
}

/// `psql -h pg.neon.tech`.
#[derive(Deserialize)]
struct ConsoleRedirect {
    /// Connection requests from clients.
    listener: Listener,
    /// Messages from control plane to accept the connection.
    cplane: Listener,

    /// The base url to use for redirects.
    console: url::Url,

    timeout: Duration,
}

#[derive(Deserialize)]
enum ControlPlaneBackend {
    /// Use the HTTP API to access the control plane.
    Http(url::Url),
    /// Stub the control plane with a postgres instance.
    #[cfg(feature = "testing")]
    PostgresMock(url::Url),
}

#[derive(Deserialize)]
struct Http {
    listener: Listener,
    sql_over_http: SqlOverHttp,

    // TODO: move into Pglb.
    websockets: Option<Websockets>,
}

#[derive(Deserialize)]
struct SqlOverHttp {
    pool_max_conns_per_endpoint: usize,
    pool_max_total_conns: usize,
    pool_idle_timeout: Duration,
    pool_gc_epoch: Duration,
    pool_shards: usize,

    client_conn_threshold: u64,
    cancel_set_shards: usize,

    timeout: Duration,
    max_request_size_bytes: usize,
    max_response_size_bytes: usize,

    auth: Vec<AuthMechanism>,
}

#[derive(Deserialize)]
enum AuthMechanism {
    Sasl {
        /// Timeout for the SASL handshake.
        timeout: Duration,
    },
    CleartextPassword {
        /// Number of threads for the thread pool.
        threads: usize,
    },
    // TODO: document the JWKS cache here.
    Jwt {},
}

#[derive(Deserialize)]
struct Websockets {
    auth: Vec<AuthMechanism>,
}

/// The HTTP API used for internal monitoring.
#[derive(Deserialize)]
struct Introspection {
    listener: Listener,
}

#[derive(Deserialize)]
enum RequestLogExport {
    Parquet {
        location: RemoteStorageConfig,
        disconnect: RemoteStorageConfig,

        /// The region identifier to tag the entries with.
        region: String,

        /// How many rows to include in a row group.
        row_group_size: usize,

        /// How large each column page should be, in bytes.
        page_size: usize,

        /// How large the total parquet file should be, in bytes.
        size: i64,

        /// How long to wait before forcing a file upload.
        maximum_duration: tokio::time::Duration,
        // /// What level of compression to use
        // compression: Compression,
    },
}

#[derive(Deserialize)]
enum Cache {
    /// Expire by LRU or by idle time.
    /// Note: "live" in "time-to-live" actually means idle here.
    LruTtl {
        /// Max number of entries.
        size: usize,
        /// Entry's time-to-live.
        ttl: Duration,
    },
}

#[derive(Deserialize)]
struct DataTransferExport {
    /// HTTP endpoint to receive periodic metric updates.
    endpoint: Option<String>,
    /// How often metrics should be sent to a collection endpoint.
    interval: Option<String>,

    /// Interval for backup metric collection.
    backup_interval: std::time::Duration,
    /// Remote storage configuration for backup metric collection.
    /// Encoded as TOML (same format as pageservers), e.g.
    /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
    backup_remote_storage: Option<RemoteStorageConfig>,
    /// Chunk size for backup metric collection.
    /// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
    backup_chunk_size: usize,
}
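For orientation, here is a minimal document that `Root` should accept in its `ConsoleRedirect` shape, sketched as a test. Field names come from the structs above; every value is hypothetical. Note that `url::Url` needs the url crate's serde feature, and plain `std::time::Duration` deserializes from a `{ secs, nanos }` table rather than a "10s"-style string:

```rust
#[test]
fn parse_minimal_console_redirect_config() {
    let doc = r#"
        [console_redirect]
        console = "https://console.neon.example/"
        timeout = { secs = 10, nanos = 0 }

        [console_redirect.listener]
        addr = "0.0.0.0:4432"
        header = { ProxyProtocolV2 = "Aws" }
        certs = [{ key = "/etc/tls/tls.key", cert = "/etc/tls/tls.crt" }]

        [console_redirect.cplane]
        addr = "127.0.0.1:9095"
        certs = []

        [introspection.listener]
        addr = "127.0.0.1:7001"
        certs = []
    "#;
    let root: Root = toml::from_str(doc).expect("config should deserialize");
    assert!(matches!(root.legacy, LegacyModes::ConsoleRedirect { .. }));
}
```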
/// Neon proxy/router
#[derive(Parser)]
#[command(version = GIT_VERSION, about)]

@@ -120,12 +377,6 @@ struct ProxyCliArgs {
    /// timeout for the TLS handshake
    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
    handshake_timeout: tokio::time::Duration,
-   /// http endpoint to receive periodic metric updates
-   #[clap(long)]
-   metric_collection_endpoint: Option<String>,
-   /// how often metrics should be sent to a collection endpoint
-   #[clap(long)]
-   metric_collection_interval: Option<String>,
    /// cache for `wake_compute` api method (use `size=0` to disable)
    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
    wake_compute_cache: String,
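The `value_parser = humantime::parse_duration` hook is what lets a flag such as `--handshake-timeout 15s` accept a human-readable duration: `humantime::parse_duration` maps a string like "15s" or "10m" to a `std::time::Duration`. A quick standalone illustration:

```rust
use std::time::Duration;

fn main() {
    // humantime parses human-readable duration strings.
    assert_eq!(humantime::parse_duration("15s").unwrap(), Duration::from_secs(15));
    assert_eq!(humantime::parse_duration("10m").unwrap(), Duration::from_secs(600));
}
```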
@@ -152,40 +403,31 @@ struct ProxyCliArgs {
    /// Wake compute rate limiter max number of requests per second.
    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
    wake_compute_limit: Vec<RateBucketInfo>,
-   /// Redis rate limiter max number of requests per second.
-   #[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
-   redis_rps_limit: Vec<RateBucketInfo>,
    /// Cancellation channel size (max queue size for redis kv client)
    #[clap(long, default_value_t = 1024)]
    cancellation_ch_size: usize,
    /// Cancellation ops batch size for redis
    #[clap(long, default_value_t = 8)]
    cancellation_batch_size: usize,
    /// cache for `allowed_ips` (use `size=0` to disable)
    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
    allowed_ips_cache: String,
    /// cache for `role_secret` (use `size=0` to disable)
    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
    role_secret_cache: String,
-   /// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
+   /// redis url for plain authentication
    #[clap(long, alias("redis-notifications"))]
    redis_plain: Option<String>,
-   /// what from the available authentications type to use for redis. Supported are "irsa" and "plain".
-   #[clap(long)]
-   redis_notifications: Option<String>,
-   /// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain".
-   #[clap(long, default_value = "irsa")]
-   redis_auth_type: String,
-   /// redis host for streaming connections (might be different from the notifications host)
+   #[clap(long)]
+   redis_auth_type: Option<String>,
+   /// redis host for irsa authentication
    #[clap(long)]
    redis_host: Option<String>,
-   /// redis port for streaming connections (might be different from the notifications host)
+   /// redis port for irsa authentication
    #[clap(long)]
    redis_port: Option<u16>,
-   /// redis cluster name, used in aws elasticache
+   /// redis cluster name for irsa authentication
    #[clap(long)]
    redis_cluster_name: Option<String>,
-   /// redis user_id, used in aws elasticache
+   /// redis user_id for irsa authentication
    #[clap(long)]
    redis_user_id: Option<String>,
-   /// aws region to retrieve credentials
+   /// aws region for irsa authentication
    #[clap(long, default_value_t = String::new())]
    aws_region: String,
    /// cache for `project_info` (use `size=0` to disable)
@@ -197,6 +439,12 @@ struct ProxyCliArgs {
    #[clap(flatten)]
    parquet_upload: ParquetUploadArgs,

+   /// http endpoint to receive periodic metric updates
+   #[clap(long)]
+   metric_collection_endpoint: Option<String>,
+   /// how often metrics should be sent to a collection endpoint
+   #[clap(long)]
+   metric_collection_interval: Option<String>,
    /// interval for backup metric collection
    #[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
    metric_backup_collection_interval: std::time::Duration,

@@ -209,6 +457,7 @@ struct ProxyCliArgs {
    /// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
    #[clap(long, default_value = "4194304")]
    metric_backup_collection_chunk_size: usize,

    /// Whether to retry the connection to the compute node
    #[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
    connect_to_compute_retry: String,
@@ -319,208 +568,120 @@ pub async fn run() -> anyhow::Result<()> {
        }
    };

    let args = ProxyCliArgs::parse();
    let config = build_config(&args)?;
    let auth_backend = build_auth_backend(&args)?;

    match auth_backend {
        Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"),
        Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
    }
    info!("Using region: {}", args.aws_region);
    let (regional_redis_client, redis_notifications_client) = configure_redis(&args).await?;

    // Check that we can bind to address before further initialization
    info!("Starting http on {}", args.http);
    let http_listener = TcpListener::bind(args.http).await?.into_std()?;

    info!("Starting mgmt on {}", args.mgmt);
    let mgmt_listener = TcpListener::bind(args.mgmt).await?;

    let proxy_listener = if args.is_auth_broker {
        None
    } else {
        info!("Starting proxy on {}", args.proxy);
        Some(TcpListener::bind(args.proxy).await?)
    };

    let sni_router_listeners = {
        let args = &args.pg_sni_router;
        if args.dest.is_some() {
            ensure!(
                args.tls_key.is_some(),
                "sni-router-tls-key must be provided"
            );
            ensure!(
                args.tls_cert.is_some(),
                "sni-router-tls-cert must be provided"
            );

            info!(
                "Starting pg-sni-router on {} and {}",
                args.listen, args.listen_tls
            );

            Some((
                TcpListener::bind(args.listen).await?,
                TcpListener::bind(args.listen_tls).await?,
            ))
        } else {
            None
        }
    };

    // TODO: rename the argument to something like serverless.
    // It now covers more than just websockets, it also covers SQL over HTTP.
    let serverless_listener = if let Some(serverless_address) = args.wss {
        info!("Starting wss on {serverless_address}");
        Some(TcpListener::bind(serverless_address).await?)
    } else if args.is_auth_broker {
        bail!("wss arg must be present for auth-broker")
    } else {
        None
    };

    let cancellation_token = CancellationToken::new();

    let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone());
    RateBucketInfo::validate(redis_rps_limit)?;

    let redis_kv_client = regional_redis_client
        .as_ref()
        .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit));

    // channel size should be higher than redis client limit to avoid blocking
    let cancel_ch_size = args.cancellation_ch_size;
    let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size);
    let cancellation_handler = Arc::new(CancellationHandler::new(
        &config.connect_to_compute,
        Some(tx_cancel),
    ));

    let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
        RateBucketInfo::to_leaky_bucket(&args.endpoint_rps_limit)
            .unwrap_or(EndpointRateLimiter::DEFAULT),
        64,
    ));
    let config: Root = toml::from_str(&tokio::fs::read_to_string("proxy.toml").await?)?;
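As written, a failure in that line reports neither the file nor whether reading or parsing failed. A hedged variant (hypothetical `load_root` helper, assuming `anyhow::Context` were imported unconditionally rather than behind the testing cfg) would attach that context:

```rust
use anyhow::Context;

// Hypothetical helper: load and parse the config with file context attached.
async fn load_root(path: &str) -> anyhow::Result<Root> {
    let raw = tokio::fs::read_to_string(path)
        .await
        .with_context(|| format!("reading config file {path}"))?;
    toml::from_str(&raw).with_context(|| format!("parsing config file {path}"))
}
```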
    // client facing tasks. these will exit on error or on cancellation
    // cancellation returns Ok(())
    let mut client_tasks = JoinSet::new();
    match auth_backend {
        Either::Left(auth_backend) => {
            if let Some(proxy_listener) = proxy_listener {
                client_tasks.spawn(crate::proxy::task_main(
                    config,
                    auth_backend,
                    proxy_listener,
                    cancellation_token.clone(),
                    cancellation_handler.clone(),
                    endpoint_rate_limiter.clone(),
                ));
            }

            if let Some(serverless_listener) = serverless_listener {
                client_tasks.spawn(serverless::task_main(
                    config,
                    auth_backend,
                    serverless_listener,
                    cancellation_token.clone(),
                    cancellation_handler.clone(),
                    endpoint_rate_limiter.clone(),
                ));
            }
        }
        Either::Right(auth_backend) => {
            if let Some(proxy_listener) = proxy_listener {
                client_tasks.spawn(crate::console_redirect_proxy::task_main(
                    config,
                    auth_backend,
                    proxy_listener,
                    cancellation_token.clone(),
                    cancellation_handler.clone(),
                ));
            }
        }
    }
    // spawn pg-sni-router mode.
    if let Some((listen, listen_tls)) = sni_router_listeners {
        let args = args.pg_sni_router;
        let dest = args.dest.expect("already asserted it is set");
        let key_path = args.tls_key.expect("already asserted it is set");
        let cert_path = args.tls_cert.expect("already asserted it is set");

        let tls_config = super::pg_sni_router::parse_tls(&key_path, &cert_path)?;

        let dest = Arc::new(dest);

        client_tasks.spawn(super::pg_sni_router::task_main(
            dest.clone(),
            tls_config.clone(),
            None,
            listen,
            cancellation_token.clone(),
        ));

        client_tasks.spawn(super::pg_sni_router::task_main(
            dest,
            tls_config,
            Some(config.connect_to_compute.tls.clone()),
            listen_tls,
            cancellation_token.clone(),
        ));
    }

    client_tasks.spawn(crate::context::parquet::worker(
        cancellation_token.clone(),
        args.parquet_upload,
    ));

    // maintenance tasks. these never return unless there's an error
    let mut maintenance_tasks = JoinSet::new();
    maintenance_tasks.spawn(crate::signals::handle(cancellation_token.clone(), || {}));
    maintenance_tasks.spawn(http::health_server::task_main(
        http_listener,
        AppMetrics {
            jemalloc,
            neon_metrics,
            proxy: crate::metrics::Metrics::get(),
        },
    ));
    maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));

    if let Some(metrics_config) = &config.metric_collection {
        // TODO: Add gc regardless of the metric collection being enabled.
        maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
    }
    let cancellation_token = CancellationToken::new();

    #[cfg_attr(not(any(test, feature = "testing")), expect(irrefutable_let_patterns))]
    if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend {
        if let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
            match (redis_notifications_client, regional_redis_client.clone()) {
                (None, None) => {}
                (client1, client2) => {
                    let cache = api.caches.project_info.clone();
                    if let Some(client) = client1 {
                        maintenance_tasks.spawn(notifications::task_main(
                            client,
                            cache.clone(),
                            args.region.clone(),
                        ));
                    }
                    if let Some(client) = client2 {
                        maintenance_tasks.spawn(notifications::task_main(
                            client,
                            cache.clone(),
                            args.region.clone(),
                        ));
                    }
                    maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
                }
    match config.legacy {
        LegacyModes::Proxy {
            pglb,
            neonkeeper,
            http,
            pg_sni_router,
        } => {
            let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
                // todo: use neonkeeper config.
                EndpointRateLimiter::DEFAULT,
                64,
            ));

            info!("Starting proxy on {}", pglb.listener.addr);
            let proxy_listener = TcpListener::bind(pglb.listener.addr).await?;

            client_tasks.spawn(crate::proxy::task_main(
                config,
                auth_backend,
                proxy_listener,
                cancellation_token.clone(),
                cancellation_handler.clone(),
                endpoint_rate_limiter.clone(),
            ));

            if let Some(http) = http {
                info!("Starting wss on {}", http.listener.addr);
                let http_listener = TcpListener::bind(http.listener.addr).await?;

                client_tasks.spawn(serverless::task_main(
                    config,
                    auth_backend,
                    http_listener,
                    cancellation_token.clone(),
                    cancellation_handler.clone(),
                    endpoint_rate_limiter.clone(),
                ));
            };

            if let Some(redis) = neonkeeper.redis {
                let client = configure_redis(redis.auth);
            }

-           if let Some(mut redis_kv_client) = redis_kv_client {
+           if let Some(sni_router) = pg_sni_router {
                let listen = TcpListener::bind(sni_router.listener.addr).await?;
                let listen_tls = TcpListener::bind(sni_router.listener_tls.addr).await?;

                let [KeyPair { key, cert }] = sni_router
                    .listener
                    .certs
                    .try_into()
                    .map_err(|_| anyhow!("only 1 keypair is supported for pg-sni-router"))?;

                let tls_config = super::pg_sni_router::parse_tls(&key, &cert)?;

                let dest = Arc::new(sni_router.dest);

                client_tasks.spawn(super::pg_sni_router::task_main(
                    dest.clone(),
                    tls_config.clone(),
                    None,
                    listen,
                    cancellation_token.clone(),
                ));

                client_tasks.spawn(super::pg_sni_router::task_main(
                    dest,
                    tls_config,
                    Some(config.connect_to_compute.tls.clone()),
                    listen_tls,
                    cancellation_token.clone(),
                ));
            }
            match neonkeeper.request_log_export {
                Some(RequestLogExport::Parquet {
                    location,
                    disconnect,
                    region,
                    row_group_size,
                    page_size,
                    size,
                    maximum_duration,
                }) => {
                    client_tasks.spawn(crate::context::parquet::worker(
                        cancellation_token.clone(),
                        args.parquet_upload,
                        args.region,
                    ));
                }
                None => {}
            }

            if let (ControlPlaneBackend::Http(api), Some(redis)) =
                (neonkeeper.cplane, neonkeeper.redis)
            {
                // project info cache and invalidation of that cache.
                let cache = api.caches.project_info.clone();
                maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
                maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });

                // cancellation key management
                let mut redis_kv_client = RedisKVClient::new(client.clone());
                maintenance_tasks.spawn(async move {
                    redis_kv_client.try_connect().await?;
                    handle_cancel_messages(

@@ -537,18 +698,139 @@ pub async fn run() -> anyhow::Result<()> {
                    // so let's wait forever instead.
                    std::future::pending().await
                });
            }
            if let Some(regional_redis_client) = regional_redis_client {
                // listen for notifications of new projects/endpoints/branches
                let cache = api.caches.endpoints_cache.clone();
                let con = regional_redis_client;
                let span = tracing::info_span!("endpoints_cache");
                maintenance_tasks.spawn(
-                   async move { cache.do_read(con, cancellation_token.clone()).await }
+                   async move { cache.do_read(client, cancellation_token.clone()).await }
                        .instrument(span),
                );
            }
        }
        LegacyModes::AuthBroker { neonkeeper, http } => {
            let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
                // todo: use neonkeeper config.
                EndpointRateLimiter::DEFAULT,
                64,
            ));

            info!("Starting wss on {}", http.listener.addr);
            let http_listener = TcpListener::bind(http.listener.addr).await?;

            if let Some(redis) = neonkeeper.redis {
                let client = configure_redis(redis.auth);
            }

            client_tasks.spawn(serverless::task_main(
                config,
                auth_backend,
                serverless_listener,
                cancellation_token.clone(),
                cancellation_handler.clone(),
                endpoint_rate_limiter.clone(),
            ));

            match neonkeeper.request_log_export {
                Some(RequestLogExport::Parquet {
                    location,
                    disconnect,
                    region,
                    row_group_size,
                    page_size,
                    size,
                    maximum_duration,
                }) => {
                    client_tasks.spawn(crate::context::parquet::worker(
                        cancellation_token.clone(),
                        args.parquet_upload,
                        args.region,
                    ));
                }
                None => {}
            }

            if let (ControlPlaneBackend::Http(api), Some(redis)) =
                (neonkeeper.cplane, neonkeeper.redis)
            {
                // project info cache and invalidation of that cache.
                let cache = api.caches.project_info.clone();
                maintenance_tasks.spawn(notifications::task_main(client.clone(), cache.clone()));
                maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });

                // cancellation key management
                let mut redis_kv_client = RedisKVClient::new(client.clone());
                maintenance_tasks.spawn(async move {
                    redis_kv_client.try_connect().await?;
                    handle_cancel_messages(
                        &mut redis_kv_client,
                        rx_cancel,
                        args.cancellation_batch_size,
                    )
                    .await?;

                    drop(redis_kv_client);

                    // `handle_cancel_messages` was terminated due to the tx_cancel
                    // being dropped. this is not worthy of an error, and this task can only return `Err`,
                    // so let's wait forever instead.
                    std::future::pending().await
                });

                // listen for notifications of new projects/endpoints/branches
                let cache = api.caches.endpoints_cache.clone();
                let span = tracing::info_span!("endpoints_cache");
                maintenance_tasks.spawn(
                    async move { cache.do_read(client, cancellation_token.clone()).await }
                        .instrument(span),
                );
            }
        }
        LegacyModes::ConsoleRedirect { console_redirect } => {
            info!("Starting proxy on {}", console_redirect.listener.addr);
            let proxy_listener = TcpListener::bind(console_redirect.listener.addr).await?;

            info!("Starting mgmt on {}", console_redirect.listener.addr);
            let mgmt_listener = TcpListener::bind(console_redirect.listener.addr).await?;

            client_tasks.spawn(crate::console_redirect_proxy::task_main(
                config,
                auth_backend,
                proxy_listener,
                cancellation_token.clone(),
                cancellation_handler.clone(),
            ));
            maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
        }
    }
    // Check that we can bind to address before further initialization
    info!("Starting http on {}", config.introspection.listener.addr);
    let http_listener = TcpListener::bind(config.introspection.listener.addr)
        .await?
        .into_std()?;

    // channel size should be higher than redis client limit to avoid blocking
    let cancel_ch_size = args.cancellation_ch_size;
    let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size);
    let cancellation_handler = Arc::new(CancellationHandler::new(
        &config.connect_to_compute,
        Some(tx_cancel),
    ));
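The sizing comment matters because tokio's bounded mpsc applies backpressure: `send().await` parks once the capacity is full, so a channel smaller than what the redis client can drain would stall the cancellation producers. A standalone sketch of that behavior:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    // A third send().await would now wait until the receiver frees a slot.
    assert!(tx.try_send(3).is_err()); // channel is full

    rx.recv().await;
    assert!(tx.try_send(3).is_ok()); // capacity freed
}
```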
    maintenance_tasks.spawn(crate::signals::handle(cancellation_token.clone(), || {}));
    maintenance_tasks.spawn(http::health_server::task_main(
        http_listener,
        AppMetrics {
            jemalloc,
            neon_metrics,
            proxy: crate::metrics::Metrics::get(),
        },
    ));

    if let Some(metrics_config) = &config.metric_collection {
        // TODO: Add gc regardless of the metric collection being enabled.
        maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
    }

    let maintenance = loop {
@@ -673,7 +955,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
    authentication_config,
    proxy_protocol_v2: args.proxy_protocol_v2,
    handshake_timeout: args.handshake_timeout,
-   region: args.region.clone(),
    wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
    connect_compute_locks,
    connect_to_compute: compute_config,
@@ -833,58 +1114,45 @@ fn build_auth_backend(
    }
}

async fn configure_redis(
    args: &ProxyCliArgs,
) -> anyhow::Result<(
    Option<ConnectionWithCredentialsProvider>,
    Option<ConnectionWithCredentialsProvider>,
)> {
    // TODO: untangle the config args
    let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
        ("plain", redis_url) => match redis_url {
            None => {
                bail!("plain auth requires redis_notifications to be set");
            }
            Some(url) => {
                Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone()))
            }
        },
        ("irsa", _) => match (&args.redis_host, args.redis_port) {
            (Some(host), Some(port)) => Some(
                ConnectionWithCredentialsProvider::new_with_credentials_provider(
                    host.clone(),
                    port,
                    elasticache::CredentialsProvider::new(
                        args.aws_region.clone(),
                        args.redis_cluster_name.clone(),
                        args.redis_user_id.clone(),
                    )
                    .await,
                ),
            ),
            (None, None) => {
                // todo: upgrade to error?
                warn!(
                    "irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"
                );
                None
            }
            _ => {
                bail!("redis-host and redis-port must be specified together");
            }
        },
        _ => {
            bail!("unknown auth type given");
        }
    };

    let redis_notifications_client = if let Some(url) = &args.redis_notifications {
        Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
    } else {
        regional_redis_client.clone()
    };

    Ok((regional_redis_client, redis_notifications_client))
}

async fn configure_redis(auth: RedisAuthentication) -> ConnectionWithCredentialsProvider {
    match auth {
        RedisAuthentication::Irsa {
            host,
            port,
            cluster_name,
            user_id,
            aws_region,
        } => ConnectionWithCredentialsProvider::new_with_credentials_provider(
            host,
            port,
            elasticache::CredentialsProvider::new(aws_region, cluster_name, user_id).await,
        ),
        RedisAuthentication::Basic { url } => {
            ConnectionWithCredentialsProvider::new_with_static_credentials(url.clone())
        }
    }
}

// let redis_notifications_client = if let Some(url) = &args.redis_notifications {
//     Some(ConnectionWithCredentialsProvider::new_with_static_credentials(&**url))
// } else {
//     regional_redis_client.clone()
// };

#[cfg(test)]
@@ -22,7 +22,6 @@ pub struct ProxyConfig {
    pub http_config: HttpConfig,
    pub authentication_config: AuthenticationConfig,
    pub proxy_protocol_v2: ProxyProtocolV2,
-   pub region: String,
    pub handshake_timeout: Duration,
    pub wake_compute_retry_config: RetryConfig,
    pub connect_compute_locks: ApiLocks<Host>,

@@ -70,7 +69,7 @@ pub struct AuthenticationConfig {
    pub console_redirect_confirmation_timeout: tokio::time::Duration,
}

-#[derive(Debug)]
+#[derive(Debug, serde::Deserialize)]
pub struct EndpointCacheConfig {
    /// Batch size to receive all endpoints on the startup.
    pub initial_batch_size: usize,

@@ -206,7 +205,7 @@ impl FromStr for CacheOptions {
}

/// Helper for cmdline cache options parsing.
-#[derive(Debug)]
+#[derive(Debug, serde::Deserialize)]
pub struct ProjectInfoCacheOptions {
    /// Max number of entries.
    pub size: usize,
@@ -90,12 +90,7 @@ pub async fn task_main(
        }
    }

-   let ctx = RequestContext::new(
-       session_id,
-       conn_info,
-       crate::metrics::Protocol::Tcp,
-       &config.region,
-   );
+   let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);

    let res = handle_client(
        config,
@@ -46,7 +46,6 @@ struct RequestContextInner {
    pub(crate) session_id: Uuid,
    pub(crate) protocol: Protocol,
    first_packet: chrono::DateTime<Utc>,
-   region: &'static str,
    pub(crate) span: Span,

    // filled in as they are discovered

@@ -94,7 +93,6 @@ impl Clone for RequestContext {
    session_id: inner.session_id,
    protocol: inner.protocol,
    first_packet: inner.first_packet,
-   region: inner.region,
    span: info_span!("background_task"),

    project: inner.project,

@@ -124,12 +122,7 @@ impl Clone for RequestContext {
}

impl RequestContext {
-   pub fn new(
-       session_id: Uuid,
-       conn_info: ConnectionInfo,
-       protocol: Protocol,
-       region: &'static str,
-   ) -> Self {
+   pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self {
        // TODO: be careful with long lived spans
        let span = info_span!(
            "connect_request",

@@ -145,7 +138,6 @@ impl RequestContext {
    session_id,
    protocol,
    first_packet: Utc::now(),
-   region,
    span,

    project: None,

@@ -179,7 +171,7 @@ impl RequestContext {
    let ip = IpAddr::from([127, 0, 0, 1]);
    let addr = SocketAddr::new(ip, 5432);
    let conn_info = ConnectionInfo { addr, extra: None };
-   RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test")
+   RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp)
}

pub(crate) fn console_application_name(&self) -> String {
@@ -74,7 +74,7 @@ pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;

#[derive(parquet_derive::ParquetRecordWriter)]
pub(crate) struct RequestData {
-   region: &'static str,
+   region: String,
    protocol: &'static str,
    /// Must be UTC. The derive macro doesn't like the timezones
    timestamp: chrono::NaiveDateTime,

@@ -147,7 +147,7 @@ impl From<&RequestContextInner> for RequestData {
    }),
    jwt_issuer: value.jwt_issuer.clone(),
    protocol: value.protocol.as_str(),
-   region: value.region,
+   region: String::new(),
    error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
    success: value.success,
    cold_start_info: value.cold_start_info.as_str(),

@@ -167,6 +167,7 @@ impl From<&RequestContextInner> for RequestData {
pub async fn worker(
    cancellation_token: CancellationToken,
    config: ParquetUploadArgs,
+   region: String,
) -> anyhow::Result<()> {
    let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
        tracing::warn!("parquet request upload: no s3 bucket configured");

@@ -232,12 +233,17 @@ pub async fn worker(
    .context("remote storage for disconnect events init")?;
    let parquet_config_disconnect = parquet_config.clone();
    tokio::try_join!(
-       worker_inner(storage, rx, parquet_config),
-       worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
+       worker_inner(storage, rx, parquet_config, &region),
+       worker_inner(
+           storage_disconnect,
+           rx_disconnect,
+           parquet_config_disconnect,
+           &region
+       )
    )
    .map(|_| ())
} else {
-   worker_inner(storage, rx, parquet_config).await
+   worker_inner(storage, rx, parquet_config, &region).await
}
}
@@ -257,6 +263,7 @@ async fn worker_inner(
    storage: GenericRemoteStorage,
    rx: impl Stream<Item = RequestData>,
    config: ParquetConfig,
+   region: &str,
) -> anyhow::Result<()> {
    #[cfg(any(test, feature = "testing"))]
    let storage = if config.test_remote_failures > 0 {

@@ -277,7 +284,8 @@ async fn worker_inner(
    let mut last_upload = time::Instant::now();

    let mut len = 0;
-   while let Some(row) = rx.next().await {
+   while let Some(mut row) = rx.next().await {
+       region.clone_into(&mut row.region);
        rows.push(row);
        let force = last_upload.elapsed() > config.max_duration;
        if rows.len() == config.rows_per_group || force {
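`region.clone_into(&mut row.region)` is `ToOwned::clone_into`: it copies the `&str` into the existing `String` in place, reusing `row.region`'s allocation when its capacity suffices instead of allocating a fresh `String` for every row. A standalone illustration:

```rust
fn main() {
    let region: &str = "us-east-1";
    let mut buf = String::with_capacity(16);

    // clone_into overwrites `buf` in place, reusing its allocation.
    region.clone_into(&mut buf);
    assert_eq!(buf, "us-east-1");
    assert!(buf.capacity() >= 16); // the original allocation was kept
}
```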
@@ -533,7 +541,7 @@ mod tests {
    auth_method: None,
    jwt_issuer: None,
    protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
-   region: "us-east-1",
+   region: String::new(),
    error: None,
    success: rng.r#gen(),
    cold_start_info: "no",

@@ -565,7 +573,9 @@ mod tests {
    .await
    .unwrap();

-   worker_inner(storage, rx, config).await.unwrap();
+   worker_inner(storage, rx, config, "us-east-1")
+       .await
+       .unwrap();

    let mut files = WalkDir::new(tmpdir.as_std_path())
        .into_iter()
@@ -134,12 +134,7 @@ pub async fn task_main(
        }
    }

-   let ctx = RequestContext::new(
-       session_id,
-       conn_info,
-       crate::metrics::Protocol::Tcp,
-       &config.region,
-   );
+   let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);

    let res = handle_client(
        config,
@@ -140,12 +140,6 @@ impl RateBucketInfo {
    Self::new(200, Duration::from_secs(600)),
    ];

-   // For all the sessions will be cancel key. So this limit is essentially global proxy limit.
-   pub const DEFAULT_REDIS_SET: [Self; 2] = [
-       Self::new(100_000, Duration::from_secs(1)),
-       Self::new(50_000, Duration::from_secs(10)),
-   ];

    pub fn rps(&self) -> f64 {
        (self.max_rpi as f64) / self.interval.as_secs_f64()
    }
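`rps()` converts a bucket's quota into an average rate: `max_rpi` requests per `interval`. For example, `Self::new(200, Duration::from_secs(600))` allows 200 requests per 10 minutes, i.e. 200 / 600 ≈ 0.33 rps. A quick check, written as a hypothetical standalone mirror of the computation rather than a call into the real type:

```rust
use std::time::Duration;

fn rps(max_rpi: u64, interval: Duration) -> f64 {
    max_rpi as f64 / interval.as_secs_f64()
}

fn main() {
    assert!((rps(200, Duration::from_secs(600)) - 0.333).abs() < 0.001);
}
```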
@@ -2,11 +2,9 @@ use redis::aio::ConnectionLike;
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};

use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
-use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};

pub struct RedisKVClient {
    client: ConnectionWithCredentialsProvider,
-   limiter: GlobalRateLimiter,
}

#[allow(async_fn_in_trait)]

@@ -27,11 +25,8 @@ impl Queryable for Cmd {
}

impl RedisKVClient {
-   pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
-       Self {
-           client,
-           limiter: GlobalRateLimiter::new(info.into()),
-       }
+   pub fn new(client: ConnectionWithCredentialsProvider) -> Self {
+       Self { client }
    }

    pub async fn try_connect(&mut self) -> anyhow::Result<()> {

@@ -49,11 +44,6 @@ impl RedisKVClient {
    &mut self,
    q: &impl Queryable,
) -> anyhow::Result<T> {
-   if !self.limiter.check() {
-       tracing::info!("Rate limit exceeded. Skipping query");
-       return Err(anyhow::anyhow!("Rate limit exceeded"));
-   }

    match q.query(&mut self.client).await {
        Ok(t) => return Ok(t),
        Err(e) => {
@@ -141,29 +141,19 @@

struct MessageHandler<C: ProjectInfoCache + Send + Sync + 'static> {
    cache: Arc<C>,
-   region_id: String,
}

impl<C: ProjectInfoCache + Send + Sync + 'static> Clone for MessageHandler<C> {
    fn clone(&self) -> Self {
        Self {
            cache: self.cache.clone(),
-           region_id: self.region_id.clone(),
        }
    }
}
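The `Clone` impl stays handwritten because `#[derive(Clone)]` would add a `C: Clone` bound, while only the `Arc` handle actually needs cloning. A minimal sketch of the same pattern outside this codebase:

```rust
use std::sync::Arc;

struct Handler<C> {
    cache: Arc<C>,
}

// Manual impl: clones the Arc pointer without requiring `C: Clone`.
impl<C> Clone for Handler<C> {
    fn clone(&self) -> Self {
        Self { cache: Arc::clone(&self.cache) }
    }
}

fn main() {
    struct NotClone; // deliberately not Clone
    let h = Handler { cache: Arc::new(NotClone) };
    let _h2 = h.clone(); // still works
}
```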
impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
-   pub(crate) fn new(cache: Arc<C>, region_id: String) -> Self {
-       Self { cache, region_id }
-   }
-
-   pub(crate) async fn increment_active_listeners(&self) {
-       self.cache.increment_active_listeners().await;
-   }
-
-   pub(crate) async fn decrement_active_listeners(&self) {
-       self.cache.decrement_active_listeners().await;
+   pub(crate) fn new(cache: Arc<C>) -> Self {
+       Self { cache }
    }

    #[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]

@@ -276,7 +266,7 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
    }
    let mut conn = match try_connect(&redis).await {
        Ok(conn) => {
-           handler.increment_active_listeners().await;
+           handler.cache.increment_active_listeners().await;
            conn
        }
        Err(e) => {

@@ -297,11 +287,11 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
        }
    }
    if cancellation_token.is_cancelled() {
-       handler.decrement_active_listeners().await;
+       handler.cache.decrement_active_listeners().await;
        return Ok(());
    }
    }
-   handler.decrement_active_listeners().await;
+   handler.cache.decrement_active_listeners().await;
    }
}
@@ -310,12 +300,11 @@ async fn handle_messages<C: ProjectInfoCache + Send + Sync + 'static>(
pub async fn task_main<C>(
    redis: ConnectionWithCredentialsProvider,
    cache: Arc<C>,
-   region_id: String,
) -> anyhow::Result<Infallible>
where
    C: ProjectInfoCache + Send + Sync + 'static,
{
-   let handler = MessageHandler::new(cache, region_id);
+   let handler = MessageHandler::new(cache);
    // 6h - 1m.
    // There will be a 1 minute overlap between two tasks. But at least we can be sure that no message is lost.
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(6 * 60 * 60 - 60));
@@ -417,12 +417,7 @@ async fn request_handler(
    if config.http_config.accept_websockets
        && framed_websockets::upgrade::is_upgrade_request(&request)
    {
-       let ctx = RequestContext::new(
-           session_id,
-           conn_info,
-           crate::metrics::Protocol::Ws,
-           &config.region,
-       );
+       let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Ws);

        ctx.set_user_agent(
            request

@@ -462,12 +457,7 @@ async fn request_handler(
    // Return the response so the spawned future can continue.
    Ok(response.map(|b| b.map_err(|x| match x {}).boxed()))
} else if request.uri().path() == "/sql" && *request.method() == Method::POST {
-   let ctx = RequestContext::new(
-       session_id,
-       conn_info,
-       crate::metrics::Protocol::Http,
-       &config.region,
-   );
+   let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
    let span = ctx.span();

    let testodrome_id = request