mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-07 21:42:56 +00:00
@@ -4,7 +4,7 @@ use std::{collections::HashMap, time::Duration};
|
||||
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
|
||||
use control_plane::local_env::LocalEnv;
|
||||
use futures::StreamExt;
|
||||
use hyper::{Method, StatusCode};
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
|
||||
use postgres_connection::parse_host_port;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -328,7 +328,7 @@ impl ComputeHook {
|
||||
reconfigure_request: &ComputeHookNotifyRequest,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(), NotifyError> {
|
||||
let req = self.client.request(Method::PUT, url);
|
||||
let req = self.client.request(reqwest::Method::PUT, url);
|
||||
let req = if let Some(value) = &self.authorization_header {
|
||||
req.header(reqwest::header::AUTHORIZATION, value)
|
||||
} else {
|
||||
@@ -347,8 +347,10 @@ impl ComputeHook {
|
||||
};
|
||||
|
||||
// Treat all 2xx responses as success
|
||||
if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
|
||||
if response.status() != StatusCode::OK {
|
||||
if response.status() >= reqwest::StatusCode::OK
|
||||
&& response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
|
||||
{
|
||||
if response.status() != reqwest::StatusCode::OK {
|
||||
// Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
|
||||
// log a warning.
|
||||
tracing::warn!(
|
||||
@@ -362,7 +364,7 @@ impl ComputeHook {
|
||||
|
||||
// Error response codes
|
||||
match response.status() {
|
||||
StatusCode::TOO_MANY_REQUESTS => {
|
||||
reqwest::StatusCode::TOO_MANY_REQUESTS => {
|
||||
// TODO: 429 handling should be global: set some state visible to other requests
|
||||
// so that they will delay before starting, rather than all notifications trying
|
||||
// once before backing off.
|
||||
@@ -371,20 +373,30 @@ impl ComputeHook {
|
||||
.ok();
|
||||
Err(NotifyError::SlowDown)
|
||||
}
|
||||
StatusCode::LOCKED => {
|
||||
reqwest::StatusCode::LOCKED => {
|
||||
// We consider this fatal, because it's possible that the operation blocking the control one is
|
||||
// also the one that is waiting for this reconcile. We should let the reconciler calling
|
||||
// this hook fail, to give control plane a chance to un-lock.
|
||||
tracing::info!("Control plane reports tenant is locked, dropping out of notify");
|
||||
Err(NotifyError::Busy)
|
||||
}
|
||||
StatusCode::SERVICE_UNAVAILABLE
|
||||
| StatusCode::GATEWAY_TIMEOUT
|
||||
| StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
|
||||
StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
|
||||
Err(NotifyError::Fatal(response.status()))
|
||||
reqwest::StatusCode::SERVICE_UNAVAILABLE => {
|
||||
Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
|
||||
}
|
||||
_ => Err(NotifyError::Unexpected(response.status())),
|
||||
reqwest::StatusCode::GATEWAY_TIMEOUT => {
|
||||
Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
|
||||
}
|
||||
reqwest::StatusCode::BAD_GATEWAY => {
|
||||
Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
|
||||
}
|
||||
|
||||
reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
|
||||
reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
|
||||
reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
|
||||
status => Err(NotifyError::Unexpected(
|
||||
hyper::StatusCode::from_u16(status.as_u16())
|
||||
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::reconciler::ReconcileError;
|
||||
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
|
||||
use anyhow::Context;
|
||||
use futures::Future;
|
||||
use hyper::header::CONTENT_TYPE;
|
||||
use hyper::{Body, Request, Response};
|
||||
@@ -258,6 +259,12 @@ async fn handle_tenant_time_travel_remote_storage(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
fn map_reqwest_hyper_status(status: reqwest::StatusCode) -> Result<hyper::StatusCode, ApiError> {
|
||||
hyper::StatusCode::from_u16(status.as_u16())
|
||||
.context("invalid status code")
|
||||
.map_err(ApiError::InternalServerError)
|
||||
}
|
||||
|
||||
async fn handle_tenant_secondary_download(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -266,7 +273,7 @@ async fn handle_tenant_secondary_download(
|
||||
let wait = parse_query_param(&req, "wait_ms")?.map(Duration::from_millis);
|
||||
|
||||
let (status, progress) = service.tenant_secondary_download(tenant_id, wait).await?;
|
||||
json_response(status, progress)
|
||||
json_response(map_reqwest_hyper_status(status)?, progress)
|
||||
}
|
||||
|
||||
async fn handle_tenant_delete(
|
||||
@@ -277,7 +284,10 @@ async fn handle_tenant_delete(
|
||||
check_permissions(&req, Scope::PageServerApi)?;
|
||||
|
||||
deletion_wrapper(service, move |service| async move {
|
||||
service.tenant_delete(tenant_id).await
|
||||
service
|
||||
.tenant_delete(tenant_id)
|
||||
.await
|
||||
.and_then(map_reqwest_hyper_status)
|
||||
})
|
||||
.await
|
||||
}
|
||||
@@ -308,7 +318,10 @@ async fn handle_tenant_timeline_delete(
|
||||
let timeline_id: TimelineId = parse_request_param(&req, "timeline_id")?;
|
||||
|
||||
deletion_wrapper(service, move |service| async move {
|
||||
service.tenant_timeline_delete(tenant_id, timeline_id).await
|
||||
service
|
||||
.tenant_timeline_delete(tenant_id, timeline_id)
|
||||
.await
|
||||
.and_then(map_reqwest_hyper_status)
|
||||
})
|
||||
.await
|
||||
}
|
||||
@@ -371,11 +384,9 @@ async fn handle_tenant_timeline_passthrough(
|
||||
}
|
||||
|
||||
// We have a reqest::Response, would like a http::Response
|
||||
let mut builder = hyper::Response::builder()
|
||||
.status(resp.status())
|
||||
.version(resp.version());
|
||||
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
|
||||
for (k, v) in resp.headers() {
|
||||
builder = builder.header(k, v);
|
||||
builder = builder.header(k.as_str(), v.as_bytes());
|
||||
}
|
||||
|
||||
let response = builder
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{str::FromStr, time::Duration};
|
||||
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, NodeSchedulingPolicy,
|
||||
@@ -9,6 +8,7 @@ use pageserver_api::{
|
||||
shard::TenantShardId,
|
||||
};
|
||||
use pageserver_client::mgmt_api;
|
||||
use reqwest::StatusCode;
|
||||
use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{backoff, id::NodeId};
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use crate::persistence::Persistence;
|
||||
use crate::service;
|
||||
use hyper::StatusCode;
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
use pageserver_api::shard::{ShardIdentity, TenantShardId};
|
||||
use pageserver_client::mgmt_api;
|
||||
use reqwest::StatusCode;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -21,7 +21,6 @@ use control_plane::storage_controller::{
|
||||
};
|
||||
use diesel::result::DatabaseErrorKind;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use hyper::StatusCode;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::{
|
||||
controller_api::{
|
||||
@@ -33,6 +32,7 @@ use pageserver_api::{
|
||||
},
|
||||
models::{SecondaryProgress, TenantConfigRequest},
|
||||
};
|
||||
use reqwest::StatusCode;
|
||||
|
||||
use crate::pageserver_client::PageserverClient;
|
||||
use pageserver_api::{
|
||||
|
||||
Reference in New Issue
Block a user