mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
Wait for tenant activation during lazy loading
This commit is contained in:
@@ -56,18 +56,34 @@ impl LazyTenantsMap {
|
||||
.with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))
|
||||
}
|
||||
|
||||
async fn try_load_tenant(&self, tenant_id: &TenantId) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
self.load_tenant(tenant_id)
|
||||
.map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))
|
||||
async fn try_load_tenant(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
wait_to_become_active: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
let tenant = self
|
||||
.load_tenant(tenant_id)
|
||||
.map_err(|e| GetTenantError::NotLoaded(*tenant_id, e))?;
|
||||
if wait_to_become_active {
|
||||
tenant
|
||||
.wait_to_become_active()
|
||||
.await
|
||||
.map_err(|_| GetTenantError::NotActive(*tenant_id))?;
|
||||
}
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
async fn get(&self, tenant_id: &TenantId) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
async fn get(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
wait_to_become_active: bool,
|
||||
) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
let tenant = self
|
||||
.map
|
||||
.get(tenant_id)
|
||||
.ok_or(GetTenantError::NotFound(*tenant_id))?;
|
||||
tenant
|
||||
.get_or_try_init(|| self.try_load_tenant(tenant_id))
|
||||
.get_or_try_init(|| self.try_load_tenant(tenant_id, wait_to_become_active))
|
||||
.await
|
||||
}
|
||||
}
|
||||
@@ -86,10 +102,14 @@ enum TenantsMap {
|
||||
}
|
||||
|
||||
impl TenantsMap {
|
||||
async fn get(&self, tenant_id: &TenantId) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
async fn get(
|
||||
&self,
|
||||
tenant_id: &TenantId,
|
||||
wait_to_become_active: bool,
|
||||
) -> Result<&Arc<Tenant>, GetTenantError> {
|
||||
match self {
|
||||
TenantsMap::Initializing => Err(GetTenantError::NotFound(*tenant_id)),
|
||||
TenantsMap::Open(m) => m.get(tenant_id).await,
|
||||
TenantsMap::Open(m) => m.get(tenant_id, wait_to_become_active).await,
|
||||
TenantsMap::ShuttingDown(m) => {
|
||||
m.get(tenant_id).ok_or(GetTenantError::NotFound(*tenant_id))
|
||||
}
|
||||
@@ -484,7 +504,7 @@ pub async fn get_tenant(
|
||||
active_only: bool,
|
||||
) -> Result<Arc<Tenant>, GetTenantError> {
|
||||
let m = TENANTS.read().await;
|
||||
let tenant = m.get(&tenant_id).await?;
|
||||
let tenant = m.get(&tenant_id, active_only).await?;
|
||||
if active_only && !tenant.is_active() {
|
||||
Err(GetTenantError::NotActive(tenant_id))
|
||||
} else {
|
||||
@@ -620,7 +640,7 @@ pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapLis
|
||||
(
|
||||
*id,
|
||||
tenant
|
||||
.get_or_try_init(|| m.try_load_tenant(id))
|
||||
.get_or_try_init(|| m.try_load_tenant(id, false))
|
||||
.await
|
||||
.map_or(
|
||||
TenantState::broken_from_reason("Failed to load tenant".to_string()),
|
||||
@@ -764,7 +784,7 @@ where
|
||||
// avoid holding the lock for the entire process.
|
||||
{
|
||||
let tenants_accessor = TENANTS.write().await;
|
||||
match tenants_accessor.get(&tenant_id).await {
|
||||
match tenants_accessor.get(&tenant_id, false).await {
|
||||
Ok(tenant) => {
|
||||
let tenant = Arc::clone(tenant);
|
||||
// don't hold TENANTS lock while set_stopping waits for activation to finish
|
||||
@@ -804,7 +824,7 @@ where
|
||||
}
|
||||
Err(e) => {
|
||||
let tenants_accessor = TENANTS.read().await;
|
||||
match tenants_accessor.get(&tenant_id).await {
|
||||
match tenants_accessor.get(&tenant_id, false).await {
|
||||
Ok(tenant) => {
|
||||
tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
@@ -871,7 +891,7 @@ pub async fn immediate_compact(
|
||||
let guard = TENANTS.read().await;
|
||||
|
||||
let tenant = guard
|
||||
.get(&tenant_id)
|
||||
.get(&tenant_id, true)
|
||||
.await
|
||||
.map(Arc::clone)
|
||||
.with_context(|| format!("tenant {tenant_id}"))
|
||||
|
||||
@@ -21,6 +21,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
|
||||
".*will not become active. Current state: Broken.*",
|
||||
".*failed to load metadata.*",
|
||||
".*load failed.*load local timeline.*",
|
||||
".*load failed, setting tenant state to Broken.*",
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user