From a8f3540f3d35a701239583389474904444393785 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Fri, 28 Jul 2023 19:10:55 -0400 Subject: [PATCH] proxy: add unit test for wake_compute (#4819) ## Problem ref https://github.com/neondatabase/neon/pull/4721, ref https://github.com/neondatabase/neon/issues/4709 ## Summary of changes This PR adds unit tests for wake_compute. The patch adds a new variant `Test` to auth backends. When `wake_compute` is called, we verify that the call occurs at the expected position in the operation sequence. The operation sequence now contains 3 more operations: `Wake`, `WakeRetry`, and `WakeFail`. The unit tests for the proxy connect logic are now complete, and I'll continue working on the WebSocket e2e tests in future PRs. --------- Signed-off-by: Alex Chi Z --- proxy/src/auth/backend.rs | 15 +++++ proxy/src/proxy.rs | 35 ++++++----- proxy/src/proxy/tests.rs | 120 +++++++++++++++++++++++++++++++------- 3 files changed, 134 insertions(+), 36 deletions(-) diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 9322e4f9ff..ff73f2b625 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -53,6 +53,12 @@ pub enum BackendType<'a, T> { Postgres(Cow<'a, console::provider::mock::Api>, T), /// Authentication via a web browser. Link(Cow<'a, url::ApiUrl>), + /// Test backend. 
+ Test(&'a dyn TestBackend), +} + +pub trait TestBackend: Send + Sync + 'static { + fn wake_compute(&self) -> Result; } impl std::fmt::Display for BackendType<'_, ()> { @@ -62,6 +68,7 @@ impl std::fmt::Display for BackendType<'_, ()> { Console(endpoint, _) => fmt.debug_tuple("Console").field(&endpoint.url()).finish(), Postgres(endpoint, _) => fmt.debug_tuple("Postgres").field(&endpoint.url()).finish(), Link(url) => fmt.debug_tuple("Link").field(&url.as_str()).finish(), + Test(_) => fmt.debug_tuple("Test").finish(), } } } @@ -75,6 +82,7 @@ impl BackendType<'_, T> { Console(c, x) => Console(Cow::Borrowed(c), x), Postgres(c, x) => Postgres(Cow::Borrowed(c), x), Link(c) => Link(Cow::Borrowed(c)), + Test(x) => Test(*x), } } } @@ -89,6 +97,7 @@ impl<'a, T> BackendType<'a, T> { Console(c, x) => Console(c, f(x)), Postgres(c, x) => Postgres(c, f(x)), Link(c) => Link(c), + Test(x) => Test(x), } } } @@ -102,6 +111,7 @@ impl<'a, T, E> BackendType<'a, Result> { Console(c, x) => x.map(|x| Console(c, x)), Postgres(c, x) => x.map(|x| Postgres(c, x)), Link(c) => Ok(Link(c)), + Test(x) => Ok(Test(x)), } } } @@ -147,6 +157,7 @@ impl BackendType<'_, ClientCredentials<'_>> { Console(_, creds) => creds.project.clone(), Postgres(_, creds) => creds.project.clone(), Link(_) => Some("link".to_owned()), + Test(_) => Some("test".to_owned()), } } /// Authenticate the client via the requested backend, possibly using credentials. @@ -188,6 +199,9 @@ impl BackendType<'_, ClientCredentials<'_>> { .await? 
.map(CachedNodeInfo::new_uncached) } + Test(_) => { + unreachable!("this function should never be called in the test backend") + } }; info!("user successfully authenticated"); @@ -206,6 +220,7 @@ impl BackendType<'_, ClientCredentials<'_>> { Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await, Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await, Link(_) => Ok(None), + Test(x) => x.wake_compute().map(Some), } } } diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index b0c5a41efb..6a0a65578c 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -6,7 +6,7 @@ use crate::{ cancellation::{self, CancelMap}, compute::{self, PostgresConnection}, config::{ProxyConfig, TlsConfig}, - console::{self, errors::WakeComputeError, messages::MetricsAuxInfo}, + console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api}, stream::{PqStream, Stream}, }; use anyhow::{bail, Context}; @@ -413,18 +413,18 @@ where loop { match state { ConnectionState::Invalid(config, err) => { + info!("compute node's state has likely changed; requesting a wake-up"); + let wake_res = match creds { - auth::BackendType::Console(api, creds) => { - try_wake(api.as_ref(), extra, creds).await - } - auth::BackendType::Postgres(api, creds) => { - try_wake(api.as_ref(), extra, creds).await - } + auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, + auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, // nothing to do? 
auth::BackendType::Link(_) => return Err(err.into()), + // test backend + auth::BackendType::Test(x) => x.wake_compute(), }; - match wake_res { + match handle_try_wake(wake_res) { // there was an error communicating with the control plane Err(e) => return Err(e.into()), // failed to wake up but we can continue to retry @@ -478,13 +478,10 @@ where /// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable /// * Returns Ok(Break(node)) if the wakeup succeeded /// * Returns Err(e) if there was an error -pub async fn try_wake( - api: &impl console::Api, - extra: &console::ConsoleReqExtra<'_>, - creds: &auth::ClientCredentials<'_>, +fn handle_try_wake( + result: Result, ) -> Result, WakeComputeError> { - info!("compute node's state has likely changed; requesting a wake-up"); - match api.wake_compute(extra, creds).await { + match result { Err(err) => match &err { WakeComputeError::ApiError(api) if api.could_retry() => Ok(ControlFlow::Continue(err)), _ => Err(err), @@ -494,6 +491,16 @@ pub async fn try_wake( } } +/// Attempts to wake up the compute node. +pub async fn try_wake( + api: &impl console::Api, + extra: &console::ConsoleReqExtra<'_>, + creds: &auth::ClientCredentials<'_>, +) -> Result, WakeComputeError> { + info!("compute node's state has likely changed; requesting a wake-up"); + handle_try_wake(api.wake_compute(extra, creds).await) +} + pub trait ShouldRetry { fn could_retry(&self) -> bool; fn should_retry(&self, num_retries: u32) -> bool { diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 565f86eecc..27e14d442c 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -1,10 +1,10 @@ //! A group of high-level tests for connection establishing logic and auth. -use std::borrow::Cow; - +//! 
use super::*; +use crate::auth::backend::TestBackend; use crate::auth::ClientCredentials; use crate::console::{CachedNodeInfo, NodeInfo}; -use crate::{auth, sasl, scram}; +use crate::{auth, http, sasl, scram}; use async_trait::async_trait; use rstest::rstest; use tokio_postgres::config::SslMode; @@ -309,8 +309,11 @@ fn connect_compute_total_wait() { assert!(total_wait > tokio::time::Duration::from_secs(10)); } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] enum ConnectAction { + Wake, + WakeFail, + WakeRetry, Connect, Retry, Fail, @@ -321,6 +324,17 @@ struct TestConnectMechanism { sequence: Vec, } +impl TestConnectMechanism { + fn verify(&self) { + let counter = self.counter.lock().unwrap(); + assert_eq!( + *counter, + self.sequence.len(), + "sequence does not proceed to the end" + ); + } +} + impl TestConnectMechanism { fn new(sequence: Vec) -> Self { Self { @@ -370,30 +384,63 @@ impl ConnectMechanism for TestConnectMechanism { ConnectAction::Connect => Ok(TestConnection), ConnectAction::Retry => Err(TestConnectError { retryable: true }), ConnectAction::Fail => Err(TestConnectError { retryable: false }), + x => panic!("expecting action {:?}, connect is called instead", x), } } fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {} } -fn helper_create_connect_info() -> ( - CachedNodeInfo, - console::ConsoleReqExtra<'static>, - auth::BackendType<'static, ClientCredentials<'static>>, -) { +impl TestBackend for TestConnectMechanism { + fn wake_compute(&self) -> Result { + let mut counter = self.counter.lock().unwrap(); + let action = self.sequence[*counter]; + *counter += 1; + match action { + ConnectAction::Wake => Ok(helper_create_cached_node_info()), + ConnectAction::WakeFail => { + let err = console::errors::ApiError::Console { + status: http::StatusCode::FORBIDDEN, + text: "TEST".into(), + }; + assert!(!err.could_retry()); + Err(console::errors::WakeComputeError::ApiError(err)) + } + ConnectAction::WakeRetry => { + let err = 
console::errors::ApiError::Console { + status: http::StatusCode::INTERNAL_SERVER_ERROR, + text: "TEST".into(), + }; + assert!(err.could_retry()); + Err(console::errors::WakeComputeError::ApiError(err)) + } + x => panic!("expecting action {:?}, wake_compute is called instead", x), + } + } +} + +fn helper_create_cached_node_info() -> CachedNodeInfo { let node = NodeInfo { config: compute::ConnCfg::new(), aux: Default::default(), allow_self_signed_compute: false, }; - let cache = CachedNodeInfo::new_uncached(node); + CachedNodeInfo::new_uncached(node) +} + +fn helper_create_connect_info( + mechanism: &TestConnectMechanism, +) -> ( + CachedNodeInfo, + console::ConsoleReqExtra<'static>, + auth::BackendType<'_, ClientCredentials<'static>>, +) { + let cache = helper_create_cached_node_info(); let extra = console::ConsoleReqExtra { session_id: uuid::Uuid::new_v4(), application_name: Some("TEST"), }; - let url = "https://TEST_URL".parse().unwrap(); - let api = console::provider::mock::Api::new(url); - let creds = auth::BackendType::Postgres(Cow::Owned(api), ClientCredentials::new_noop()); + let creds = auth::BackendType::Test(mechanism); (cache, extra, creds) } @@ -401,42 +448,46 @@ fn helper_create_connect_info() -> ( async fn connect_to_compute_success() { use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![Connect]); - let (cache, extra, creds) = helper_create_connect_info(); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); connect_to_compute(&mechanism, cache, &extra, &creds) .await .unwrap(); + mechanism.verify(); } #[tokio::test] async fn connect_to_compute_retry() { use ConnectAction::*; - let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Connect]); - let (cache, extra, creds) = helper_create_connect_info(); + let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Connect]); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); connect_to_compute(&mechanism, cache, &extra, &creds) .await 
.unwrap(); + mechanism.verify(); } /// Test that we don't retry if the error is not retryable. #[tokio::test] async fn connect_to_compute_non_retry_1() { use ConnectAction::*; - let mechanism = TestConnectMechanism::new(vec![Retry, Retry, Fail]); - let (cache, extra, creds) = helper_create_connect_info(); + let mechanism = TestConnectMechanism::new(vec![Retry, Wake, Retry, Fail]); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); connect_to_compute(&mechanism, cache, &extra, &creds) .await .unwrap_err(); + mechanism.verify(); } /// Even for non-retryable errors, we should retry at least once. #[tokio::test] async fn connect_to_compute_non_retry_2() { use ConnectAction::*; - let mechanism = TestConnectMechanism::new(vec![Fail, Retry, Connect]); - let (cache, extra, creds) = helper_create_connect_info(); + let mechanism = TestConnectMechanism::new(vec![Fail, Wake, Retry, Connect]); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); connect_to_compute(&mechanism, cache, &extra, &creds) .await .unwrap(); + mechanism.verify(); } /// Retry for at most `NUM_RETRIES_CONNECT` times. @@ -445,11 +496,36 @@ async fn connect_to_compute_non_retry_3() { assert_eq!(NUM_RETRIES_CONNECT, 10); use ConnectAction::*; let mechanism = TestConnectMechanism::new(vec![ - Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, + Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, /* the 11th time */ Retry, ]); - let (cache, extra, creds) = helper_create_connect_info(); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); connect_to_compute(&mechanism, cache, &extra, &creds) .await .unwrap_err(); + mechanism.verify(); +} + +/// Should retry wake compute. 
+#[tokio::test] +async fn wake_retry() { + use ConnectAction::*; + let mechanism = TestConnectMechanism::new(vec![Retry, WakeRetry, Wake, Connect]); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); + connect_to_compute(&mechanism, cache, &extra, &creds) + .await + .unwrap(); + mechanism.verify(); +} + +/// Wake failed with a non-retryable error. +#[tokio::test] +async fn wake_non_retry() { + use ConnectAction::*; + let mechanism = TestConnectMechanism::new(vec![Retry, WakeFail]); + let (cache, extra, creds) = helper_create_connect_info(&mechanism); + connect_to_compute(&mechanism, cache, &extra, &creds) + .await + .unwrap_err(); + mechanism.verify(); }