mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
proxy: clippy lints: handle some low hanging fruit (#8829)
Should be mostly uncontroversial ones.
This commit is contained in:
@@ -85,7 +85,7 @@ pub trait TestBackend: Send + Sync + 'static {
|
||||
impl std::fmt::Display for BackendType<'_, (), ()> {
|
||||
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Console(api, _) => match &**api {
|
||||
Self::Console(api, ()) => match &**api {
|
||||
ConsoleBackend::Console(endpoint) => {
|
||||
fmt.debug_tuple("Console").field(&endpoint.url()).finish()
|
||||
}
|
||||
@@ -96,7 +96,7 @@ impl std::fmt::Display for BackendType<'_, (), ()> {
|
||||
#[cfg(test)]
|
||||
ConsoleBackend::Test(_) => fmt.debug_tuple("Test").finish(),
|
||||
},
|
||||
Self::Link(url, _) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
Self::Link(url, ()) => fmt.debug_tuple("Link").field(&url.as_str()).finish(),
|
||||
Self::Local(_) => fmt.debug_tuple("Local").finish(),
|
||||
}
|
||||
}
|
||||
@@ -324,21 +324,20 @@ async fn auth_quirks(
|
||||
};
|
||||
let (cached_entry, secret) = cached_secret.take_value();
|
||||
|
||||
let secret = match secret {
|
||||
Some(secret) => config.check_rate_limit(
|
||||
let secret = if let Some(secret) = secret {
|
||||
config.check_rate_limit(
|
||||
ctx,
|
||||
config,
|
||||
secret,
|
||||
&info.endpoint,
|
||||
unauthenticated_password.is_some() || allow_cleartext,
|
||||
)?,
|
||||
None => {
|
||||
// If we don't have an authentication secret, we mock one to
|
||||
// prevent malicious probing (possible due to missing protocol steps).
|
||||
// This mocked secret will never lead to successful authentication.
|
||||
info!("authentication info not found, mocking it");
|
||||
AuthSecret::Scram(scram::ServerSecret::mock(rand::random()))
|
||||
}
|
||||
)?
|
||||
} else {
|
||||
// If we don't have an authentication secret, we mock one to
|
||||
// prevent malicious probing (possible due to missing protocol steps).
|
||||
// This mocked secret will never lead to successful authentication.
|
||||
info!("authentication info not found, mocking it");
|
||||
AuthSecret::Scram(scram::ServerSecret::mock(rand::random()))
|
||||
};
|
||||
|
||||
match authenticate_with_secret(
|
||||
@@ -409,7 +408,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
|
||||
pub fn get_endpoint(&self) -> Option<EndpointId> {
|
||||
match self {
|
||||
Self::Console(_, user_info) => user_info.endpoint_id.clone(),
|
||||
Self::Link(_, _) => Some("link".into()),
|
||||
Self::Link(_, ()) => Some("link".into()),
|
||||
Self::Local(_) => Some("local".into()),
|
||||
}
|
||||
}
|
||||
@@ -418,7 +417,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
|
||||
pub fn get_user(&self) -> &str {
|
||||
match self {
|
||||
Self::Console(_, user_info) => &user_info.user,
|
||||
Self::Link(_, _) => "link",
|
||||
Self::Link(_, ()) => "link",
|
||||
Self::Local(_) => "local",
|
||||
}
|
||||
}
|
||||
@@ -454,7 +453,7 @@ impl<'a> BackendType<'a, ComputeUserInfoMaybeEndpoint, &()> {
|
||||
BackendType::Console(api, credentials)
|
||||
}
|
||||
// NOTE: this auth backend doesn't use client credentials.
|
||||
Self::Link(url, _) => {
|
||||
Self::Link(url, ()) => {
|
||||
info!("performing link authentication");
|
||||
|
||||
let info = link::authenticate(ctx, &url, client).await?;
|
||||
@@ -478,7 +477,7 @@ impl BackendType<'_, ComputeUserInfo, &()> {
|
||||
) -> Result<CachedRoleSecret, GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Console(api, user_info) => api.get_role_secret(ctx, user_info).await,
|
||||
Self::Link(_, _) => Ok(Cached::new_uncached(None)),
|
||||
Self::Link(_, ()) => Ok(Cached::new_uncached(None)),
|
||||
Self::Local(_) => Ok(Cached::new_uncached(None)),
|
||||
}
|
||||
}
|
||||
@@ -489,7 +488,7 @@ impl BackendType<'_, ComputeUserInfo, &()> {
|
||||
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
|
||||
match self {
|
||||
Self::Console(api, user_info) => api.get_allowed_ips_and_secret(ctx, user_info).await,
|
||||
Self::Link(_, _) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
Self::Link(_, ()) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
Self::Local(_) => Ok((Cached::new_uncached(Arc::new(vec![])), None)),
|
||||
}
|
||||
}
|
||||
@@ -525,7 +524,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
|
||||
) -> Result<CachedNodeInfo, console::errors::WakeComputeError> {
|
||||
match self {
|
||||
Self::Console(api, creds) => api.wake_compute(ctx, &creds.info).await,
|
||||
Self::Link(_, _) => unreachable!("link auth flow doesn't support waking the compute"),
|
||||
Self::Link(_, ()) => unreachable!("link auth flow doesn't support waking the compute"),
|
||||
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
|
||||
}
|
||||
}
|
||||
@@ -533,7 +532,7 @@ impl ComputeConnectBackend for BackendType<'_, ComputeCredentials, &()> {
|
||||
fn get_keys(&self) -> &ComputeCredentialKeys {
|
||||
match self {
|
||||
Self::Console(_, creds) => &creds.keys,
|
||||
Self::Link(_, _) => &ComputeCredentialKeys::None,
|
||||
Self::Link(_, ()) => &ComputeCredentialKeys::None,
|
||||
Self::Local(_) => &ComputeCredentialKeys::None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,10 +224,10 @@ impl JwkCacheEntryLock {
|
||||
// where Signature = alg(<B64(Header)> || . || <B64(Payload)>);
|
||||
|
||||
let (header_payload, signature) = jwt
|
||||
.rsplit_once(".")
|
||||
.rsplit_once('.')
|
||||
.context("Provided authentication token is not a valid JWT encoding")?;
|
||||
let (header, payload) = header_payload
|
||||
.split_once(".")
|
||||
.split_once('.')
|
||||
.context("Provided authentication token is not a valid JWT encoding")?;
|
||||
|
||||
let header = base64::decode_config(header, base64::URL_SAFE_NO_PAD)
|
||||
@@ -320,14 +320,11 @@ impl JwkCache {
|
||||
// try with just a read lock first
|
||||
let key = (endpoint, role_name.clone());
|
||||
let entry = self.map.get(&key).as_deref().map(Arc::clone);
|
||||
let entry = match entry {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
// acquire a write lock after to insert.
|
||||
let entry = self.map.entry(key).or_default();
|
||||
Arc::clone(&*entry)
|
||||
}
|
||||
};
|
||||
let entry = entry.unwrap_or_else(|| {
|
||||
// acquire a write lock after to insert.
|
||||
let entry = self.map.entry(key).or_default();
|
||||
Arc::clone(&*entry)
|
||||
});
|
||||
|
||||
entry
|
||||
.check_jwt(ctx, jwt, &self.client, role_name, fetch)
|
||||
|
||||
@@ -130,9 +130,12 @@ impl ComputeUserInfoMaybeEndpoint {
|
||||
}))
|
||||
}
|
||||
// Invariant: project name may not contain certain characters.
|
||||
(a, b) => a.or(b).map(|name| match project_name_valid(name.as_ref()) {
|
||||
false => Err(ComputeUserInfoParseError::MalformedProjectName(name)),
|
||||
true => Ok(name),
|
||||
(a, b) => a.or(b).map(|name| {
|
||||
if project_name_valid(name.as_ref()) {
|
||||
Ok(name)
|
||||
} else {
|
||||
Err(ComputeUserInfoParseError::MalformedProjectName(name))
|
||||
}
|
||||
}),
|
||||
}
|
||||
.transpose()?;
|
||||
|
||||
8
proxy/src/cache/project_info.rs
vendored
8
proxy/src/cache/project_info.rs
vendored
@@ -274,13 +274,13 @@ impl ProjectInfoCacheImpl {
|
||||
let ttl_disabled_since_us = self
|
||||
.ttl_disabled_since_us
|
||||
.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let ignore_cache_since = if ttl_disabled_since_us != u64::MAX {
|
||||
let ignore_cache_since = if ttl_disabled_since_us == u64::MAX {
|
||||
None
|
||||
} else {
|
||||
let ignore_cache_since = self.start_time + Duration::from_micros(ttl_disabled_since_us);
|
||||
// We are fine if entry is not older than ttl or was added before we are getting notifications.
|
||||
valid_since = valid_since.min(ignore_cache_since);
|
||||
Some(ignore_cache_since)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
(valid_since, ignore_cache_since)
|
||||
}
|
||||
@@ -306,7 +306,7 @@ impl ProjectInfoCacheImpl {
|
||||
let mut removed = 0;
|
||||
let shard = self.project2ep.shards()[shard].write();
|
||||
for (_, endpoints) in shard.iter() {
|
||||
for endpoint in endpoints.get().iter() {
|
||||
for endpoint in endpoints.get() {
|
||||
self.cache.remove(endpoint);
|
||||
removed += 1;
|
||||
}
|
||||
|
||||
@@ -220,7 +220,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn cancel_session_noop_regression() {
|
||||
let handler = CancellationHandler::<()>::new(Default::default(), CancellationSource::Local);
|
||||
let handler =
|
||||
CancellationHandler::<()>::new(CancelMap::default(), CancellationSource::Local);
|
||||
handler
|
||||
.cancel_session(
|
||||
CancelKeyData {
|
||||
|
||||
@@ -286,7 +286,7 @@ impl ConnCfg {
|
||||
|
||||
let client_config = if allow_self_signed_compute {
|
||||
// Allow all certificates for creating the connection
|
||||
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
|
||||
let verifier = Arc::new(AcceptEverythingVerifier);
|
||||
rustls::ClientConfig::builder()
|
||||
.dangerous()
|
||||
.with_custom_certificate_verifier(verifier)
|
||||
|
||||
@@ -318,7 +318,7 @@ impl CertResolver {
|
||||
// a) Instead of multi-cert approach use single cert with extra
|
||||
// domains listed in Subject Alternative Name (SAN).
|
||||
// b) Deploy separate proxy instances for extra domains.
|
||||
self.default.as_ref().cloned()
|
||||
self.default.clone()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ impl Api {
|
||||
tokio_postgres::connect(self.endpoint.as_str(), tokio_postgres::NoTls).await?;
|
||||
|
||||
tokio::spawn(connection);
|
||||
let secret = match get_execute_postgres_query(
|
||||
let secret = if let Some(entry) = get_execute_postgres_query(
|
||||
&client,
|
||||
"select rolpassword from pg_catalog.pg_authid where rolname = $1",
|
||||
&[&&*user_info.user],
|
||||
@@ -72,15 +72,12 @@ impl Api {
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(entry) => {
|
||||
info!("got a secret: {entry}"); // safe since it's not a prod scenario
|
||||
let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram);
|
||||
secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5))
|
||||
}
|
||||
None => {
|
||||
warn!("user '{}' does not exist", user_info.user);
|
||||
None
|
||||
}
|
||||
info!("got a secret: {entry}"); // safe since it's not a prod scenario
|
||||
let secret = scram::ServerSecret::parse(&entry).map(AuthSecret::Scram);
|
||||
secret.or_else(|| parse_md5(&entry).map(AuthSecret::Md5))
|
||||
} else {
|
||||
warn!("user '{}' does not exist", user_info.user);
|
||||
None
|
||||
};
|
||||
let allowed_ips = match get_execute_postgres_query(
|
||||
&client,
|
||||
@@ -142,12 +139,11 @@ async fn get_execute_postgres_query(
|
||||
let rows = client.query(query, params).await?;
|
||||
|
||||
// We can get at most one row, because `rolname` is unique.
|
||||
let row = match rows.first() {
|
||||
Some(row) => row,
|
||||
let Some(row) = rows.first() else {
|
||||
// This means that the user doesn't exist, so there can be no secret.
|
||||
// However, this is still a *valid* outcome which is very similar
|
||||
// to getting `404 Not found` from the Neon console.
|
||||
None => return Ok(None),
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let entry = row.try_get(idx).map_err(MockApiError::PasswordNotSet)?;
|
||||
|
||||
@@ -38,9 +38,9 @@ impl Api {
|
||||
locks: &'static ApiLocks<EndpointCacheKey>,
|
||||
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
|
||||
) -> Self {
|
||||
let jwt: String = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
let jwt = match std::env::var("NEON_PROXY_TO_CONTROLPLANE_TOKEN") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
Err(_) => String::new(),
|
||||
};
|
||||
Self {
|
||||
endpoint,
|
||||
@@ -96,10 +96,10 @@ impl Api {
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
// TODO(anna): retry
|
||||
Err(e) => {
|
||||
if e.get_reason().is_not_found() {
|
||||
return Ok(AuthInfo::default());
|
||||
return if e.get_reason().is_not_found() {
|
||||
Ok(AuthInfo::default())
|
||||
} else {
|
||||
return Err(e.into());
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// https://rust-lang.github.io/rust-clippy/master/index.html#?groups=restriction
|
||||
#![warn(
|
||||
clippy::undocumented_unsafe_blocks,
|
||||
// TODO: Enable once all individual checks are enabled.
|
||||
//clippy::as_conversions,
|
||||
clippy::dbg_macro,
|
||||
clippy::empty_enum_variants_with_brackets,
|
||||
clippy::exit,
|
||||
@@ -31,8 +33,15 @@
|
||||
)]
|
||||
// List of permanently allowed lints.
|
||||
#![allow(
|
||||
// It's ok to cast u8 to bool, etc.
|
||||
// It's ok to cast bool to u8, etc.
|
||||
clippy::cast_lossless,
|
||||
// Seems unavoidable.
|
||||
clippy::multiple_crate_versions,
|
||||
// While #[must_use] is a great feature this check is too noisy.
|
||||
clippy::must_use_candidate,
|
||||
// Inline consts, structs, fns, imports, etc. are ok if they're used by
|
||||
// the following statement(s).
|
||||
clippy::items_after_statements,
|
||||
)]
|
||||
// List of temporarily allowed lints.
|
||||
// TODO: Switch to except() once stable with 1.81.
|
||||
@@ -43,46 +52,26 @@
|
||||
clippy::cast_possible_wrap,
|
||||
clippy::cast_precision_loss,
|
||||
clippy::cast_sign_loss,
|
||||
clippy::default_trait_access,
|
||||
clippy::doc_markdown,
|
||||
clippy::explicit_iter_loop,
|
||||
clippy::float_cmp,
|
||||
clippy::if_not_else,
|
||||
clippy::ignored_unit_patterns,
|
||||
clippy::implicit_hasher,
|
||||
clippy::inconsistent_struct_constructor,
|
||||
clippy::inline_always,
|
||||
clippy::items_after_statements,
|
||||
clippy::manual_assert,
|
||||
clippy::manual_let_else,
|
||||
clippy::manual_string_new,
|
||||
clippy::match_bool,
|
||||
clippy::match_same_arms,
|
||||
clippy::match_wild_err_arm,
|
||||
clippy::missing_errors_doc,
|
||||
clippy::missing_panics_doc,
|
||||
clippy::module_name_repetitions,
|
||||
clippy::multiple_crate_versions,
|
||||
clippy::must_use_candidate,
|
||||
clippy::needless_for_each,
|
||||
clippy::needless_pass_by_value,
|
||||
clippy::needless_raw_string_hashes,
|
||||
clippy::option_as_ref_cloned,
|
||||
clippy::redundant_closure_for_method_calls,
|
||||
clippy::redundant_else,
|
||||
clippy::return_self_not_must_use,
|
||||
clippy::similar_names,
|
||||
clippy::single_char_pattern,
|
||||
clippy::single_match_else,
|
||||
clippy::struct_excessive_bools,
|
||||
clippy::struct_field_names,
|
||||
clippy::too_many_lines,
|
||||
clippy::uninlined_format_args,
|
||||
clippy::unnested_or_patterns,
|
||||
clippy::unreadable_literal,
|
||||
clippy::unused_async,
|
||||
clippy::unused_self,
|
||||
clippy::used_underscore_binding,
|
||||
clippy::wildcard_imports
|
||||
)]
|
||||
// List of temporarily allowed lints to unblock beta/nightly.
|
||||
|
||||
@@ -254,7 +254,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
|
||||
let metrics = &Metrics::get().proxy;
|
||||
let proto = ctx.protocol();
|
||||
let _request_gauge = metrics.connection_requests.guard(proto);
|
||||
let request_gauge = metrics.connection_requests.guard(proto);
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
|
||||
@@ -283,7 +283,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let result = config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|_| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, ¶ms, hostname, common_names))
|
||||
.map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, ¶ms, hostname, common_names))
|
||||
.transpose();
|
||||
|
||||
let user_info = match result {
|
||||
@@ -340,7 +340,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
client: stream,
|
||||
aux: node.aux.clone(),
|
||||
compute: node,
|
||||
req: _request_gauge,
|
||||
req: request_gauge,
|
||||
conn: conn_gauge,
|
||||
cancel: session,
|
||||
}))
|
||||
|
||||
@@ -30,9 +30,10 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> NodeInfo {
|
||||
if is_cached {
|
||||
warn!("invalidating stalled compute node info cache entry");
|
||||
}
|
||||
let label = match is_cached {
|
||||
true => ConnectionFailureKind::ComputeCached,
|
||||
false => ConnectionFailureKind::ComputeUncached,
|
||||
let label = if is_cached {
|
||||
ConnectionFailureKind::ComputeCached
|
||||
} else {
|
||||
ConnectionFailureKind::ComputeUncached
|
||||
};
|
||||
Metrics::get().proxy.connection_failures_total.inc(label);
|
||||
|
||||
|
||||
@@ -230,11 +230,10 @@ impl CopyBuffer {
|
||||
io::ErrorKind::WriteZero,
|
||||
"write zero byte into writer",
|
||||
))));
|
||||
} else {
|
||||
self.pos += i;
|
||||
self.amt += i as u64;
|
||||
self.need_flush = true;
|
||||
}
|
||||
self.pos += i;
|
||||
self.amt += i as u64;
|
||||
self.need_flush = true;
|
||||
}
|
||||
|
||||
// If pos larger than cap, this loop will never stop.
|
||||
|
||||
@@ -433,7 +433,7 @@ impl ReportableError for TestConnectError {
|
||||
|
||||
impl std::fmt::Display for TestConnectError {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
write!(f, "{self:?}")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -475,7 +475,7 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
retryable: false,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
x => panic!("expecting action {:?}, connect is called instead", x),
|
||||
x => panic!("expecting action {x:?}, connect is called instead"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -515,7 +515,7 @@ impl TestBackend for TestConnectMechanism {
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
x => panic!("expecting action {:?}, wake_compute is called instead", x),
|
||||
x => panic!("expecting action {x:?}, wake_compute is called instead"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -115,9 +115,7 @@ where
|
||||
let mut buf = [0];
|
||||
stream.read_exact(&mut buf).await.unwrap();
|
||||
|
||||
if buf[0] != b'S' {
|
||||
panic!("ssl not supported by server");
|
||||
}
|
||||
assert!(buf[0] == b'S', "ssl not supported by server");
|
||||
|
||||
tls.connect(stream).await.unwrap()
|
||||
}
|
||||
|
||||
@@ -119,6 +119,7 @@ impl Default for LeakyBucketState {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::float_cmp)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
|
||||
@@ -174,9 +174,8 @@ impl DynamicLimiter {
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.take(&self.ready).is_some() {
|
||||
break Ok(Token::new(self.clone()));
|
||||
} else {
|
||||
notified.set(self.ready.notified());
|
||||
}
|
||||
notified.set(self.ready.notified());
|
||||
}
|
||||
notified.as_mut().await;
|
||||
ready = true;
|
||||
|
||||
@@ -150,7 +150,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
Notification::AllowedIpsUpdate { .. } | Notification::PasswordUpdate { .. } => {
|
||||
invalidate_cache(self.cache.clone(), msg.clone());
|
||||
if matches!(msg, Notification::AllowedIpsUpdate { .. }) {
|
||||
Metrics::get()
|
||||
|
||||
@@ -89,7 +89,7 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
write!(&mut message, "r={}", self.nonce).unwrap();
|
||||
base64::encode_config_buf(nonce, base64::STANDARD, &mut message);
|
||||
let combined_nonce = 2..message.len();
|
||||
write!(&mut message, ",s={},i={}", salt_base64, iterations).unwrap();
|
||||
write!(&mut message, ",s={salt_base64},i={iterations}").unwrap();
|
||||
|
||||
// This design guarantees that it's impossible to create a
|
||||
// server-first-message without receiving a client-first-message
|
||||
|
||||
@@ -82,13 +82,7 @@ mod tests {
|
||||
let stored_key = "D5h6KTMBlUvDJk2Y8ELfC1Sjtc6k9YHjRyuRZyBNJns=";
|
||||
let server_key = "Pi3QHbcluX//NDfVkKlFl88GGzlJ5LkyPwcdlN/QBvI=";
|
||||
|
||||
let secret = format!(
|
||||
"SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}",
|
||||
iterations = iterations,
|
||||
salt = salt,
|
||||
stored_key = stored_key,
|
||||
server_key = server_key,
|
||||
);
|
||||
let secret = format!("SCRAM-SHA-256${iterations}:{salt}${stored_key}:{server_key}");
|
||||
|
||||
let parsed = ServerSecret::parse(&secret).unwrap();
|
||||
assert_eq!(parsed.iterations, iterations);
|
||||
|
||||
@@ -222,12 +222,11 @@ fn thread_rt(pool: Arc<ThreadPool>, worker: Worker<JobSpec>, index: usize) {
|
||||
}
|
||||
|
||||
for i in 0.. {
|
||||
let mut job = match worker
|
||||
let Some(mut job) = worker
|
||||
.pop()
|
||||
.or_else(|| pool.steal(&mut rng, index, &worker))
|
||||
{
|
||||
Some(job) => job,
|
||||
None => continue 'wait,
|
||||
else {
|
||||
continue 'wait;
|
||||
};
|
||||
|
||||
pool.metrics
|
||||
|
||||
@@ -93,11 +93,11 @@ pub async fn task_main(
|
||||
let mut tls_server_config = rustls::ServerConfig::clone(&config.to_server_config());
|
||||
// prefer http2, but support http/1.1
|
||||
tls_server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
Arc::new(tls_server_config) as Arc<_>
|
||||
Arc::new(tls_server_config)
|
||||
}
|
||||
None => {
|
||||
warn!("TLS config is missing");
|
||||
Arc::new(NoTls) as Arc<_>
|
||||
Arc::new(NoTls)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -44,7 +44,11 @@ impl PoolingBackend {
|
||||
password: &[u8],
|
||||
) -> Result<ComputeCredentials, AuthError> {
|
||||
let user_info = user_info.clone();
|
||||
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
|
||||
let backend = self
|
||||
.config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|()| user_info.clone());
|
||||
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
|
||||
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
|
||||
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
|
||||
@@ -101,10 +105,10 @@ impl PoolingBackend {
|
||||
jwt: &str,
|
||||
) -> Result<ComputeCredentials, AuthError> {
|
||||
match &self.config.auth_backend {
|
||||
crate::auth::BackendType::Console(_, _) => {
|
||||
crate::auth::BackendType::Console(_, ()) => {
|
||||
Err(AuthError::auth_failed("JWT login is not yet supported"))
|
||||
}
|
||||
crate::auth::BackendType::Link(_, _) => Err(AuthError::auth_failed(
|
||||
crate::auth::BackendType::Link(_, ()) => Err(AuthError::auth_failed(
|
||||
"JWT login over link proxy is not supported",
|
||||
)),
|
||||
crate::auth::BackendType::Local(cache) => {
|
||||
@@ -138,12 +142,12 @@ impl PoolingBackend {
|
||||
keys: ComputeCredentials,
|
||||
force_new: bool,
|
||||
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
|
||||
let maybe_client = if !force_new {
|
||||
info!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info)?
|
||||
} else {
|
||||
let maybe_client = if force_new {
|
||||
info!("pool: pool is disabled");
|
||||
None
|
||||
} else {
|
||||
info!("pool: looking for an existing connection");
|
||||
self.pool.get(ctx, &conn_info)?
|
||||
};
|
||||
|
||||
if let Some(client) = maybe_client {
|
||||
@@ -152,7 +156,7 @@ impl PoolingBackend {
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
let backend = self.config.auth_backend.as_ref().map(|_| keys);
|
||||
let backend = self.config.auth_backend.as_ref().map(|()| keys);
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&TokioMechanism {
|
||||
|
||||
@@ -339,9 +339,9 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
} = pool.get_mut();
|
||||
|
||||
// ensure that closed clients are removed
|
||||
pools.iter_mut().for_each(|(_, db_pool)| {
|
||||
for db_pool in pools.values_mut() {
|
||||
clients_removed += db_pool.clear_closed_clients(total_conns);
|
||||
});
|
||||
}
|
||||
|
||||
// we only remove this pool if it has no active connections
|
||||
if *total_conns == 0 {
|
||||
@@ -405,21 +405,20 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
|
||||
if client.is_closed() {
|
||||
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
|
||||
return Ok(None);
|
||||
} else {
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
client.session.send(ctx.session_id())?;
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
}
|
||||
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
|
||||
tracing::Span::current().record(
|
||||
"pid",
|
||||
tracing::field::display(client.inner.get_process_id()),
|
||||
);
|
||||
info!(
|
||||
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
|
||||
"pool: reusing connection '{conn_info}'"
|
||||
);
|
||||
client.session.send(ctx.session_id())?;
|
||||
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
|
||||
ctx.success();
|
||||
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
@@ -660,7 +659,7 @@ impl<C: ClientInnerExt> Client<C> {
|
||||
span: _,
|
||||
} = self;
|
||||
let inner = inner.as_mut().expect("client inner should not be removed");
|
||||
(&mut inner.inner, Discard { pool, conn_info })
|
||||
(&mut inner.inner, Discard { conn_info, pool })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -722,7 +721,9 @@ impl<C: ClientInnerExt> Drop for Client<C> {
|
||||
mod tests {
|
||||
use std::{mem, sync::atomic::AtomicBool};
|
||||
|
||||
use crate::{serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId};
|
||||
use crate::{
|
||||
proxy::NeonOptions, serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -781,7 +782,7 @@ mod tests {
|
||||
user_info: ComputeUserInfo {
|
||||
user: "user".into(),
|
||||
endpoint: "endpoint".into(),
|
||||
options: Default::default(),
|
||||
options: NeonOptions::default(),
|
||||
},
|
||||
dbname: "dbname".into(),
|
||||
auth: AuthData::Password("password".as_bytes().into()),
|
||||
@@ -839,7 +840,7 @@ mod tests {
|
||||
user_info: ComputeUserInfo {
|
||||
user: "user".into(),
|
||||
endpoint: "endpoint-2".into(),
|
||||
options: Default::default(),
|
||||
options: NeonOptions::default(),
|
||||
},
|
||||
dbname: "dbname".into(),
|
||||
auth: AuthData::Password("password".as_bytes().into()),
|
||||
|
||||
@@ -55,7 +55,7 @@ fn json_array_to_pg_array(value: &Value) -> Option<String> {
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
Some(format!("{{{}}}", vals))
|
||||
Some(format!("{{{vals}}}"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,12 +207,12 @@ fn get_conn_info(
|
||||
.ok_or(ConnInfoError::MalformedEndpoint)?
|
||||
} else {
|
||||
hostname
|
||||
.split_once(".")
|
||||
.split_once('.')
|
||||
.map_or(hostname, |(prefix, _)| prefix)
|
||||
.into()
|
||||
}
|
||||
}
|
||||
Some(url::Host::Ipv4(_)) | Some(url::Host::Ipv6(_)) | None => {
|
||||
Some(url::Host::Ipv4(_) | url::Host::Ipv6(_)) | None => {
|
||||
return Err(ConnInfoError::MissingHostname)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -67,7 +67,7 @@ impl<S: AsyncRead + Unpin> PqStream<S> {
|
||||
FeMessage::PasswordMessage(msg) => Ok(msg),
|
||||
bad => Err(io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("unexpected message type: {:?}", bad),
|
||||
format!("unexpected message type: {bad:?}"),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -450,12 +450,9 @@ async fn upload_events_chunk(
|
||||
remote_path: &RemotePath,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let storage = match storage {
|
||||
Some(storage) => storage,
|
||||
None => {
|
||||
error!("no remote storage configured");
|
||||
return Ok(());
|
||||
}
|
||||
let Some(storage) = storage else {
|
||||
error!("no remote storage configured");
|
||||
return Ok(());
|
||||
};
|
||||
let data = serde_json::to_vec(&chunk).context("serialize metrics")?;
|
||||
let mut encoder = GzipEncoder::new(Vec::new());
|
||||
|
||||
@@ -31,7 +31,7 @@ pub struct Waiters<T>(pub(self) Mutex<HashMap<String, oneshot::Sender<T>>>);
|
||||
|
||||
impl<T> Default for Waiters<T> {
|
||||
fn default() -> Self {
|
||||
Waiters(Default::default())
|
||||
Waiters(Mutex::default())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user