pageserver: return proper status code for heatmap_upload errors (#9991)
## Problem

During deploys, we see a lot of 500 errors due to heatmap uploads for inactive tenants. These should be 503s instead.

Resolves #9574.

## Summary of changes

Make the secondary tenant scheduler use `ApiError` rather than `anyhow::Error`, to propagate the tenant error and convert it to an appropriate status code.
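The mechanism is the usual typed-error plus `From` conversion chain: the scheduler returns a `SecondaryTenantError`, the HTTP handler bubbles it with `?`, and a `From<SecondaryTenantError> for ApiError` impl picks the status code. Below is a minimal standalone sketch of that pattern; the simplified enums, variant names, and the exact status mapping are illustrative stand-ins, not the pageserver's actual definitions.

```rust
use thiserror::Error;

// Simplified stand-ins for the pageserver's GetTenantError / SecondaryTenantError / ApiError;
// variant names and the status mapping are illustrative, not the real definitions.
#[derive(Error, Debug)]
enum GetTenantError {
    #[error("tenant {0} not found")]
    NotFound(String),
    #[error("tenant {0} is not active")]
    NotActive(String),
}

#[derive(Error, Debug)]
enum SecondaryTenantError {
    #[error("{0}")]
    GetTenant(#[from] GetTenantError),
    #[error("shutting down")]
    ShuttingDown,
}

#[derive(Error, Debug)]
enum ApiError {
    #[error("not found: {0}")]
    NotFound(String), // 404
    #[error("resource unavailable: {0}")]
    ResourceUnavailable(String), // 503: tenant exists but cannot serve the request right now
    #[error("shutting down")]
    ShuttingDown, // 503
}

impl From<SecondaryTenantError> for ApiError {
    fn from(e: SecondaryTenantError) -> Self {
        match e {
            SecondaryTenantError::GetTenant(GetTenantError::NotFound(t)) => ApiError::NotFound(t),
            SecondaryTenantError::GetTenant(GetTenantError::NotActive(t)) => {
                ApiError::ResourceUnavailable(t)
            }
            SecondaryTenantError::ShuttingDown => ApiError::ShuttingDown,
        }
    }
}

// With the From impl in place a handler only needs `?`; previously the scheduler returned
// anyhow::Error and the handler wrapped everything in ApiError::InternalServerError (500).
fn upload_handler(result: Result<(), SecondaryTenantError>) -> Result<(), ApiError> {
    result?;
    Ok(())
}

fn main() {
    let err = upload_handler(Err(GetTenantError::NotActive("tenant-a".into()).into())).unwrap_err();
    println!("{err}"); // surfaces as a retryable 503 rather than a 500
}
```

The point is that a tenant that merely isn't active yet maps to a retryable 503, while only genuinely unexpected failures remain 500s.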
@@ -279,7 +279,10 @@ impl From<TenantStateError> for ApiError {
 impl From<GetTenantError> for ApiError {
     fn from(tse: GetTenantError) -> ApiError {
         match tse {
-            GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
+            GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {tid}").into()),
+            GetTenantError::ShardNotFound(tid) => {
+                ApiError::NotFound(anyhow!("tenant {tid}").into())
+            }
             GetTenantError::NotActive(_) => {
                 // Why is this not `ApiError::NotFound`?
                 // Because we must be careful to never return 404 for a tenant if it does
@@ -387,6 +390,16 @@ impl From<crate::tenant::mgr::DeleteTenantError> for ApiError {
     }
 }
 
+impl From<crate::tenant::secondary::SecondaryTenantError> for ApiError {
+    fn from(ste: crate::tenant::secondary::SecondaryTenantError) -> ApiError {
+        use crate::tenant::secondary::SecondaryTenantError;
+        match ste {
+            SecondaryTenantError::GetTenant(gte) => gte.into(),
+            SecondaryTenantError::ShuttingDown => ApiError::ShuttingDown,
+        }
+    }
+}
+
 // Helper function to construct a TimelineInfo struct for a timeline
 async fn build_timeline_info(
     timeline: &Arc<Timeline>,
@@ -1047,9 +1060,11 @@ async fn timeline_delete_handler(
             match e {
                 // GetTenantError has a built-in conversion to ApiError, but in this context we don't
                 // want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
-                GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
-                    "Requested tenant is missing".to_string().into_boxed_str(),
-                ),
+                GetTenantError::NotFound(_) | GetTenantError::ShardNotFound(_) => {
+                    ApiError::PreconditionFailed(
+                        "Requested tenant is missing".to_string().into_boxed_str(),
+                    )
+                }
                 e => e.into(),
             }
         })?;
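The delete handler shows the other half of the pattern: where the blanket `From` conversion would give the wrong answer (a 404 here would be ambiguous with a successful deletion), a local `map_err` picks a specific variant for the interesting cases and falls back to `e.into()` for the rest. A simplified sketch of that shape, with illustrative error types rather than the real ones:

```rust
#[derive(Debug)]
enum GetTenantError {
    NotFound(String),
    NotActive(String),
}

#[derive(Debug)]
enum ApiError {
    NotFound(String),            // 404
    PreconditionFailed(String),  // 412
    ResourceUnavailable(String), // 503
}

impl From<GetTenantError> for ApiError {
    fn from(e: GetTenantError) -> Self {
        match e {
            GetTenantError::NotFound(t) => ApiError::NotFound(t),
            GetTenantError::NotActive(t) => ApiError::ResourceUnavailable(t),
        }
    }
}

fn delete_timeline(lookup: Result<(), GetTenantError>) -> Result<(), ApiError> {
    lookup.map_err(|e| match e {
        // Locally override the blanket conversion: a missing tenant must not look like
        // a successful (already-deleted) outcome, so report 412 instead of 404.
        GetTenantError::NotFound(_) => {
            ApiError::PreconditionFailed("Requested tenant is missing".to_string())
        }
        e => e.into(),
    })
}

fn main() {
    println!("{:?}", delete_timeline(Err(GetTenantError::NotFound("t".into()))));
    println!("{:?}", delete_timeline(Err(GetTenantError::NotActive("t".into()))));
}
```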
@@ -2462,8 +2477,7 @@ async fn secondary_upload_handler(
     state
         .secondary_controller
         .upload_tenant(tenant_shard_id)
-        .await
-        .map_err(ApiError::InternalServerError)?;
+        .await?;
 
     json_response(StatusCode::OK, ())
 }
@@ -2578,7 +2592,7 @@ async fn secondary_download_handler(
         // Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
         // okay. We could get an error here in the unlikely edge case that the tenant
        // was detached between our check above and executing the download job.
-        Ok(Err(e)) => return Err(ApiError::InternalServerError(e)),
+        Ok(Err(e)) => return Err(e.into()),
         // A timeout is not an error: we have started the download, we're just not done
         // yet. The caller will get a response body indicating status.
         Err(_) => StatusCode::ACCEPTED,
@@ -3422,7 +3422,7 @@ impl Tenant {
                 r.map_err(
                     |_e: tokio::sync::watch::error::RecvError|
                         // Tenant existed but was dropped: report it as non-existent
-                        GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
+                        GetActiveTenantError::NotFound(GetTenantError::ShardNotFound(self.tenant_shard_id))
                 )?
             }
             Err(TimeoutCancellableError::Cancelled) => {
@@ -894,7 +894,7 @@ impl TenantManager {
             Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
             Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
             None | Some(TenantSlot::Secondary(_)) => {
-                Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
+                Err(GetTenantError::ShardNotFound(tenant_shard_id))
             }
         }
     }
@@ -2258,6 +2258,9 @@ pub(crate) enum GetTenantError {
     #[error("Tenant {0} not found")]
     NotFound(TenantId),
 
+    #[error("Tenant {0} not found")]
+    ShardNotFound(TenantShardId),
+
     #[error("Tenant {0} is not active")]
     NotActive(TenantShardId),
 
@@ -22,6 +22,7 @@ use super::{
     mgr::TenantManager,
     span::debug_assert_current_span_has_tenant_id,
     storage_layer::LayerName,
+    GetTenantError,
 };
 
 use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE;
@@ -66,7 +67,21 @@ struct CommandRequest<T> {
 }
 
 struct CommandResponse {
-    result: anyhow::Result<()>,
+    result: Result<(), SecondaryTenantError>,
 }
 
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum SecondaryTenantError {
+    #[error("{0}")]
+    GetTenant(GetTenantError),
+    #[error("shutting down")]
+    ShuttingDown,
+}
+
+impl From<GetTenantError> for SecondaryTenantError {
+    fn from(gte: GetTenantError) -> Self {
+        Self::GetTenant(gte)
+    }
+}
+
 // Whereas [`Tenant`] represents an attached tenant, this type represents the work
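The hand-written `From<GetTenantError>` impl in this hunk is equivalent to letting thiserror derive the conversion with its `#[from]` attribute; a small sketch of that alternative, using hypothetical stand-in types rather than the real ones:

```rust
use thiserror::Error;

// Illustrative only: `DomainError` stands in for GetTenantError, `WrapperError` for SecondaryTenantError.
#[derive(Error, Debug)]
#[error("domain failure: {0}")]
struct DomainError(String);

#[derive(Error, Debug)]
enum WrapperError {
    // #[from] derives the From<DomainError> impl, so `?` converts automatically.
    #[error("{0}")]
    Domain(#[from] DomainError),
    #[error("shutting down")]
    ShuttingDown,
}

fn main() {
    let e: WrapperError = DomainError("tenant not found".into()).into();
    println!("{e}");
    let _ = WrapperError::ShuttingDown;
}
```

Either spelling gives `?` the conversion it needs; the explicit impl just keeps the attribute out of the enum definition.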
@@ -285,7 +300,7 @@ impl SecondaryController {
         &self,
         queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
         payload: T,
-    ) -> anyhow::Result<()> {
+    ) -> Result<(), SecondaryTenantError> {
         let (response_tx, response_rx) = tokio::sync::oneshot::channel();
 
         queue
@@ -294,20 +309,26 @@ impl SecondaryController {
                 response_tx,
             })
             .await
-            .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
+            .map_err(|_| SecondaryTenantError::ShuttingDown)?;
 
         let response = response_rx
             .await
-            .map_err(|_| anyhow::anyhow!("Request dropped"))?;
+            .map_err(|_| SecondaryTenantError::ShuttingDown)?;
 
         response.result
     }
 
-    pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
+    pub(crate) async fn upload_tenant(
+        &self,
+        tenant_shard_id: TenantShardId,
+    ) -> Result<(), SecondaryTenantError> {
         self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
             .await
     }
-    pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
+    pub(crate) async fn download_tenant(
+        &self,
+        tenant_shard_id: TenantShardId,
+    ) -> Result<(), SecondaryTenantError> {
         self.dispatch(
             &self.download_req_tx,
             DownloadCommand::Download(tenant_shard_id),
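The dispatch path is a standard command-queue pattern: send the request plus a oneshot responder over an mpsc channel, then await the reply; if either channel is closed, the only sensible interpretation is that the background scheduler is gone, hence both failures now map to `ShuttingDown`. A self-contained sketch of that pattern, with illustrative names rather than the actual scheduler types:

```rust
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-ins for CommandRequest / SecondaryTenantError.
#[derive(Debug)]
enum Error {
    ShuttingDown,
}

struct Request {
    payload: u64,
    response_tx: oneshot::Sender<Result<(), Error>>,
}

async fn dispatch(queue: &mpsc::Sender<Request>, payload: u64) -> Result<(), Error> {
    let (response_tx, response_rx) = oneshot::channel();
    // Sending fails only if the worker dropped its receiver: treat it as shutdown.
    queue
        .send(Request { payload, response_tx })
        .await
        .map_err(|_| Error::ShuttingDown)?;
    // Receiving fails only if the worker dropped our responder: also shutdown.
    response_rx.await.map_err(|_| Error::ShuttingDown)?
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    let (tx, mut rx) = mpsc::channel::<Request>(8);
    // A toy worker that acknowledges every command.
    tokio::spawn(async move {
        while let Some(req) = rx.recv().await {
            let _ = req.response_tx.send(Ok(()));
        }
    });
    dispatch(&tx, 42).await
}
```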
@@ -35,7 +35,7 @@ use super::{
         self, period_jitter, period_warmup, Completion, JobGenerator, SchedulingResult,
         TenantBackgroundJobs,
     },
-    SecondaryTenant,
+    GetTenantError, SecondaryTenant, SecondaryTenantError,
 };
 
 use crate::tenant::{
@@ -470,15 +470,16 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
         result
     }
 
-    fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
+    fn on_command(
+        &mut self,
+        command: DownloadCommand,
+    ) -> Result<PendingDownload, SecondaryTenantError> {
         let tenant_shard_id = command.get_tenant_shard_id();
 
         let tenant = self
             .tenant_manager
-            .get_secondary_tenant_shard(*tenant_shard_id);
-        let Some(tenant) = tenant else {
-            return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
-        };
+            .get_secondary_tenant_shard(*tenant_shard_id)
+            .ok_or(GetTenantError::ShardNotFound(*tenant_shard_id))?;
 
         Ok(PendingDownload {
             target_time: None,
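The downloader-side change above is the `Option::ok_or` idiom replacing the `let ... else` + `anyhow!` block: the missing shard becomes a typed error, and `?` converts it through the `From` impl. A tiny illustrative sketch with hypothetical stand-in types:

```rust
// Hypothetical stand-ins: a typed "not found" error plus a wrapper that `?` can convert into.
#[derive(Debug)]
struct ShardNotFound(u32);

#[derive(Debug)]
enum JobError {
    Shard(ShardNotFound),
}

impl From<ShardNotFound> for JobError {
    fn from(e: ShardNotFound) -> Self {
        JobError::Shard(e)
    }
}

fn lookup(shards: &[u32], id: u32) -> Option<u32> {
    shards.iter().copied().find(|&s| s == id)
}

fn on_command(shards: &[u32], id: u32) -> Result<u32, JobError> {
    // Option -> typed error via ok_or, then `?` auto-converts ShardNotFound into JobError.
    let shard = lookup(shards, id).ok_or(ShardNotFound(id))?;
    Ok(shard)
}

fn main() {
    assert!(on_command(&[1, 2, 3], 5).is_err());
    println!("{:?}", on_command(&[1, 2, 3], 2));
}
```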
@@ -28,7 +28,7 @@ use super::{
         self, period_jitter, period_warmup, JobGenerator, RunningJob, SchedulingResult,
         TenantBackgroundJobs,
     },
-    CommandRequest, UploadCommand,
+    CommandRequest, SecondaryTenantError, UploadCommand,
 };
 use tokio_util::sync::CancellationToken;
 use tracing::{info_span, instrument, Instrument};
@@ -279,7 +279,10 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
         }.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
     }
 
-    fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
+    fn on_command(
+        &mut self,
+        command: UploadCommand,
+    ) -> Result<UploadPending, SecondaryTenantError> {
         let tenant_shard_id = command.get_tenant_shard_id();
 
         tracing::info!(
@@ -287,8 +290,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
             "Starting heatmap write on command");
         let tenant = self
             .tenant_manager
-            .get_attached_tenant_shard(*tenant_shard_id)
-            .map_err(|e| anyhow::anyhow!(e))?;
+            .get_attached_tenant_shard(*tenant_shard_id)?;
         if !tenant.is_active() {
             return Err(GetTenantError::NotActive(*tenant_shard_id).into());
         }
@@ -12,7 +12,7 @@ use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
 use utils::{completion::Barrier, yielding_loop::yielding_loop};
 
-use super::{CommandRequest, CommandResponse};
+use super::{CommandRequest, CommandResponse, SecondaryTenantError};
 
 /// Scheduling interval is the time between calls to JobGenerator::schedule.
 /// When we schedule jobs, the job generator may provide a hint of its preferred
@@ -112,7 +112,7 @@ where
 
     /// Called when a command is received. A job will be spawned immediately if the return
     /// value is Some, ignoring concurrency limits and the pending queue.
-    fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
+    fn on_command(&mut self, cmd: CMD) -> Result<PJ, SecondaryTenantError>;
 }
 
 /// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling