diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs
index 92a0ec7c73..1dae008a4f 100644
--- a/libs/postgres_backend/src/lib.rs
+++ b/libs/postgres_backend/src/lib.rs
@@ -38,6 +38,8 @@ pub enum QueryError {
     /// Authentication failure
     #[error("Unauthorized: {0}")]
     Unauthorized(std::borrow::Cow<'static, str>),
+    #[error("Simulated Connection Error")]
+    SimulatedConnectionError,
     /// Some other error
     #[error(transparent)]
     Other(#[from] anyhow::Error),
@@ -52,7 +54,7 @@ impl From<io::Error> for QueryError {
 impl QueryError {
     pub fn pg_error_code(&self) -> &'static [u8; 5] {
         match self {
-            Self::Disconnected(_) => b"08006", // connection failure
+            Self::Disconnected(_) | Self::SimulatedConnectionError => b"08006", // connection failure
             Self::Shutdown => SQLSTATE_ADMIN_SHUTDOWN,
             Self::Unauthorized(_) => SQLSTATE_INTERNAL_ERROR,
             Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
@@ -736,6 +738,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
         if let Err(e) = handler.process_query(self, query_string).await {
             match e {
                 QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
+                QueryError::SimulatedConnectionError => {
+                    return Err(QueryError::SimulatedConnectionError)
+                }
                 e => {
                     log_query_error(query_string, &e);
                     let short_error = short_error(&e);
@@ -971,6 +976,7 @@ pub fn short_error(e: &QueryError) -> String {
         QueryError::Disconnected(connection_error) => connection_error.to_string(),
         QueryError::Shutdown => "shutdown".to_string(),
         QueryError::Unauthorized(_e) => "JWT authentication error".to_string(),
+        QueryError::SimulatedConnectionError => "simulated connection error".to_string(),
         QueryError::Other(e) => format!("{e:#}"),
     }
 }
@@ -987,6 +993,9 @@ fn log_query_error(query: &str, e: &QueryError) {
         QueryError::Disconnected(other_connection_error) => {
             error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
         }
+        QueryError::SimulatedConnectionError => {
+            error!("query handler for query '{query}' failed due to a simulated connection error")
+        }
         QueryError::Shutdown => {
             info!("query handler for '{query}' cancelled during tenant shutdown")
         }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 5487a9d7c1..2201d6c86b 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -218,9 +218,27 @@ async fn page_service_conn_main(
     // no write timeout is used, because the kernel is assumed to error writes after some time.
     let mut socket = tokio_io_timeout::TimeoutReader::new(socket);
 
-    // timeout should be lower, but trying out multiple days for
-    //
-    socket.set_timeout(Some(std::time::Duration::from_secs(60 * 60 * 24 * 3)));
+    let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
+    let socket_timeout_ms = (|| {
+        fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
+            // Exponential distribution for simulating
+            // poor network conditions, expect about avg_timeout_ms to be around 15
+            // in tests
+            if let Some(avg_timeout_ms) = avg_timeout_ms {
+                let avg = avg_timeout_ms.parse::<u64>().unwrap() as f32;
+                let u = rand::random::<f32>();
+                ((1.0 - u).ln() / (-avg)) as u64
+            } else {
+                default_timeout_ms
+            }
+        });
+        default_timeout_ms
+    })();
+
+    // A timeout here does not mean the client died, it can happen if it's just idle for
+    // a while: we will tear down this PageServerHandler and instantiate a new one if/when
+    // they reconnect.
+    socket.set_timeout(Some(std::time::Duration::from_millis(socket_timeout_ms)));
     let socket = std::pin::pin!(socket);
 
     // XXX: pgbackend.run() should take the connection_ctx,
@@ -981,9 +999,13 @@ where
         pgb: &mut PostgresBackend<IO>,
         query_string: &str,
     ) -> Result<(), QueryError> {
+        fail::fail_point!("simulated-bad-compute-connection", |_| {
+            info!("Hit failpoint for bad connection");
+            Err(QueryError::SimulatedConnectionError)
+        });
+
         let ctx = self.connection_ctx.attached_child();
         debug!("process query {query_string:?}");
-
         if query_string.starts_with("pagestream ") {
             let (_, params_raw) = query_string.split_at("pagestream ".len());
             let params = params_raw.split(' ').collect::<Vec<_>>();
diff --git a/test_runner/regress/test_bad_connection.py b/test_runner/regress/test_bad_connection.py
new file mode 100644
index 0000000000..ba0624c730
--- /dev/null
+++ b/test_runner/regress/test_bad_connection.py
@@ -0,0 +1,60 @@
+import random
+import time
+
+from fixtures.log_helper import log
+from fixtures.neon_fixtures import NeonEnvBuilder
+
+
+def test_compute_pageserver_connection_stress(neon_env_builder: NeonEnvBuilder):
+    env = neon_env_builder.init_start()
+    env.pageserver.allowed_errors.append(".*simulated connection error.*")
+
+    pageserver_http = env.pageserver.http_client()
+    env.neon_cli.create_branch("test_compute_pageserver_connection_stress")
+    endpoint = env.endpoints.create_start("test_compute_pageserver_connection_stress")
+
+    # Enable failpoint after starting everything else up so that loading initial
+    # basebackup doesn't fail
+    pageserver_http.configure_failpoints(("simulated-bad-compute-connection", "50%return(15)"))
+
+    pg_conn = endpoint.connect()
+    cur = pg_conn.cursor()
+
+    # Create table, and insert some rows. Make it big enough that it doesn't fit in
+    # shared_buffers, otherwise the SELECT after restart will just return answer
+    # from shared_buffers without hitting the page server, which defeats the point
+    # of this test.
+    cur.execute("CREATE TABLE foo (t text)")
+    cur.execute(
+        """
+        INSERT INTO foo
+        SELECT 'long string to consume some space' || g
+        FROM generate_series(1, 100000) g
+        """
+    )
+
+    # Verify that the table is larger than shared_buffers
+    cur.execute(
+        """
+        select setting::int * pg_size_bytes(unit) as shared_buffers, pg_relation_size('foo') as tbl_size
+        from pg_settings where name = 'shared_buffers'
+        """
+    )
+    row = cur.fetchone()
+    assert row is not None
+    log.info(f"shared_buffers is {row[0]}, table size {row[1]}")
+    assert int(row[0]) < int(row[1])
+
+    cur.execute("SELECT count(*) FROM foo")
+    assert cur.fetchone() == (100000,)
+
+    end_time = time.time() + 30
+    times_executed = 0
+    while time.time() < end_time:
+        if random.random() < 0.5:
+            cur.execute("INSERT INTO foo VALUES ('stas'), ('heikki')")
+        else:
+            cur.execute("SELECT t FROM foo ORDER BY RANDOM() LIMIT 10")
+            cur.fetchall()
+        times_executed += 1
+    log.info(f"Workload executed {times_executed} times")