On connection from compute, wait for tenant to become Active.

If a connection from compute arrives while a tenant is still in
Loading state, wait for it to become Active instead of throwing an
error to the client. This should fix the errors from test_gc_cutoff
test that repeatedly restarts the pageserver and immediately tries to
connect to it.
This commit is contained in:
Heikki Linnakangas
2022-11-19 11:49:35 +02:00
committed by Christian Schwarz
parent 7552e2d25f
commit e8db20eb26
3 changed files with 66 additions and 12 deletions

View File

@@ -25,6 +25,7 @@ use std::net::TcpListener;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::pin;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
@@ -46,7 +47,7 @@ use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME};
use crate::profiling::profpoint_start;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant::{Tenant, Timeline};
use crate::tenant_mgr;
use crate::trace::Tracer;
use crate::CheckpointConfig;
@@ -278,7 +279,7 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Make request tracer if needed
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let tenant = get_active_tenant_with_timeout(tenant_id).await?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
let path = tenant
@@ -290,7 +291,7 @@ impl PageServerHandler {
};
// Check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
let timeline = tenant.get_timeline(timeline_id, true)?;
// switch client to COPYBOTH
pgb.write_message(&BeMessage::CopyBothResponse)?;
@@ -375,7 +376,7 @@ impl PageServerHandler {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let tenant = get_active_tenant_with_timeout(tenant_id).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version)?;
// TODO mark timeline as not ready until it reaches end_lsn.
@@ -430,7 +431,7 @@ impl PageServerHandler {
) -> anyhow::Result<()> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_local_timeline(tenant_id, timeline_id)?;
let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?;
ensure!(timeline.get_last_record_lsn() == start_lsn);
// TODO leave clean state on error. For now you can use detach to clean
@@ -623,7 +624,7 @@ impl PageServerHandler {
full_backup: bool,
) -> anyhow::Result<()> {
// check that the timeline exists
let timeline = get_local_timeline(tenant_id, timeline_id)?;
let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?;
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
if let Some(lsn) = lsn {
// Backup was requested at a particular LSN. Wait for it to arrive.
@@ -765,7 +766,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
let timeline_id = TimelineId::from_str(params[1])?;
self.check_permission(Some(tenant_id))?;
let timeline = get_local_timeline(tenant_id, timeline_id)?;
let timeline = get_active_timeline_with_timeout(tenant_id, timeline_id).await?;
let end_of_timeline = timeline.get_last_record_rlsn();
@@ -888,7 +889,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;
let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
let tenant = get_active_tenant_with_timeout(tenant_id).await?;
pgb.write_message(&BeMessage::RowDescription(&[
RowDescriptor::int8_col(b"checkpoint_distance"),
RowDescriptor::int8_col(b"checkpoint_timeout"),
@@ -932,8 +933,33 @@ impl postgres_backend_async::Handler for PageServerHandler {
}
}
fn get_local_timeline(tenant_id: TenantId, timeline_id: TimelineId) -> Result<Arc<Timeline>> {
tenant_mgr::get_tenant(tenant_id, true)
/// Get active tenant.
///
/// If the tenant is Loading, waits for it to become Active, for up to 30 s. That
/// ensures that queries don't fail immediately after pageserver startup, because
/// all tenants are still loading.
async fn get_active_tenant_with_timeout(tenant_id: TenantId) -> Result<Arc<Tenant>> {
let tenant = tenant_mgr::get_tenant(tenant_id, false)?;
match tokio::time::timeout(Duration::from_secs(30), tenant.wait_until_loaded()).await {
Ok(result) => {
let state = result?;
if !matches!(state, pageserver_api::models::TenantState::Active { .. }) {
anyhow::bail!("Tenant {tenant_id} is not active. Current state: {state:?}");
}
Ok(tenant)
}
Err(_) => anyhow::bail!("Timeout waiting for tenant {tenant_id} to become Active"),
}
}
/// Shorthand for getting a reference to a Timeline of an Active tenant.
async fn get_active_timeline_with_timeout(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>> {
get_active_tenant_with_timeout(tenant_id)
.await
.and_then(|tenant| tenant.get_timeline(timeline_id, true))
}

View File

@@ -766,6 +766,8 @@ impl Tenant {
async fn load(self: &Arc<Tenant>) -> anyhow::Result<()> {
info!("loading tenant task");
utils::failpoint_sleep_millis_async!("before-loading-tenant");
// TODO split this into two functions, scan and actual load
// Load in-memory state to reflect the local files on disk
@@ -1295,6 +1297,17 @@ impl Tenant {
pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
self.state.subscribe()
}
pub async fn wait_until_loaded(&self) -> anyhow::Result<TenantState> {
let mut receiver = self.state.subscribe();
loop {
let current_state = *receiver.borrow_and_update();
if current_state != TenantState::Loading {
break Ok(current_state);
}
receiver.changed().await?;
}
}
}
/// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),

View File

@@ -56,9 +56,24 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder):
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
# Stop the page server by force, and restart it
# Restart the server again, but delay the loading of tenants, and test what the
# pageserver does if a compute node connects and sends a request for the tenant
# while it's still in Loading state. (It waits for the loading to finish, and then
# processes the request.)
env.pageserver.stop()
env.pageserver.start()
env.pageserver.start(extra_env_vars={"FAILPOINTS": "before-loading-tenant=return(5000)"})
# Check that it's in Loading state
client = env.pageserver.http_client()
tenant_status = client.tenant_status(env.initial_tenant)
log.info("Tenant status : %s", tenant_status)
assert tenant_status["state"] == "Loading"
# Try to read. This waits until the loading finishes, and then return normally.
pg_conn = pg.connect()
cur = pg_conn.cursor()
cur.execute("SELECT count(*) FROM foo")
assert cur.fetchone() == (100000,)
# Test that repeatedly kills and restarts the page server, while the