From 87389bc933e870cb10d0f88aabce80333d1792a6 Mon Sep 17 00:00:00 2001
From: Sasha Krassovsky
Date: Wed, 8 Nov 2023 11:48:57 -0800
Subject: [PATCH] Add test simulating bad connection between pageserver and
 compute (#5728)

## Problem

We have a funny 3-day timeout for connections between the compute and
pageserver. We want to get rid of it; to do that, we need to make sure the
compute is resilient to connection failures.

Closes: https://github.com/neondatabase/neon/issues/5518

## Summary of changes

This test makes the pageserver randomly drop the connection when the
failpoint is enabled, and ensures the compute can keep querying the
pageserver anyway. This PR also reduces the default timeout from 3 days to
10 minutes.
---
 libs/postgres_backend/src/lib.rs           | 11 +++-
 pageserver/src/page_service.rs             | 30 +++++++++--
 test_runner/regress/test_bad_connection.py | 60 ++++++++++++++++++++++
 3 files changed, 96 insertions(+), 5 deletions(-)
 create mode 100644 test_runner/regress/test_bad_connection.py

diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs
index 92a0ec7c73..1dae008a4f 100644
--- a/libs/postgres_backend/src/lib.rs
+++ b/libs/postgres_backend/src/lib.rs
@@ -38,6 +38,8 @@ pub enum QueryError {
     /// Authentication failure
     #[error("Unauthorized: {0}")]
     Unauthorized(std::borrow::Cow<'static, str>),
+    #[error("Simulated Connection Error")]
+    SimulatedConnectionError,
     /// Some other error
     #[error(transparent)]
     Other(#[from] anyhow::Error),
@@ -52,7 +54,7 @@ impl From<io::Error> for QueryError {
 impl QueryError {
     pub fn pg_error_code(&self) -> &'static [u8; 5] {
         match self {
-            Self::Disconnected(_) => b"08006", // connection failure
+            Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
             Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
             Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
             Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
@@ -736,6 +738,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
                 if let Err(e) = handler.process_query(self, query_string).await {
                     match e {
                         QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
+                        QueryError::SimulatedConnectionError => {
+                            return Err(QueryError::SimulatedConnectionError)
+                        }
                         e => {
                             log_query_error(query_string, &e);
                             let short_error = short_error(&e);
@@ -971,6 +976,7 @@ pub fn short_error(e: &QueryError) -> String {
         QueryError::Disconnected(connection_error) => connection_error.to_string(),
         QueryError::Shutdown => "shutdown".to_string(),
         QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
+        QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
         QueryError::Other(e) => format!("{e:#}"),
     }
 }
@@ -987,6 +993,9 @@ fn log_query_error(query: &str, e: &QueryError) {
         QueryError::Disconnected(other_connection_error) => {
             error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
         }
+        QueryError::SimulatedConnectionError => {
+            error!("query handler for query '{query}' failed due to a simulated connection error")
+        }
         QueryError::Shutdown => {
             info!("query handler for '{query}' cancelled during tenant shutdown")
         }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 5487a9d7c1..2201d6c86b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -218,9 +218,27 @@ async fn page_service_conn_main(
     // no write timeout is used, because the kernel is assumed to error writes after some time.
     let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
 
-    // timeout should be lower, but trying out multiple days for
-    //
-    socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
+    let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
+    let socket_timeout_ms = (|| {
+        fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
+            // Exponential distribution for simulating
+            // poor network conditions, expect about avg_timeout_ms to be around 15
+            // in tests
+            if let Some(avg_timeout_ms) = avg_timeout_ms {
+                let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
+                let u = rand::random::<f32>();
+                ((1.0 - u).ln() / (-avg)) as u64
+            } else {
+                default_timeout_ms
+            }
+        });
+        default_timeout_ms
+    })();
+
+    // A timeout here does not mean the client died, it can happen if it's just idle for
+    // a while: we will tear down this PageServerHandler and instantiate a new one if/when
+    // they reconnect.
+    socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
     let socket = std::pin::pin!(socket);
 
     // XXX: pgbackend.run() should take the connection_ctx,
@@ -981,9 +999,13 @@ where
         pgb: &mut PostgresBackend<IO>,
         query_string: &str,
     ) -> Result<(), QueryError> {
+        fail::fail_point!("simulated-bad-compute-connection", |_| {
+            info!("Hit failpoint for bad connection");
+            Err(QueryError::SimulatedConnectionError)
+        });
+
         let ctx = self.connection_ctx.attached_child();
         debug!("process query {query_string:?}");
-
         if query_string.starts_with("pagestream ") {
             let (_, params_raw) = query_string.split_at("pagestream ".len());
             let params = params_raw.split(' ').collect::<Vec<_>>();
diff --git a/test_runner/regress/test_bad_connection.py b/test_runner/regress/test_bad_connection.py
new file mode 100644
index 0000000000..ba0624c730
--- /dev/null
+++ b/test_runner/regress/test_bad_connection.py
@@ -0,0 +1,60 @@
+import random
+import time
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder
+
+
+def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.append(".*simulated connection error.*")
+
+    pageserver_http = env.pageserver.http_client()
+    env.neon_cli.create_branch("test_compute_pageserver_connection_stress")
+    endpoint = env.endpoints.create_start("test_compute_pageserver_connection_stress")
+
+    # Enable failpoint after starting everything else up so that loading initial
+    # basebackup doesn't fail
+    pageserver_http.configure_failpoints(("simulated-bad-compute-connection", "50%return(15)"))
+
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # Create table, and insert some rows. Make it big enough that it doesn't fit in
+    # shared_buffers, otherwise the SELECTs will just return the answer from
+    # shared_buffers without hitting the page server, which defeats the point
+    # of this test.
+    cur.execute("CREATE TABLE foo (t text)")
+    cur.execute(
+        """
+        INSERT INTO foo
+        SELECT 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+        """
+    )
+
+    # Verify that the table is larger than shared_buffers
+    cur.execute(
+        """
+        select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
+        from pg_settings where name = 'shared_buffers'
+        """
+    )
+    row = cur.fetchone()
+    assert row is not None
+    log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
+    assert int(row[0]) < int(row[1])
+
+    cur.execute("SELECT count(*) FROM foo")
+    assert cur.fetchone() == (100000,)
+
+    end_time = time.time() + 30
+    times_executed = 0
+    while time.time() < end_time:
+        if random.random() < 0.5:
+            cur.execute("INSERT INTO foo VALUES ('stas'), ('heikki')")
+        else:
+            cur.execute("SELECT t FROM foo ORDER BY RANDOM() LIMIT 10")
+            cur.fetchall()
+        times_executed += 1
+    log.info(f"Workload executed {times_executed} times")
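
Note for reviewers (separate from the patch itself): the failpoint above draws each socket read timeout from an exponential distribution via inverse-CDF ("inverse transform") sampling. A minimal standalone sketch of that sampling step, assuming only the `rand` crate; for a mean of `avg` milliseconds the textbook form multiplies by `avg`, whereas the hunk computes `(1.0 - u).ln() / (-avg)`, whose mean works out to `1/avg` ms instead.

```rust
// Standalone sketch: inverse-CDF sampling of an exponential distribution.
// If U is uniform on [0, 1), then X = -avg * ln(1 - U) is exponentially
// distributed with mean `avg`.
fn sample_timeout_ms(avg_ms: f32) -> u64 {
    let u = rand::random::<f32>(); // uniform in [0, 1)
    // ln(1 - u) <= 0, so negating gives a non-negative timeout.
    (-avg_ms * (1.0 - u).ln()) as u64
}

fn main() {
    // With avg_ms = 15 (the value the test passes via "50%return(15)"),
    // samples cluster around 15 ms with a long tail.
    let samples: Vec<u64> = (0..10).map(|_| sample_timeout_ms(15.0)).collect();
    println!("simulated timeouts (ms): {samples:?}");
}
```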