From e8db20eb264707b381cccf178c2ab755610b6988 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 19 Nov 2022 11:49:35 +0200 Subject: [PATCH] On connection from compute, wait for tenant to become Active. If a connection from compute arrives while a tenant is still in Loading state, wait for it to become Active instead of throwing an error to the client. This should fix the errors from test_gc_cutoff test that repeatedly restarts the pageserver and immediately tries to connect to it. --- pageserver/src/page_service.rs | 46 +++++++++++++++---- pageserver/src/tenant.rs | 13 ++++++ .../regress/test_pageserver_restart.py | 19 +++++++- 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index eb9416a482..5de11017c6 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use std::net::TcpListener; use std::str; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use tokio::pin; use tokio_util::io::StreamReader; use tokio_util::io::SyncIoBridge; @@ -46,7 +47,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::profiling::profpoint_start; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::tenant::Timeline; +use crate::tenant::{Tenant, Timeline}; use crate::tenant_mgr; use crate::trace::Tracer; use crate::CheckpointConfig; @@ -278,7 +279,7 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Make request tracer if needed - let tenant = tenant_mgr::get_tenant(tenant_id, true)?; + let tenant = get_active_tenant_with_timeout(tenant_id).await?; let mut tracer = if tenant.get_trace_read_requests() { let connection_id = ConnectionId::generate(); let path = tenant @@ -290,7 +291,7 @@ impl PageServerHandler { }; // Check that the timeline exists - let timeline = get_local_timeline(tenant_id, timeline_id)?; + let timeline = tenant.get_timeline(timeline_id, true)?; 
// switch client to COPYBOTH pgb.write_message(&BeMessage::CopyBothResponse)?; @@ -375,7 +376,7 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); - let tenant = tenant_mgr::get_tenant(tenant_id, true)?; + let tenant = get_active_tenant_with_timeout(tenant_id).await?; let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version)?; // TODO mark timeline as not ready until it reaches end_lsn. @@ -430,7 +431,7 @@ impl PageServerHandler { ) -> anyhow::Result<()> { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); - let timeline = get_local_timeline(tenant_id, timeline_id)?; + let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?; ensure!(timeline.get_last_record_lsn() == start_lsn); // TODO leave clean state on error. For now you can use detach to clean @@ -623,7 +624,7 @@ impl PageServerHandler { full_backup: bool, ) -> anyhow::Result<()> { // check that the timeline exists - let timeline = get_local_timeline(tenant_id, timeline_id)?; + let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { // Backup was requested at a particular LSN. Wait for it to arrive. 
@@ -765,7 +766,7 @@ impl postgres_backend_async::Handler for PageServerHandler { let timeline_id = TimelineId::from_str(params[1])?; self.check_permission(Some(tenant_id))?; - let timeline = get_local_timeline(tenant_id, timeline_id)?; + let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?; let end_of_timeline = timeline.get_last_record_rlsn(); @@ -888,7 +889,7 @@ impl postgres_backend_async::Handler for PageServerHandler { self.check_permission(Some(tenant_id))?; - let tenant = tenant_mgr::get_tenant(tenant_id, true)?; + let tenant = get_active_tenant_with_timeout(tenant_id).await?; pgb.write_message(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), @@ -932,8 +933,33 @@ impl postgres_backend_async::Handler for PageServerHandler { } } -fn get_local_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> Result<Arc<Timeline>> { - tenant_mgr::get_tenant(tenant_id, true) +/// Get active tenant. +/// +/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That +/// ensures that queries don't fail immediately after pageserver startup, because +/// all tenants are still loading. +async fn get_active_tenant_with_timeout(tenant_id: TenantId) -> Result<Arc<Tenant>> { + let tenant = tenant_mgr::get_tenant(tenant_id, false)?; + + match tokio::time::timeout(Duration::from_secs(30), tenant.wait_until_loaded()).await { + Ok(result) => { + let state = result?; + if !matches!(state, pageserver_api::models::TenantState::Active { .. }) { + anyhow::bail!("Tenant {tenant_id} is not active. Current state: {state:?}"); + } + Ok(tenant) + } + Err(_) => anyhow::bail!("Timeout waiting for tenant {tenant_id} to become Active"), + } +} + +/// Shorthand for getting a reference to a Timeline of an Active tenant. 
+async fn get_active_timeline_with_timeout( + tenant_id: TenantId, + timeline_id: TimelineId, +) -> Result<Arc<Timeline>> { + get_active_tenant_with_timeout(tenant_id) + .await .and_then(|tenant| tenant.get_timeline(timeline_id, true)) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 81db7d593b..5056c9ddc2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -766,6 +766,8 @@ impl Tenant { async fn load(self: &Arc<Self>) -> anyhow::Result<()> { info!("loading tenant task"); + utils::failpoint_sleep_millis_async!("before-loading-tenant"); + // TODO split this into two functions, scan and actual load // Load in-memory state to reflect the local files on disk @@ -1295,6 +1297,17 @@ impl Tenant { pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> { self.state.subscribe() } + + pub async fn wait_until_loaded(&self) -> anyhow::Result<TenantState> { + let mut receiver = self.state.subscribe(); + loop { + let current_state = *receiver.borrow_and_update(); + if current_state != TenantState::Loading { + break Ok(current_state); + } + receiver.changed().await?; + } + } } /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), diff --git a/test_runner/regress/test_pageserver_restart.py b/test_runner/regress/test_pageserver_restart.py index ad06634ae9..e48815906b 100644 --- a/test_runner/regress/test_pageserver_restart.py +++ b/test_runner/regress/test_pageserver_restart.py @@ -56,9 +56,24 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder): cur.execute("SELECT count(*) FROM foo") assert cur.fetchone() == (100000,) - # Stop the page server by force, and restart it + # Restart the server again, but delay the loading of tenants, and test what the + # pageserver does if a compute node connects and sends a request for the tenant + # while it's still in Loading state. (It waits for the loading to finish, and then + # processes the request.) 
env.pageserver.stop() - env.pageserver.start() + env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-loading-tenant=return(5000)"}) + + # Check that it's in Loading state + client = env.pageserver.http_client() + tenant_status = client.tenant_status(env.initial_tenant) + log.info("Tenant status : %s", tenant_status) + assert tenant_status["state"] == "Loading" + + # Try to read. This waits until the loading finishes, and then returns normally. + pg_conn = pg.connect() + cur = pg_conn.cursor() + cur.execute("SELECT count(*) FROM foo") + assert cur.fetchone() == (100000,) # Test that repeatedly kills and restarts the page server, while the