mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Compare commits
14 Commits
rc/release
...
replica_re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d05cefaae1 | ||
|
|
fad69227d6 | ||
|
|
4a397638bf | ||
|
|
b34648c136 | ||
|
|
24038033bf | ||
|
|
1b935b1958 | ||
|
|
3f16ca2c18 | ||
|
|
67b94c5992 | ||
|
|
e38193c530 | ||
|
|
21949137ed | ||
|
|
02f94edb60 | ||
|
|
58327ef74d | ||
|
|
73be6bb736 | ||
|
|
40d7583906 |
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -753,6 +753,7 @@ dependencies = [
|
||||
"axum",
|
||||
"axum-core",
|
||||
"bytes",
|
||||
"form_urlencoded",
|
||||
"futures-util",
|
||||
"headers",
|
||||
"http 1.1.0",
|
||||
@@ -761,6 +762,8 @@ dependencies = [
|
||||
"mime",
|
||||
"pin-project-lite",
|
||||
"serde",
|
||||
"serde_html_form",
|
||||
"serde_path_to_error",
|
||||
"tower 0.5.2",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
@@ -6422,6 +6425,19 @@ dependencies = [
|
||||
"syn 2.0.100",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_html_form"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
|
||||
dependencies = [
|
||||
"form_urlencoded",
|
||||
"indexmap 2.9.0",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.125"
|
||||
|
||||
@@ -71,7 +71,7 @@ aws-credential-types = "1.2.0"
|
||||
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
|
||||
aws-types = "1.3"
|
||||
axum = { version = "0.8.1", features = ["ws"] }
|
||||
axum-extra = { version = "0.10.0", features = ["typed-header"] }
|
||||
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
|
||||
base64 = "0.13.0"
|
||||
bincode = "1.3"
|
||||
bindgen = "0.71"
|
||||
|
||||
@@ -785,7 +785,7 @@ impl ComputeNode {
|
||||
self.spawn_extension_stats_task();
|
||||
|
||||
if pspec.spec.autoprewarm {
|
||||
self.prewarm_lfc();
|
||||
self.prewarm_lfc(None);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -25,11 +25,16 @@ struct EndpointStoragePair {
|
||||
}
|
||||
|
||||
const KEY: &str = "lfc_state";
|
||||
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
|
||||
type Error = anyhow::Error;
|
||||
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
|
||||
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing")
|
||||
impl EndpointStoragePair {
|
||||
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
|
||||
/// If not None, takes precedence over pspec.spec.endpoint_id
|
||||
fn from_spec_and_endpoint(
|
||||
pspec: &crate::compute::ParsedSpec,
|
||||
endpoint_id: Option<String>,
|
||||
) -> Result<Self> {
|
||||
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
|
||||
let Some(ref endpoint_id) = endpoint_id else {
|
||||
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
|
||||
};
|
||||
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
|
||||
bail!("pspec.endpoint_storage_addr missing")
|
||||
@@ -84,7 +89,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
/// Returns false if there is a prewarm request ongoing, true otherwise
|
||||
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
|
||||
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
|
||||
crate::metrics::LFC_PREWARM_REQUESTS.inc();
|
||||
{
|
||||
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
|
||||
@@ -97,7 +102,7 @@ impl ComputeNode {
|
||||
|
||||
let cloned = self.clone();
|
||||
spawn(async move {
|
||||
let Err(err) = cloned.prewarm_impl().await else {
|
||||
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
|
||||
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
|
||||
return;
|
||||
};
|
||||
@@ -109,13 +114,14 @@ impl ComputeNode {
|
||||
true
|
||||
}
|
||||
|
||||
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
|
||||
/// from_endpoint: None for endpoint managed by this compute_ctl
|
||||
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.pspec.as_ref().unwrap().try_into()
|
||||
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
|
||||
}
|
||||
|
||||
async fn prewarm_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
|
||||
info!(%url, "requesting LFC state from endpoint storage");
|
||||
|
||||
let request = Client::new().get(&url).bearer_auth(token);
|
||||
@@ -173,7 +179,7 @@ impl ComputeNode {
|
||||
}
|
||||
|
||||
async fn offload_lfc_impl(&self) -> Result<()> {
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
|
||||
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
|
||||
info!(%url, "requesting LFC state from postgres");
|
||||
|
||||
let mut compressed = Vec::new();
|
||||
|
||||
@@ -2,6 +2,7 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
|
||||
use crate::http::JsonResponse;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{Json, http::StatusCode};
|
||||
use axum_extra::extract::OptionalQuery;
|
||||
use compute_api::responses::LfcOffloadState;
|
||||
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
|
||||
|
||||
@@ -16,8 +17,16 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
|
||||
Json(compute.lfc_offload_state())
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
|
||||
if compute.prewarm_lfc() {
|
||||
#[derive(serde::Deserialize)]
|
||||
pub struct PrewarmQuery {
|
||||
pub from_endpoint: String,
|
||||
}
|
||||
|
||||
pub(in crate::http) async fn prewarm(
|
||||
compute: Compute,
|
||||
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
|
||||
) -> Response {
|
||||
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
|
||||
StatusCode::ACCEPTED.into_response()
|
||||
} else {
|
||||
JsonResponse::error(
|
||||
|
||||
@@ -1022,6 +1022,7 @@ impl RemoteStorage for S3Bucket {
|
||||
let Version { key, .. } = &vd;
|
||||
let version_id = vd.version_id().map(|v| v.0.as_str());
|
||||
if version_id == Some("null") {
|
||||
// TODO: check the behavior of using the SDK on a non-versioned container
|
||||
return Err(TimeTravelError::Other(anyhow!(
|
||||
"Received ListVersions response for key={key} with version_id='null', \
|
||||
indicating either disabled versioning, or legacy objects with null version id values"
|
||||
|
||||
@@ -573,7 +573,8 @@ fn start_pageserver(
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let deletion_queue_client = deletion_queue.new_client();
|
||||
let background_purges = mgr::BackgroundPurges::default();
|
||||
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
|
||||
|
||||
let tenant_manager = mgr::init(
|
||||
conf,
|
||||
background_purges.clone(),
|
||||
TenantSharedResources {
|
||||
@@ -584,10 +585,10 @@ fn start_pageserver(
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver,
|
||||
},
|
||||
order,
|
||||
shutdown_pageserver.clone(),
|
||||
))?;
|
||||
);
|
||||
let tenant_manager = Arc::new(tenant_manager);
|
||||
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
|
||||
|
||||
let basebackup_cache = BasebackupCache::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||
|
||||
use pageserver_api::config::NodeMetadata;
|
||||
use posthog_client_lite::{
|
||||
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
|
||||
PostHogFlagFilterPropertyValue,
|
||||
@@ -86,7 +87,35 @@ impl FeatureResolver {
|
||||
}
|
||||
}
|
||||
}
|
||||
// TODO: add pageserver URL.
|
||||
// TODO: move this to a background task so that we don't block startup in case of slow disk
|
||||
let metadata_path = conf.metadata_path();
|
||||
match std::fs::read_to_string(&metadata_path) {
|
||||
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
|
||||
Ok(metadata) => {
|
||||
properties.insert(
|
||||
"hostname".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String(metadata.http_host),
|
||||
);
|
||||
if let Some(cplane_region) = metadata.other.get("region_id") {
|
||||
if let Some(cplane_region) = cplane_region.as_str() {
|
||||
// This region contains the cell number
|
||||
properties.insert(
|
||||
"neon_region".to_string(),
|
||||
PostHogFlagFilterPropertyValue::String(
|
||||
cplane_region.to_string(),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to parse metadata.json: {}", e);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to read metadata.json: {}", e);
|
||||
}
|
||||
}
|
||||
Arc::new(properties)
|
||||
};
|
||||
let fake_tenants = {
|
||||
|
||||
@@ -12,7 +12,6 @@ use anyhow::Context;
|
||||
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
|
||||
use futures::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
|
||||
use pageserver_api::shard::{
|
||||
@@ -103,7 +102,7 @@ pub(crate) enum TenantsMap {
|
||||
/// [`init_tenant_mgr`] is not done yet.
|
||||
Initializing,
|
||||
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
|
||||
/// New tenants can be added using [`tenant_map_acquire_slot`].
|
||||
/// New tenants can be added using [`TenantManager::tenant_map_acquire_slot`].
|
||||
Open(BTreeMap<TenantShardId, TenantSlot>),
|
||||
/// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
|
||||
/// Existing tenants are still accessible, but no new tenants can be created.
|
||||
@@ -284,9 +283,6 @@ impl BackgroundPurges {
|
||||
}
|
||||
}
|
||||
|
||||
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
|
||||
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
|
||||
|
||||
/// Responsible for storing and mutating the collection of all tenants
|
||||
/// that this pageserver has state for.
|
||||
///
|
||||
@@ -297,10 +293,7 @@ static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
|
||||
/// and attached modes concurrently.
|
||||
pub struct TenantManager {
|
||||
conf: &'static PageServerConf,
|
||||
// TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
|
||||
// out of that static variable, the TenantManager can own this.
|
||||
// See https://github.com/neondatabase/neon/issues/5796
|
||||
tenants: &'static std::sync::RwLock<TenantsMap>,
|
||||
tenants: std::sync::RwLock<TenantsMap>,
|
||||
resources: TenantSharedResources,
|
||||
|
||||
// Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
|
||||
@@ -479,21 +472,43 @@ pub(crate) enum DeleteTenantError {
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
/// Initialize repositories with locally available timelines.
|
||||
/// Initialize repositories at `Initializing` state.
|
||||
pub fn init(
|
||||
conf: &'static PageServerConf,
|
||||
background_purges: BackgroundPurges,
|
||||
resources: TenantSharedResources,
|
||||
cancel: CancellationToken,
|
||||
) -> TenantManager {
|
||||
TenantManager {
|
||||
conf,
|
||||
tenants: std::sync::RwLock::new(TenantsMap::Initializing),
|
||||
resources,
|
||||
cancel,
|
||||
background_purges,
|
||||
}
|
||||
}
|
||||
|
||||
/// Transition repositories from `Initializing` state to `Open` state with locally available timelines.
|
||||
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
||||
/// are scheduled for download and added to the tenant once download is completed.
|
||||
#[instrument(skip_all)]
|
||||
pub async fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
background_purges: BackgroundPurges,
|
||||
resources: TenantSharedResources,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
init_order: InitializationOrder,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<TenantManager> {
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert!(matches!(
|
||||
*tenant_manager.tenants.read().unwrap(),
|
||||
TenantsMap::Initializing
|
||||
));
|
||||
let mut tenants = BTreeMap::new();
|
||||
|
||||
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
|
||||
|
||||
let conf = tenant_manager.conf;
|
||||
let resources = &tenant_manager.resources;
|
||||
let cancel = &tenant_manager.cancel;
|
||||
let background_purges = &tenant_manager.background_purges;
|
||||
|
||||
// Initialize dynamic limits that depend on system resources
|
||||
let system_memory =
|
||||
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
|
||||
@@ -512,7 +527,7 @@ pub async fn init_tenant_mgr(
|
||||
let tenant_configs = init_load_tenant_configs(conf).await;
|
||||
|
||||
// Determine which tenants are to be secondary or attached, and in which generation
|
||||
let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
|
||||
let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
|
||||
|
||||
tracing::info!(
|
||||
"Attaching {} tenants at startup, warming up {} at a time",
|
||||
@@ -669,18 +684,10 @@ pub async fn init_tenant_mgr(
|
||||
|
||||
info!("Processed {} local tenants at startup", tenants.len());
|
||||
|
||||
let mut tenants_map = TENANTS.write().unwrap();
|
||||
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
|
||||
let mut tenant_map = tenant_manager.tenants.write().unwrap();
|
||||
*tenant_map = TenantsMap::Open(tenants);
|
||||
|
||||
*tenants_map = TenantsMap::Open(tenants);
|
||||
|
||||
Ok(TenantManager {
|
||||
conf,
|
||||
tenants: &TENANTS,
|
||||
resources,
|
||||
cancel: CancellationToken::new(),
|
||||
background_purges,
|
||||
})
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Wrapper for Tenant::spawn that checks invariants before running
|
||||
@@ -719,142 +726,6 @@ fn tenant_spawn(
|
||||
)
|
||||
}
|
||||
|
||||
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
#[cfg(all(debug_assertions, not(test)))]
|
||||
{
|
||||
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
|
||||
// as it happens implicitly at the end of tests etc.
|
||||
let m = tenants.read().unwrap();
|
||||
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
|
||||
}
|
||||
|
||||
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
|
||||
let (total_in_progress, total_attached) = {
|
||||
let mut m = tenants.write().unwrap();
|
||||
match &mut *m {
|
||||
TenantsMap::Initializing => {
|
||||
*m = TenantsMap::ShuttingDown(BTreeMap::default());
|
||||
info!("tenants map is empty");
|
||||
return;
|
||||
}
|
||||
TenantsMap::Open(tenants) => {
|
||||
let mut shutdown_state = BTreeMap::new();
|
||||
let mut total_in_progress = 0;
|
||||
let mut total_attached = 0;
|
||||
|
||||
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
|
||||
match v {
|
||||
TenantSlot::Attached(t) => {
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
// join the another shutdown in progress
|
||||
other_progress.wait().await;
|
||||
}
|
||||
|
||||
// we cannot afford per tenant logging here, because if s3 is degraded, we are
|
||||
// going to log too many lines
|
||||
debug!("tenant successfully stopped");
|
||||
}
|
||||
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
|
||||
);
|
||||
|
||||
total_attached += 1;
|
||||
}
|
||||
TenantSlot::Secondary(state) => {
|
||||
// We don't need to wait for this individually per-tenant: the
|
||||
// downloader task will be waited on eventually, this cancel
|
||||
// is just to encourage it to drop out if it is doing work
|
||||
// for this tenant right now.
|
||||
state.cancel.cancel();
|
||||
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
// wait for their notifications to fire in this function.
|
||||
join_set.spawn(async move {
|
||||
notify.wait().await;
|
||||
});
|
||||
|
||||
total_in_progress += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
*m = TenantsMap::ShuttingDown(shutdown_state);
|
||||
(total_in_progress, total_attached)
|
||||
}
|
||||
TenantsMap::ShuttingDown(_) => {
|
||||
error!(
|
||||
"already shutting down, this function isn't supposed to be called more than once"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
info!(
|
||||
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
|
||||
total_in_progress, total_attached
|
||||
);
|
||||
|
||||
let total = join_set.len();
|
||||
let mut panicked = 0;
|
||||
let mut buffering = true;
|
||||
const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
|
||||
let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
|
||||
|
||||
while !join_set.is_empty() {
|
||||
tokio::select! {
|
||||
Some(joined) = join_set.join_next() => {
|
||||
match joined {
|
||||
Ok(()) => {},
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the tasks");
|
||||
}
|
||||
Err(join_error) if join_error.is_panic() => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
panicked += 1;
|
||||
}
|
||||
Err(join_error) => {
|
||||
warn!("unknown kind of JoinError: {join_error}");
|
||||
}
|
||||
}
|
||||
if !buffering {
|
||||
// buffer so that every 500ms since the first update (or starting) we'll log
|
||||
// how far away we are; this is because we will get SIGKILL'd at 10s, and we
|
||||
// are not able to log *then*.
|
||||
buffering = true;
|
||||
buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
|
||||
}
|
||||
},
|
||||
_ = &mut buffered, if buffering => {
|
||||
buffering = false;
|
||||
info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if panicked > 0 {
|
||||
warn!(
|
||||
panicked,
|
||||
total, "observed panicks while shutting down tenants"
|
||||
);
|
||||
}
|
||||
|
||||
// caller will log how long we took
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum UpsertLocationError {
|
||||
#[error("Bad config request: {0}")]
|
||||
@@ -1056,7 +927,8 @@ impl TenantManager {
|
||||
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
|
||||
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
|
||||
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
|
||||
let mut slot_guard = self
|
||||
.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
|
||||
.map_err(|e| match e {
|
||||
TenantSlotError::NotFound(_) => {
|
||||
unreachable!("Called with mode Any")
|
||||
@@ -1223,6 +1095,75 @@ impl TenantManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn tenant_map_acquire_slot(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
mode: TenantSlotAcquireMode,
|
||||
) -> Result<SlotGuard, TenantSlotError> {
|
||||
use TenantSlotAcquireMode::*;
|
||||
METRICS.tenant_slot_writes.inc();
|
||||
|
||||
let mut locked = self.tenants.write().unwrap();
|
||||
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
|
||||
let _guard = span.enter();
|
||||
|
||||
let m = match &mut *locked {
|
||||
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
|
||||
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
|
||||
let entry = m.entry(*tenant_shard_id);
|
||||
|
||||
match entry {
|
||||
Entry::Vacant(v) => match mode {
|
||||
MustExist => {
|
||||
tracing::debug!("Vacant && MustExist: return NotFound");
|
||||
Err(TenantSlotError::NotFound(*tenant_shard_id))
|
||||
}
|
||||
_ => {
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let inserting = TenantSlot::InProgress(barrier);
|
||||
METRICS.slot_inserted(&inserting);
|
||||
v.insert(inserting);
|
||||
tracing::debug!("Vacant, inserted InProgress");
|
||||
Ok(SlotGuard::new(
|
||||
*tenant_shard_id,
|
||||
None,
|
||||
completion,
|
||||
&self.tenants,
|
||||
))
|
||||
}
|
||||
},
|
||||
Entry::Occupied(mut o) => {
|
||||
// Apply mode-driven checks
|
||||
match (o.get(), mode) {
|
||||
(TenantSlot::InProgress(_), _) => {
|
||||
tracing::debug!("Occupied, failing for InProgress");
|
||||
Err(TenantSlotError::InProgress)
|
||||
}
|
||||
_ => {
|
||||
// Happy case: the slot was not in any state that violated our mode
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let in_progress = TenantSlot::InProgress(barrier);
|
||||
METRICS.slot_inserted(&in_progress);
|
||||
let old_value = o.insert(in_progress);
|
||||
METRICS.slot_removed(&old_value);
|
||||
tracing::debug!("Occupied, replaced with InProgress");
|
||||
Ok(SlotGuard::new(
|
||||
*tenant_shard_id,
|
||||
Some(old_value),
|
||||
completion,
|
||||
&self.tenants,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
|
||||
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
|
||||
/// dropped before re-attaching.
|
||||
@@ -1239,7 +1180,8 @@ impl TenantManager {
|
||||
drop_cache: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let mut slot_guard =
|
||||
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let Some(old_slot) = slot_guard.get_old_value() else {
|
||||
anyhow::bail!("Tenant not found when trying to reset");
|
||||
};
|
||||
@@ -1388,7 +1330,8 @@ impl TenantManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let slot_guard =
|
||||
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
match &slot_guard.old_value {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
|
||||
@@ -1539,7 +1482,7 @@ impl TenantManager {
|
||||
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
|
||||
drop(tenant);
|
||||
let mut parent_slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
let parent = match parent_slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(t)) => t,
|
||||
Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
|
||||
@@ -1843,7 +1786,145 @@ impl TenantManager {
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancel.cancel();
|
||||
|
||||
shutdown_all_tenants0(self.tenants).await
|
||||
self.shutdown_all_tenants0().await
|
||||
}
|
||||
|
||||
async fn shutdown_all_tenants0(&self) {
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
#[cfg(all(debug_assertions, not(test)))]
|
||||
{
|
||||
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
|
||||
// as it happens implicitly at the end of tests etc.
|
||||
let m = self.tenants.read().unwrap();
|
||||
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
|
||||
}
|
||||
|
||||
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
|
||||
let (total_in_progress, total_attached) = {
|
||||
let mut m = self.tenants.write().unwrap();
|
||||
match &mut *m {
|
||||
TenantsMap::Initializing => {
|
||||
*m = TenantsMap::ShuttingDown(BTreeMap::default());
|
||||
info!("tenants map is empty");
|
||||
return;
|
||||
}
|
||||
TenantsMap::Open(tenants) => {
|
||||
let mut shutdown_state = BTreeMap::new();
|
||||
let mut total_in_progress = 0;
|
||||
let mut total_attached = 0;
|
||||
|
||||
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
|
||||
match v {
|
||||
TenantSlot::Attached(t) => {
|
||||
shutdown_state
|
||||
.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
|
||||
join_set.spawn(
|
||||
async move {
|
||||
let res = {
|
||||
let (_guard, shutdown_progress) = completion::channel();
|
||||
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
|
||||
};
|
||||
|
||||
if let Err(other_progress) = res {
|
||||
// join the another shutdown in progress
|
||||
other_progress.wait().await;
|
||||
}
|
||||
|
||||
// we cannot afford per tenant logging here, because if s3 is degraded, we are
|
||||
// going to log too many lines
|
||||
debug!("tenant successfully stopped");
|
||||
}
|
||||
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
|
||||
);
|
||||
|
||||
total_attached += 1;
|
||||
}
|
||||
TenantSlot::Secondary(state) => {
|
||||
// We don't need to wait for this individually per-tenant: the
|
||||
// downloader task will be waited on eventually, this cancel
|
||||
// is just to encourage it to drop out if it is doing work
|
||||
// for this tenant right now.
|
||||
state.cancel.cancel();
|
||||
|
||||
shutdown_state
|
||||
.insert(tenant_shard_id, TenantSlot::Secondary(state));
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
// wait for their notifications to fire in this function.
|
||||
join_set.spawn(async move {
|
||||
notify.wait().await;
|
||||
});
|
||||
|
||||
total_in_progress += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
*m = TenantsMap::ShuttingDown(shutdown_state);
|
||||
(total_in_progress, total_attached)
|
||||
}
|
||||
TenantsMap::ShuttingDown(_) => {
|
||||
error!(
|
||||
"already shutting down, this function isn't supposed to be called more than once"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
info!(
|
||||
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
|
||||
total_in_progress, total_attached
|
||||
);
|
||||
|
||||
let total = join_set.len();
|
||||
let mut panicked = 0;
|
||||
let mut buffering = true;
|
||||
const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
|
||||
let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
|
||||
|
||||
while !join_set.is_empty() {
|
||||
tokio::select! {
|
||||
Some(joined) = join_set.join_next() => {
|
||||
match joined {
|
||||
Ok(()) => {},
|
||||
Err(join_error) if join_error.is_cancelled() => {
|
||||
unreachable!("we are not cancelling any of the tasks");
|
||||
}
|
||||
Err(join_error) if join_error.is_panic() => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
panicked += 1;
|
||||
}
|
||||
Err(join_error) => {
|
||||
warn!("unknown kind of JoinError: {join_error}");
|
||||
}
|
||||
}
|
||||
if !buffering {
|
||||
// buffer so that every 500ms since the first update (or starting) we'll log
|
||||
// how far away we are; this is because we will get SIGKILL'd at 10s, and we
|
||||
// are not able to log *then*.
|
||||
buffering = true;
|
||||
buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
|
||||
}
|
||||
},
|
||||
_ = &mut buffered, if buffering => {
|
||||
buffering = false;
|
||||
info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if panicked > 0 {
|
||||
warn!(
|
||||
panicked,
|
||||
total, "observed panicks while shutting down tenants"
|
||||
);
|
||||
}
|
||||
|
||||
// caller will log how long we took
|
||||
}
|
||||
|
||||
/// Detaches a tenant, and removes its local files asynchronously.
|
||||
@@ -1889,12 +1970,12 @@ impl TenantManager {
|
||||
.map(Some)
|
||||
};
|
||||
|
||||
let mut removal_result = remove_tenant_from_memory(
|
||||
self.tenants,
|
||||
tenant_shard_id,
|
||||
tenant_dir_rename_operation(tenant_shard_id),
|
||||
)
|
||||
.await;
|
||||
let mut removal_result = self
|
||||
.remove_tenant_from_memory(
|
||||
tenant_shard_id,
|
||||
tenant_dir_rename_operation(tenant_shard_id),
|
||||
)
|
||||
.await;
|
||||
|
||||
// If the tenant was not found, it was likely already removed. Attempt to remove the tenant
|
||||
// directory on disk anyway. For example, during shard splits, we shut down and remove the
|
||||
@@ -1948,17 +2029,16 @@ impl TenantManager {
|
||||
) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
|
||||
use detach_ancestor::Error;
|
||||
|
||||
let slot_guard =
|
||||
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
|
||||
|e| {
|
||||
use TenantSlotError::*;
|
||||
let slot_guard = self
|
||||
.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)
|
||||
.map_err(|e| {
|
||||
use TenantSlotError::*;
|
||||
|
||||
match e {
|
||||
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
|
||||
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
|
||||
}
|
||||
},
|
||||
)?;
|
||||
match e {
|
||||
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
|
||||
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
|
||||
}
|
||||
})?;
|
||||
|
||||
let tenant = {
|
||||
let old_slot = slot_guard
|
||||
@@ -2291,6 +2371,80 @@ impl TenantManager {
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})
|
||||
}
|
||||
|
||||
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
|
||||
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
|
||||
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
|
||||
async fn remove_tenant_from_memory<V, F>(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
tenant_cleanup: F,
|
||||
) -> Result<V, TenantStateError>
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<V>>,
|
||||
{
|
||||
let mut slot_guard =
|
||||
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// allow pageserver shutdown to await for our completion
|
||||
let (_guard, progress) = completion::channel();
|
||||
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
|
||||
let shutdown_mode = ShutdownMode::Hard;
|
||||
|
||||
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
|
||||
// that we can continue safely to cleanup.
|
||||
match tenant.shutdown(progress, shutdown_mode).await {
|
||||
Ok(()) => {}
|
||||
Err(_other) => {
|
||||
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
|
||||
// wait for it but return an error right away because these are distinct requests.
|
||||
slot_guard.revert();
|
||||
return Err(TenantStateError::IsStopping(tenant_shard_id));
|
||||
}
|
||||
}
|
||||
Some(tenant)
|
||||
}
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
tracing::info!("Shutting down in secondary mode");
|
||||
secondary_state.shutdown().await;
|
||||
None
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// Acquiring a slot guarantees its old value was not InProgress
|
||||
unreachable!();
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
match tenant_cleanup
|
||||
.await
|
||||
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
|
||||
{
|
||||
Ok(hook_value) => {
|
||||
// Success: drop the old TenantSlot::Attached.
|
||||
slot_guard
|
||||
.drop_old_value()
|
||||
.expect("We just called shutdown");
|
||||
|
||||
Ok(hook_value)
|
||||
}
|
||||
Err(e) => {
|
||||
// If we had a Tenant, set it to Broken and put it back in the TenantsMap
|
||||
if let Some(attached_tenant) = attached_tenant {
|
||||
attached_tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
// Leave the broken tenant in the map
|
||||
slot_guard.revert();
|
||||
|
||||
Err(TenantStateError::Other(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
@@ -2455,7 +2609,7 @@ pub(crate) enum TenantMapError {
|
||||
/// this tenant to retry later, or wait for the InProgress state to end.
|
||||
///
|
||||
/// This structure enforces the important invariant that we do not have overlapping
|
||||
/// tasks that will try use local storage for a the same tenant ID: we enforce that
|
||||
/// tasks that will try to use local storage for a the same tenant ID: we enforce that
|
||||
/// the previous contents of a slot have been shut down before the slot can be
|
||||
/// left empty or used for something else
|
||||
///
|
||||
@@ -2468,7 +2622,7 @@ pub(crate) enum TenantMapError {
|
||||
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
|
||||
/// `drop_old_value`. It is an error to call this without shutting down
|
||||
/// the conents of `old_value`.
|
||||
pub(crate) struct SlotGuard {
|
||||
pub(crate) struct SlotGuard<'a> {
|
||||
tenant_shard_id: TenantShardId,
|
||||
old_value: Option<TenantSlot>,
|
||||
upserted: bool,
|
||||
@@ -2476,19 +2630,23 @@ pub(crate) struct SlotGuard {
|
||||
/// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
|
||||
/// release any waiters as soon as this SlotGuard is dropped.
|
||||
completion: utils::completion::Completion,
|
||||
|
||||
tenants: &'a std::sync::RwLock<TenantsMap>,
|
||||
}
|
||||
|
||||
impl SlotGuard {
|
||||
impl<'a> SlotGuard<'a> {
|
||||
fn new(
|
||||
tenant_shard_id: TenantShardId,
|
||||
old_value: Option<TenantSlot>,
|
||||
completion: utils::completion::Completion,
|
||||
tenants: &'a std::sync::RwLock<TenantsMap>,
|
||||
) -> Self {
|
||||
Self {
|
||||
tenant_shard_id,
|
||||
old_value,
|
||||
upserted: false,
|
||||
completion,
|
||||
tenants,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2512,8 +2670,8 @@ impl SlotGuard {
|
||||
));
|
||||
}
|
||||
|
||||
let replaced = {
|
||||
let mut locked = TENANTS.write().unwrap();
|
||||
let replaced: Option<TenantSlot> = {
|
||||
let mut locked = self.tenants.write().unwrap();
|
||||
|
||||
if let TenantSlot::InProgress(_) = new_value {
|
||||
// It is never expected to try and upsert InProgress via this path: it should
|
||||
@@ -2621,7 +2779,7 @@ impl SlotGuard {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SlotGuard {
|
||||
impl<'a> Drop for SlotGuard<'a> {
|
||||
fn drop(&mut self) {
|
||||
if self.upserted {
|
||||
return;
|
||||
@@ -2629,7 +2787,7 @@ impl Drop for SlotGuard {
|
||||
// Our old value is already shutdown, or it never existed: it is safe
|
||||
// for us to fully release the TenantSlot back into an empty state
|
||||
|
||||
let mut locked = TENANTS.write().unwrap();
|
||||
let mut locked = self.tenants.write().unwrap();
|
||||
|
||||
let m = match &mut *locked {
|
||||
TenantsMap::Initializing => {
|
||||
@@ -2711,151 +2869,6 @@ enum TenantSlotAcquireMode {
|
||||
MustExist,
|
||||
}
|
||||
|
||||
fn tenant_map_acquire_slot(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
mode: TenantSlotAcquireMode,
|
||||
) -> Result<SlotGuard, TenantSlotError> {
|
||||
tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
|
||||
}
|
||||
|
||||
fn tenant_map_acquire_slot_impl(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
tenants: &std::sync::RwLock<TenantsMap>,
|
||||
mode: TenantSlotAcquireMode,
|
||||
) -> Result<SlotGuard, TenantSlotError> {
|
||||
use TenantSlotAcquireMode::*;
|
||||
METRICS.tenant_slot_writes.inc();
|
||||
|
||||
let mut locked = tenants.write().unwrap();
|
||||
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
|
||||
let _guard = span.enter();
|
||||
|
||||
let m = match &mut *locked {
|
||||
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
|
||||
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
use std::collections::btree_map::Entry;
|
||||
|
||||
let entry = m.entry(*tenant_shard_id);
|
||||
|
||||
match entry {
|
||||
Entry::Vacant(v) => match mode {
|
||||
MustExist => {
|
||||
tracing::debug!("Vacant && MustExist: return NotFound");
|
||||
Err(TenantSlotError::NotFound(*tenant_shard_id))
|
||||
}
|
||||
_ => {
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let inserting = TenantSlot::InProgress(barrier);
|
||||
METRICS.slot_inserted(&inserting);
|
||||
v.insert(inserting);
|
||||
tracing::debug!("Vacant, inserted InProgress");
|
||||
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
|
||||
}
|
||||
},
|
||||
Entry::Occupied(mut o) => {
|
||||
// Apply mode-driven checks
|
||||
match (o.get(), mode) {
|
||||
(TenantSlot::InProgress(_), _) => {
|
||||
tracing::debug!("Occupied, failing for InProgress");
|
||||
Err(TenantSlotError::InProgress)
|
||||
}
|
||||
_ => {
|
||||
// Happy case: the slot was not in any state that violated our mode
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let in_progress = TenantSlot::InProgress(barrier);
|
||||
METRICS.slot_inserted(&in_progress);
|
||||
let old_value = o.insert(in_progress);
|
||||
METRICS.slot_removed(&old_value);
|
||||
tracing::debug!("Occupied, replaced with InProgress");
|
||||
Ok(SlotGuard::new(
|
||||
*tenant_shard_id,
|
||||
Some(old_value),
|
||||
completion,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
|
||||
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
|
||||
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
|
||||
/// operation would be needed to remove it.
|
||||
async fn remove_tenant_from_memory<V, F>(
|
||||
tenants: &std::sync::RwLock<TenantsMap>,
|
||||
tenant_shard_id: TenantShardId,
|
||||
tenant_cleanup: F,
|
||||
) -> Result<V, TenantStateError>
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<V>>,
|
||||
{
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// allow pageserver shutdown to await for our completion
|
||||
let (_guard, progress) = completion::channel();
|
||||
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
|
||||
let shutdown_mode = ShutdownMode::Hard;
|
||||
|
||||
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
|
||||
// that we can continue safely to cleanup.
|
||||
match tenant.shutdown(progress, shutdown_mode).await {
|
||||
Ok(()) => {}
|
||||
Err(_other) => {
|
||||
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
|
||||
// wait for it but return an error right away because these are distinct requests.
|
||||
slot_guard.revert();
|
||||
return Err(TenantStateError::IsStopping(tenant_shard_id));
|
||||
}
|
||||
}
|
||||
Some(tenant)
|
||||
}
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
tracing::info!("Shutting down in secondary mode");
|
||||
secondary_state.shutdown().await;
|
||||
None
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// Acquiring a slot guarantees its old value was not InProgress
|
||||
unreachable!();
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
match tenant_cleanup
|
||||
.await
|
||||
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
|
||||
{
|
||||
Ok(hook_value) => {
|
||||
// Success: drop the old TenantSlot::Attached.
|
||||
slot_guard
|
||||
.drop_old_value()
|
||||
.expect("We just called shutdown");
|
||||
|
||||
Ok(hook_value)
|
||||
}
|
||||
Err(e) => {
|
||||
// If we had a Tenant, set it to Broken and put it back in the TenantsMap
|
||||
if let Some(attached_tenant) = attached_tenant {
|
||||
attached_tenant.set_broken(e.to_string()).await;
|
||||
}
|
||||
// Leave the broken tenant in the map
|
||||
slot_guard.revert();
|
||||
|
||||
Err(TenantStateError::Other(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use http_utils::error::ApiError;
|
||||
use pageserver_api::models::TimelineGcRequest;
|
||||
|
||||
@@ -2866,11 +2879,15 @@ mod tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use tracing::Instrument;
|
||||
|
||||
use super::super::harness::TenantHarness;
|
||||
use super::TenantsMap;
|
||||
use crate::tenant::mgr::TenantSlot;
|
||||
use crate::tenant::{
|
||||
TenantSharedResources,
|
||||
mgr::{BackgroundPurges, TenantManager, TenantSlot},
|
||||
};
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn shutdown_awaits_in_progress_tenant() {
|
||||
@@ -2891,23 +2908,47 @@ mod tests {
|
||||
let _e = span.enter();
|
||||
|
||||
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
|
||||
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
|
||||
|
||||
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
|
||||
// permit it to proceed: that will stick the tenant in InProgress
|
||||
|
||||
let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::<
|
||||
crate::basebackup_cache::BasebackupPrepareRequest,
|
||||
>();
|
||||
|
||||
let tenant_manager = TenantManager {
|
||||
tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
|
||||
conf: h.conf,
|
||||
resources: TenantSharedResources {
|
||||
broker_client: BrokerClientChannel::connect_lazy("foobar.com")
|
||||
.await
|
||||
.unwrap(),
|
||||
remote_storage: h.remote_storage.clone(),
|
||||
deletion_queue_client: h.deletion_queue.new_client(),
|
||||
l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
|
||||
h.conf.l0_flush.clone(),
|
||||
),
|
||||
basebackup_prepare_sender,
|
||||
feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
|
||||
},
|
||||
cancel: tokio_util::sync::CancellationToken::new(),
|
||||
background_purges: BackgroundPurges::default(),
|
||||
};
|
||||
|
||||
let tenant_manager = Arc::new(tenant_manager);
|
||||
|
||||
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
|
||||
let (until_cleanup_started, cleanup_started) = utils::completion::channel();
|
||||
let mut remove_tenant_from_memory_task = {
|
||||
let tenant_manager = tenant_manager.clone();
|
||||
let jh = tokio::spawn({
|
||||
let tenants = tenants.clone();
|
||||
async move {
|
||||
let cleanup = async move {
|
||||
drop(until_cleanup_started);
|
||||
can_complete_cleanup.wait().await;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
super::remove_tenant_from_memory(&tenants, id, cleanup).await
|
||||
tenant_manager.remove_tenant_from_memory(id, cleanup).await
|
||||
}
|
||||
.instrument(h.span())
|
||||
});
|
||||
@@ -2920,9 +2961,11 @@ mod tests {
|
||||
let mut shutdown_task = {
|
||||
let (until_shutdown_started, shutdown_started) = utils::completion::channel();
|
||||
|
||||
let tenant_manager = tenant_manager.clone();
|
||||
|
||||
let shutdown_task = tokio::spawn(async move {
|
||||
drop(until_shutdown_started);
|
||||
super::shutdown_all_tenants0(&tenants).await;
|
||||
tenant_manager.shutdown_all_tenants0().await;
|
||||
});
|
||||
|
||||
shutdown_started.wait().await;
|
||||
|
||||
@@ -1092,13 +1092,15 @@ communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
|
||||
MyPState->ring_last <= ring_index);
|
||||
}
|
||||
|
||||
/* internal version. Returns the ring index */
|
||||
/* Internal version. Returns the ring index of the last block (result of this function is used only
|
||||
* when nblocks==1)
|
||||
*/
|
||||
static uint64
|
||||
prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
|
||||
BlockNumber nblocks, const bits8 *mask,
|
||||
bool is_prefetch)
|
||||
{
|
||||
uint64 min_ring_index;
|
||||
uint64 last_ring_index;
|
||||
PrefetchRequest hashkey;
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
bool any_hits = false;
|
||||
@@ -1122,13 +1124,12 @@ Retry:
|
||||
MyPState->ring_unused - MyPState->ring_receive;
|
||||
MyNeonCounters->getpage_prefetches_buffered =
|
||||
MyPState->n_responses_buffered;
|
||||
last_ring_index = UINT64_MAX;
|
||||
|
||||
min_ring_index = UINT64_MAX;
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
PrefetchRequest *slot = NULL;
|
||||
PrfHashEntry *entry = NULL;
|
||||
uint64 ring_index;
|
||||
neon_request_lsns *lsns;
|
||||
|
||||
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
|
||||
@@ -1152,12 +1153,12 @@ Retry:
|
||||
if (entry != NULL)
|
||||
{
|
||||
slot = entry->slot;
|
||||
ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(ring_index));
|
||||
last_ring_index = slot->my_ring_index;
|
||||
Assert(slot == GetPrfSlot(last_ring_index));
|
||||
|
||||
Assert(slot->status != PRFS_UNUSED);
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index < MyPState->ring_unused);
|
||||
Assert(MyPState->ring_last <= last_ring_index &&
|
||||
last_ring_index < MyPState->ring_unused);
|
||||
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
|
||||
|
||||
/*
|
||||
@@ -1169,9 +1170,9 @@ Retry:
|
||||
if (!neon_prefetch_response_usable(lsns, slot))
|
||||
{
|
||||
/* Wait for the old request to finish and discard it */
|
||||
if (!prefetch_wait_for(ring_index))
|
||||
if (!prefetch_wait_for(last_ring_index))
|
||||
goto Retry;
|
||||
prefetch_set_unused(ring_index);
|
||||
prefetch_set_unused(last_ring_index);
|
||||
entry = NULL;
|
||||
slot = NULL;
|
||||
pgBufferUsage.prefetch.expired += 1;
|
||||
@@ -1188,13 +1189,12 @@ Retry:
|
||||
*/
|
||||
if (slot->status == PRFS_TAG_REMAINS)
|
||||
{
|
||||
prefetch_set_unused(ring_index);
|
||||
prefetch_set_unused(last_ring_index);
|
||||
entry = NULL;
|
||||
slot = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
/* The buffered request is good enough, return that index */
|
||||
if (is_prefetch)
|
||||
pgBufferUsage.prefetch.duplicates++;
|
||||
@@ -1283,12 +1283,12 @@ Retry:
|
||||
* The next buffer pointed to by `ring_unused` is now definitely empty, so
|
||||
* we can insert the new request to it.
|
||||
*/
|
||||
ring_index = MyPState->ring_unused;
|
||||
last_ring_index = MyPState->ring_unused;
|
||||
|
||||
Assert(MyPState->ring_last <= ring_index &&
|
||||
ring_index <= MyPState->ring_unused);
|
||||
Assert(MyPState->ring_last <= last_ring_index &&
|
||||
last_ring_index <= MyPState->ring_unused);
|
||||
|
||||
slot = GetPrfSlotNoCheck(ring_index);
|
||||
slot = GetPrfSlotNoCheck(last_ring_index);
|
||||
|
||||
Assert(slot->status == PRFS_UNUSED);
|
||||
|
||||
@@ -1298,11 +1298,9 @@ Retry:
|
||||
*/
|
||||
slot->buftag = hashkey.buftag;
|
||||
slot->shard_no = get_shard_number(&tag);
|
||||
slot->my_ring_index = ring_index;
|
||||
slot->my_ring_index = last_ring_index;
|
||||
slot->flags = 0;
|
||||
|
||||
min_ring_index = Min(min_ring_index, ring_index);
|
||||
|
||||
if (is_prefetch)
|
||||
MyNeonCounters->getpage_prefetch_requests_total++;
|
||||
else
|
||||
@@ -1315,11 +1313,12 @@ Retry:
|
||||
MyPState->ring_unused - MyPState->ring_receive;
|
||||
|
||||
Assert(any_hits);
|
||||
Assert(last_ring_index != UINT64_MAX);
|
||||
|
||||
Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED ||
|
||||
GetPrfSlot(min_ring_index)->status == PRFS_RECEIVED);
|
||||
Assert(MyPState->ring_last <= min_ring_index &&
|
||||
min_ring_index < MyPState->ring_unused);
|
||||
Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED ||
|
||||
GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED);
|
||||
Assert(MyPState->ring_last <= last_ring_index &&
|
||||
last_ring_index < MyPState->ring_unused);
|
||||
|
||||
if (flush_every_n_requests > 0 &&
|
||||
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
|
||||
@@ -1335,7 +1334,7 @@ Retry:
|
||||
MyPState->ring_flush = MyPState->ring_unused;
|
||||
}
|
||||
|
||||
return min_ring_index;
|
||||
return last_ring_index;
|
||||
}
|
||||
|
||||
static bool
|
||||
@@ -2086,9 +2085,6 @@ communicator_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber ba
|
||||
|
||||
start_ts = GetCurrentTimestamp();
|
||||
|
||||
if (RecoveryInProgress() && MyBackendType != B_STARTUP)
|
||||
XLogWaitForReplayOf(reqlsns->request_lsn);
|
||||
|
||||
/*
|
||||
* Try to find prefetched page in the list of received pages.
|
||||
*/
|
||||
|
||||
@@ -2,6 +2,6 @@ DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_
|
||||
|
||||
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
|
||||
|
||||
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);
|
||||
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);
|
||||
|
||||
|
||||
|
||||
@@ -2382,6 +2382,8 @@ get_fsm_physical_block(BlockNumber heapblk)
|
||||
static bool
|
||||
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
{
|
||||
static XLogRecPtr last_recptr = InvalidXLogRecPtr;
|
||||
static bool last_no_redo_needed;
|
||||
XLogRecPtr end_recptr = record->EndRecPtr;
|
||||
NRelFileInfo rinfo;
|
||||
ForkNumber forknum;
|
||||
@@ -2390,69 +2392,88 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
uint32 hash;
|
||||
LWLock *partitionLock;
|
||||
int buf_id;
|
||||
bool no_redo_needed;
|
||||
bool no_redo_needed = true;
|
||||
|
||||
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
|
||||
return true;
|
||||
|
||||
#if PG_VERSION_NUM < 150000
|
||||
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
|
||||
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
|
||||
#else
|
||||
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
|
||||
#endif
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
tag.forkNum = forknum;
|
||||
tag.blockNum = blkno;
|
||||
|
||||
hash = BufTableHashCode(&tag);
|
||||
partitionLock = BufMappingPartitionLock(hash);
|
||||
|
||||
/*
|
||||
* Lock the partition of shared_buffers so that it can't be updated
|
||||
* concurrently.
|
||||
*/
|
||||
LWLockAcquire(partitionLock, LW_SHARED);
|
||||
|
||||
/*
|
||||
* Out of an abundance of caution, we always run redo on shared catalogs,
|
||||
* regardless of whether the block is stored in shared buffers. See also
|
||||
* this function's top comment.
|
||||
*/
|
||||
if (!OidIsValid(NInfoGetDbOid(rinfo)))
|
||||
if (last_recptr != end_recptr)
|
||||
{
|
||||
no_redo_needed = false;
|
||||
#if PG_VERSION_NUM < 150000
|
||||
int max_block_id = record->max_block_id;
|
||||
#else
|
||||
int max_block_id = XLogRecMaxBlockId(record);
|
||||
#endif
|
||||
for (int block_id = 0; block_id <= max_block_id && no_redo_needed; block_id++)
|
||||
{
|
||||
if (XLogRecHasBlockRef(record, block_id))
|
||||
{
|
||||
#if PG_VERSION_NUM < 150000
|
||||
if (!XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno))
|
||||
neon_log(PANIC, "failed to locate backup block with ID %d", block_id);
|
||||
#else
|
||||
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
|
||||
#endif
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
tag.forkNum = forknum;
|
||||
tag.blockNum = blkno;
|
||||
|
||||
hash = BufTableHashCode(&tag);
|
||||
partitionLock = BufMappingPartitionLock(hash);
|
||||
|
||||
/*
|
||||
* Lock the partition of shared_buffers so that it can't be updated
|
||||
* concurrently.
|
||||
*/
|
||||
LWLockAcquire(partitionLock, LW_SHARED);
|
||||
|
||||
/*
|
||||
* Out of an abundance of caution, we always run redo on shared catalogs,
|
||||
* regardless of whether the block is stored in shared buffers. See also
|
||||
* this function's top comment.
|
||||
*/
|
||||
if (!OidIsValid(NInfoGetDbOid(rinfo)))
|
||||
{
|
||||
no_redo_needed = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Try to find the relevant buffer */
|
||||
buf_id = BufTableLookup(&tag, hash);
|
||||
|
||||
no_redo_needed = buf_id < 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* we don't have the buffer in memory, update lwLsn past this record, also
|
||||
* evict page from file cache
|
||||
*/
|
||||
if (no_redo_needed)
|
||||
{
|
||||
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
|
||||
/*
|
||||
* Redo changes if page exists in LFC.
|
||||
* We should perform this check after assigning LwLSN to prevent
|
||||
* prefetching of some older version of the page by some other backend.
|
||||
*/
|
||||
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
|
||||
}
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
|
||||
if (forknum == MAIN_FORKNUM)
|
||||
{
|
||||
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
last_recptr = end_recptr;
|
||||
last_no_redo_needed = no_redo_needed;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Try to find the relevant buffer */
|
||||
buf_id = BufTableLookup(&tag, hash);
|
||||
|
||||
no_redo_needed = buf_id < 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* we don't have the buffer in memory, update lwLsn past this record, also
|
||||
* evict page from file cache
|
||||
*/
|
||||
if (no_redo_needed)
|
||||
{
|
||||
neon_set_lwlsn_block(end_recptr, rinfo, forknum, blkno);
|
||||
/*
|
||||
* Redo changes if page exists in LFC.
|
||||
* We should perform this check after assigning LwLSN to prevent
|
||||
* prefetching of some older version of the page by some other backend.
|
||||
*/
|
||||
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
|
||||
}
|
||||
|
||||
LWLockRelease(partitionLock);
|
||||
|
||||
neon_extend_rel_size(rinfo, forknum, blkno, end_recptr);
|
||||
if (forknum == MAIN_FORKNUM)
|
||||
{
|
||||
neon_extend_rel_size(rinfo, FSM_FORKNUM, get_fsm_physical_block(blkno), end_recptr);
|
||||
no_redo_needed = last_no_redo_needed;
|
||||
}
|
||||
return no_redo_needed;
|
||||
}
|
||||
|
||||
@@ -1135,7 +1135,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
|
||||
wp->propTermStartLsn = sk->voteResponse.flushLsn;
|
||||
wp->donor = sk;
|
||||
}
|
||||
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
|
||||
wp->truncateLsn = Max(sk->voteResponse.truncateLsn, wp->truncateLsn);
|
||||
|
||||
if (n_votes > 0)
|
||||
appendStringInfoString(s, ", ");
|
||||
|
||||
@@ -14,9 +14,9 @@ use crate::context::RequestContext;
|
||||
use crate::control_plane::client::cplane_proxy_v1;
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::pglb::connect_compute::ComputeConnectBackend;
|
||||
use crate::pqproto::BeMessage;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::proxy::wake_compute::WakeComputeBackend;
|
||||
use crate::stream::PqStream;
|
||||
use crate::types::RoleName;
|
||||
use crate::{auth, compute, waiters};
|
||||
@@ -109,7 +109,7 @@ impl ConsoleRedirectBackend {
|
||||
pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);
|
||||
|
||||
#[async_trait]
|
||||
impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
|
||||
impl WakeComputeBackend for ConsoleRedirectNodeInfo {
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
_ctx: &RequestContext,
|
||||
|
||||
@@ -14,20 +14,21 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::auth::{self, AuthError, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
|
||||
use crate::auth::{self, ComputeUserInfoMaybeEndpoint, validate_password_and_exchange};
|
||||
use crate::cache::Cached;
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::client::ControlPlaneClient;
|
||||
use crate::control_plane::errors::GetAuthInfoError;
|
||||
use crate::control_plane::messages::EndpointRateLimitConfig;
|
||||
use crate::control_plane::{
|
||||
self, AccessBlockerFlags, AuthSecret, CachedNodeInfo, ControlPlaneApi, EndpointAccessControl,
|
||||
RoleAccessControl,
|
||||
};
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::pglb::connect_compute::ComputeConnectBackend;
|
||||
use crate::pqproto::BeMessage;
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::proxy::wake_compute::WakeComputeBackend;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::Stream;
|
||||
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
|
||||
@@ -230,11 +231,8 @@ async fn auth_quirks(
|
||||
config.is_vpc_acccess_proxy,
|
||||
)?;
|
||||
|
||||
let endpoint = EndpointIdInt::from(&info.endpoint);
|
||||
let rate_limit_config = None;
|
||||
if !endpoint_rate_limiter.check(endpoint, rate_limit_config, 1) {
|
||||
return Err(AuthError::too_many_connections());
|
||||
}
|
||||
access_controls.connection_attempt_rate_limit(ctx, &info.endpoint, &endpoint_rate_limiter)?;
|
||||
|
||||
let role_access = api
|
||||
.get_role_access_control(ctx, &info.endpoint, &info.user)
|
||||
.await?;
|
||||
@@ -401,19 +399,20 @@ impl Backend<'_, ComputeUserInfo> {
|
||||
allowed_ips: Arc::new(vec![]),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
|
||||
impl WakeComputeBackend for Backend<'_, ComputeUserInfo> {
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
|
||||
match self {
|
||||
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
|
||||
Self::ControlPlane(api, info) => api.wake_compute(ctx, info).await,
|
||||
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
|
||||
}
|
||||
}
|
||||
@@ -439,6 +438,7 @@ mod tests {
|
||||
use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern};
|
||||
use crate::config::AuthenticationConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::EndpointRateLimitConfig;
|
||||
use crate::control_plane::{
|
||||
self, AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, RoleAccessControl,
|
||||
};
|
||||
@@ -477,6 +477,7 @@ mod tests {
|
||||
allowed_ips: Arc::new(self.ips.clone()),
|
||||
allowed_vpce: Arc::new(self.vpc_endpoint_ids.clone()),
|
||||
flags: self.access_blocker_flags,
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -28,10 +28,9 @@ use crate::context::RequestContext;
|
||||
use crate::metrics::{Metrics, ThreadPoolMetrics};
|
||||
use crate::pqproto::FeStartupPacket;
|
||||
use crate::protocol2::ConnectionInfo;
|
||||
use crate::proxy::{
|
||||
ErrorSource, TlsRequired, copy_bidirectional_client_compute, run_until_cancelled,
|
||||
};
|
||||
use crate::proxy::{ErrorSource, TlsRequired, copy_bidirectional_client_compute};
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
project_git_version!(GIT_VERSION);
|
||||
|
||||
|
||||
4
proxy/src/cache/project_info.rs
vendored
4
proxy/src/cache/project_info.rs
vendored
@@ -364,6 +364,7 @@ mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::*;
|
||||
use crate::control_plane::messages::EndpointRateLimitConfig;
|
||||
use crate::control_plane::{AccessBlockerFlags, AuthSecret};
|
||||
use crate::scram::ServerSecret;
|
||||
use crate::types::ProjectId;
|
||||
@@ -399,6 +400,7 @@ mod tests {
|
||||
allowed_ips: allowed_ips.clone(),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
},
|
||||
RoleAccessControl {
|
||||
secret: secret1.clone(),
|
||||
@@ -414,6 +416,7 @@ mod tests {
|
||||
allowed_ips: allowed_ips.clone(),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
},
|
||||
RoleAccessControl {
|
||||
secret: secret2.clone(),
|
||||
@@ -439,6 +442,7 @@ mod tests {
|
||||
allowed_ips: allowed_ips.clone(),
|
||||
allowed_vpce: Arc::new(vec![]),
|
||||
flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
},
|
||||
RoleAccessControl {
|
||||
secret: secret3.clone(),
|
||||
|
||||
@@ -136,11 +136,11 @@ impl AuthInfo {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn with_auth_keys(keys: &ComputeCredentialKeys) -> Self {
|
||||
pub(crate) fn with_auth_keys(keys: ComputeCredentialKeys) -> Self {
|
||||
Self {
|
||||
auth: match keys {
|
||||
ComputeCredentialKeys::AuthKeys(AuthKeys::ScramSha256(auth_keys)) => {
|
||||
Some(Auth::Scram(Box::new(*auth_keys)))
|
||||
Some(Auth::Scram(Box::new(auth_keys)))
|
||||
}
|
||||
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => None,
|
||||
},
|
||||
|
||||
@@ -11,13 +11,12 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::pglb::handshake::{HandshakeData, handshake};
|
||||
use crate::pglb::passthrough::ProxyPassthrough;
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
|
||||
use crate::proxy::{
|
||||
ClientRequestError, ErrorSource, prepare_client_connection, run_until_cancelled,
|
||||
};
|
||||
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
|
||||
@@ -146,6 +146,7 @@ impl NeonControlPlaneClient {
|
||||
public_access_blocked: block_public_connections,
|
||||
vpc_access_blocked: block_vpc_connections,
|
||||
},
|
||||
rate_limits: body.rate_limits,
|
||||
})
|
||||
}
|
||||
.inspect_err(|e| tracing::debug!(error = ?e))
|
||||
@@ -312,6 +313,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
@@ -357,6 +359,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
allowed_ips: Arc::new(auth_info.allowed_ips),
|
||||
allowed_vpce: Arc::new(auth_info.allowed_vpc_endpoint_ids),
|
||||
flags: auth_info.access_blocker_flags,
|
||||
rate_limits: auth_info.rate_limits,
|
||||
};
|
||||
let role_control = RoleAccessControl {
|
||||
secret: auth_info.secret,
|
||||
|
||||
@@ -20,7 +20,7 @@ use crate::context::RequestContext;
|
||||
use crate::control_plane::errors::{
|
||||
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
|
||||
};
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::control_plane::messages::{EndpointRateLimitConfig, MetricsAuxInfo};
|
||||
use crate::control_plane::{
|
||||
AccessBlockerFlags, AuthInfo, AuthSecret, CachedNodeInfo, EndpointAccessControl, NodeInfo,
|
||||
RoleAccessControl,
|
||||
@@ -130,6 +130,7 @@ impl MockControlPlane {
|
||||
project_id: None,
|
||||
account_id: None,
|
||||
access_blocker_flags: AccessBlockerFlags::default(),
|
||||
rate_limits: EndpointRateLimitConfig::default(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -233,6 +234,7 @@ impl super::ControlPlaneApi for MockControlPlane {
|
||||
allowed_ips: Arc::new(info.allowed_ips),
|
||||
allowed_vpce: Arc::new(info.allowed_vpc_endpoint_ids),
|
||||
flags: info.access_blocker_flags,
|
||||
rate_limits: info.rate_limits,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use clashmap::ClashMap;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::{EndpointAccessControl, RoleAccessControl};
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError};
|
||||
use crate::cache::endpoints::EndpointsCache;
|
||||
@@ -22,8 +23,6 @@ use crate::metrics::ApiLockMetrics;
|
||||
use crate::rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token};
|
||||
use crate::types::EndpointId;
|
||||
|
||||
use super::{EndpointAccessControl, RoleAccessControl};
|
||||
|
||||
#[non_exhaustive]
|
||||
#[derive(Clone)]
|
||||
pub enum ControlPlaneClient {
|
||||
|
||||
@@ -227,12 +227,35 @@ pub(crate) struct UserFacingMessage {
|
||||
#[derive(Deserialize)]
|
||||
pub(crate) struct GetEndpointAccessControl {
|
||||
pub(crate) role_secret: Box<str>,
|
||||
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
|
||||
|
||||
pub(crate) project_id: Option<ProjectIdInt>,
|
||||
pub(crate) account_id: Option<AccountIdInt>,
|
||||
|
||||
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
|
||||
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<String>>,
|
||||
pub(crate) block_public_connections: Option<bool>,
|
||||
pub(crate) block_vpc_connections: Option<bool>,
|
||||
|
||||
#[serde(default)]
|
||||
pub(crate) rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default)]
|
||||
pub struct EndpointRateLimitConfig {
|
||||
pub connection_attempts: ConnectionAttemptsLimit,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize, Default)]
|
||||
pub struct ConnectionAttemptsLimit {
|
||||
pub tcp: Option<LeakyBucketSetting>,
|
||||
pub ws: Option<LeakyBucketSetting>,
|
||||
pub http: Option<LeakyBucketSetting>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Deserialize)]
|
||||
pub struct LeakyBucketSetting {
|
||||
pub rps: f64,
|
||||
pub burst: f64,
|
||||
}
|
||||
|
||||
/// Response which holds compute node's `host:port` pair.
|
||||
|
||||
@@ -11,6 +11,8 @@ pub(crate) mod errors;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use messages::EndpointRateLimitConfig;
|
||||
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::auth::backend::jwt::AuthRule;
|
||||
use crate::auth::{AuthError, IpPattern, check_peer_addr_is_in_list};
|
||||
@@ -18,8 +20,9 @@ use crate::cache::{Cached, TimedLru};
|
||||
use crate::config::ComputeConfig;
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
|
||||
use crate::intern::{AccountIdInt, ProjectIdInt};
|
||||
use crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
|
||||
use crate::protocol2::ConnectionInfoExtra;
|
||||
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
|
||||
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
|
||||
use crate::{compute, scram};
|
||||
|
||||
@@ -56,6 +59,8 @@ pub(crate) struct AuthInfo {
|
||||
pub(crate) account_id: Option<AccountIdInt>,
|
||||
/// Are public connections or VPC connections blocked?
|
||||
pub(crate) access_blocker_flags: AccessBlockerFlags,
|
||||
/// The rate limits for this endpoint.
|
||||
pub(crate) rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
/// Info for establishing a connection to a compute node.
|
||||
@@ -101,6 +106,8 @@ pub struct EndpointAccessControl {
|
||||
pub allowed_ips: Arc<Vec<IpPattern>>,
|
||||
pub allowed_vpce: Arc<Vec<String>>,
|
||||
pub flags: AccessBlockerFlags,
|
||||
|
||||
pub rate_limits: EndpointRateLimitConfig,
|
||||
}
|
||||
|
||||
impl EndpointAccessControl {
|
||||
@@ -139,6 +146,36 @@ impl EndpointAccessControl {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn connection_attempt_rate_limit(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
endpoint: &EndpointId,
|
||||
rate_limiter: &EndpointRateLimiter,
|
||||
) -> Result<(), AuthError> {
|
||||
let endpoint = EndpointIdInt::from(endpoint);
|
||||
|
||||
let limits = &self.rate_limits.connection_attempts;
|
||||
let config = match ctx.protocol() {
|
||||
crate::metrics::Protocol::Http => limits.http,
|
||||
crate::metrics::Protocol::Ws => limits.ws,
|
||||
crate::metrics::Protocol::Tcp => limits.tcp,
|
||||
crate::metrics::Protocol::SniRouter => return Ok(()),
|
||||
};
|
||||
let config = config.and_then(|config| {
|
||||
if config.rps <= 0.0 || config.burst <= 0.0 {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(LeakyBucketConfig::new(config.rps, config.burst))
|
||||
});
|
||||
|
||||
if !rate_limiter.check(endpoint, config, 1) {
|
||||
return Err(AuthError::too_many_connections());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// This will allocate per each call, but the http requests alone
|
||||
|
||||
@@ -106,4 +106,5 @@ mod tls;
|
||||
mod types;
|
||||
mod url;
|
||||
mod usage_metrics;
|
||||
mod util;
|
||||
mod waiters;
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod connect_compute;
|
||||
pub mod copy_bidirectional;
|
||||
pub mod handshake;
|
||||
pub mod inprocess;
|
||||
|
||||
@@ -8,19 +8,19 @@ use crate::config::{ComputeConfig, RetryConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::control_plane::errors::WakeComputeError;
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::control_plane::{self, NodeInfo};
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
|
||||
};
|
||||
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
|
||||
use crate::proxy::wake_compute::wake_compute;
|
||||
use crate::proxy::wake_compute::{WakeComputeBackend, wake_compute};
|
||||
use crate::types::Host;
|
||||
|
||||
/// If we couldn't connect, a cached connection info might be to blame
|
||||
/// (e.g. the compute node's address might've changed at the wrong time).
|
||||
/// Invalidate the cache entry (if any) to prevent subsequent errors.
|
||||
#[tracing::instrument(name = "invalidate_cache", skip_all)]
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) fn invalidate_cache(node_info: control_plane::CachedNodeInfo) -> NodeInfo {
|
||||
let is_cached = node_info.cached();
|
||||
if is_cached {
|
||||
@@ -49,14 +49,6 @@ pub(crate) trait ConnectMechanism {
|
||||
) -> Result<Self::Connection, Self::ConnectError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait ComputeConnectBackend {
|
||||
async fn wake_compute(
|
||||
&self,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError>;
|
||||
}
|
||||
|
||||
pub(crate) struct TcpMechanism {
|
||||
pub(crate) auth: AuthInfo,
|
||||
/// connect_to_compute concurrency lock
|
||||
@@ -91,7 +83,7 @@ impl ConnectMechanism for TcpMechanism {
|
||||
|
||||
/// Try to connect to the compute node, retrying if necessary.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
|
||||
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: WakeComputeBackend>(
|
||||
ctx: &RequestContext,
|
||||
mechanism: &M,
|
||||
user_info: &B,
|
||||
@@ -1,8 +1,10 @@
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub(crate) mod connect_compute;
|
||||
pub(crate) mod retry;
|
||||
pub(crate) mod wake_compute;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::FutureExt;
|
||||
@@ -21,15 +23,16 @@ use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
|
||||
use crate::context::RequestContext;
|
||||
use crate::error::{ReportableError, UserFacingError};
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
|
||||
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
|
||||
use crate::pglb::passthrough::ProxyPassthrough;
|
||||
use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
|
||||
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::{PqStream, Stream};
|
||||
use crate::types::EndpointCacheKey;
|
||||
use crate::util::run_until_cancelled;
|
||||
use crate::{auth, compute};
|
||||
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
@@ -46,21 +49,6 @@ impl ReportableError for TlsRequired {
|
||||
|
||||
impl UserFacingError for TlsRequired {}
|
||||
|
||||
pub async fn run_until_cancelled<F: std::future::Future>(
|
||||
f: F,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> Option<F::Output> {
|
||||
match futures::future::select(
|
||||
std::pin::pin!(f),
|
||||
std::pin::pin!(cancellation_token.cancelled()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
futures::future::Either::Left((f, _)) => Some(f),
|
||||
futures::future::Either::Right(((), _)) => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn task_main(
|
||||
config: &'static ProxyConfig,
|
||||
auth_backend: &'static auth::Backend<'static, ()>,
|
||||
@@ -358,12 +346,12 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
|
||||
}
|
||||
};
|
||||
|
||||
let creds = match &user_info {
|
||||
auth::Backend::ControlPlane(_, creds) => creds,
|
||||
let (cplane, creds) = match user_info {
|
||||
auth::Backend::ControlPlane(cplane, creds) => (cplane, creds),
|
||||
auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
|
||||
};
|
||||
let params_compat = creds.info.options.get(NeonOptions::PARAMS_COMPAT).is_some();
|
||||
let mut auth_info = compute::AuthInfo::with_auth_keys(&creds.keys);
|
||||
let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
|
||||
auth_info.set_startup_params(¶ms, params_compat);
|
||||
|
||||
let res = connect_to_compute(
|
||||
@@ -373,7 +361,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
|
||||
auth: auth_info,
|
||||
locks: &config.connect_compute_locks,
|
||||
},
|
||||
&user_info,
|
||||
&auth::Backend::ControlPlane(cplane, creds.info),
|
||||
config.wake_compute_retry_config,
|
||||
&config.connect_to_compute,
|
||||
)
|
||||
|
||||
@@ -8,7 +8,7 @@ use std::time::Duration;
|
||||
use anyhow::{Context, bail};
|
||||
use async_trait::async_trait;
|
||||
use http::StatusCode;
|
||||
use postgres_client::config::{AuthKeys, ScramKeys, SslMode};
|
||||
use postgres_client::config::SslMode;
|
||||
use postgres_client::tls::{MakeTlsConnect, NoTls};
|
||||
use retry::{ShouldRetryWakeCompute, retry_after};
|
||||
use rstest::rstest;
|
||||
@@ -19,15 +19,13 @@ use tracing_test::traced_test;
|
||||
|
||||
use super::retry::CouldRetry;
|
||||
use super::*;
|
||||
use crate::auth::backend::{
|
||||
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
|
||||
};
|
||||
use crate::auth::backend::{ComputeUserInfo, MaybeOwned};
|
||||
use crate::config::{ComputeConfig, RetryConfig};
|
||||
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
|
||||
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
|
||||
use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache};
|
||||
use crate::error::ErrorKind;
|
||||
use crate::pglb::connect_compute::ConnectMechanism;
|
||||
use crate::proxy::connect_compute::ConnectMechanism;
|
||||
use crate::tls::client_config::compute_client_config_with_certs;
|
||||
use crate::tls::server_config::CertResolver;
|
||||
use crate::types::{BranchId, EndpointId, ProjectId};
|
||||
@@ -575,19 +573,13 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
|
||||
|
||||
fn helper_create_connect_info(
|
||||
mechanism: &TestConnectMechanism,
|
||||
) -> auth::Backend<'static, ComputeCredentials> {
|
||||
) -> auth::Backend<'static, ComputeUserInfo> {
|
||||
auth::Backend::ControlPlane(
|
||||
MaybeOwned::Owned(ControlPlaneClient::Test(Box::new(mechanism.clone()))),
|
||||
ComputeCredentials {
|
||||
info: ComputeUserInfo {
|
||||
endpoint: "endpoint".into(),
|
||||
user: "user".into(),
|
||||
options: NeonOptions::parse_options_raw(""),
|
||||
},
|
||||
keys: ComputeCredentialKeys::AuthKeys(AuthKeys::ScramSha256(ScramKeys {
|
||||
client_key: [0; 32],
|
||||
server_key: [0; 32],
|
||||
})),
|
||||
ComputeUserInfo {
|
||||
endpoint: "endpoint".into(),
|
||||
user: "user".into(),
|
||||
options: NeonOptions::parse_options_raw(""),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use async_trait::async_trait;
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::config::RetryConfig;
|
||||
@@ -8,7 +9,6 @@ use crate::error::ReportableError;
|
||||
use crate::metrics::{
|
||||
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
|
||||
};
|
||||
use crate::pglb::connect_compute::ComputeConnectBackend;
|
||||
use crate::proxy::retry::{retry_after, should_retry};
|
||||
|
||||
// Use macro to retain original callsite.
|
||||
@@ -23,7 +23,12 @@ macro_rules! log_wake_compute_error {
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
|
||||
#[async_trait]
|
||||
pub(crate) trait WakeComputeBackend {
|
||||
async fn wake_compute(&self, ctx: &RequestContext) -> Result<CachedNodeInfo, WakeComputeError>;
|
||||
}
|
||||
|
||||
pub(crate) async fn wake_compute<B: WakeComputeBackend>(
|
||||
num_retries: &mut u32,
|
||||
ctx: &RequestContext,
|
||||
api: &B,
|
||||
|
||||
@@ -69,9 +69,8 @@ pub struct LeakyBucketConfig {
|
||||
pub max: f64,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl LeakyBucketConfig {
|
||||
pub(crate) fn new(rps: f64, max: f64) -> Self {
|
||||
pub fn new(rps: f64, max: f64) -> Self {
|
||||
assert!(rps > 0.0, "rps must be positive");
|
||||
assert!(max > 0.0, "max must be positive");
|
||||
Self { rps, max }
|
||||
|
||||
@@ -12,11 +12,10 @@ use rand::{Rng, SeedableRng};
|
||||
use tokio::time::{Duration, Instant};
|
||||
use tracing::info;
|
||||
|
||||
use super::LeakyBucketConfig;
|
||||
use crate::ext::LockExt;
|
||||
use crate::intern::EndpointIdInt;
|
||||
|
||||
use super::LeakyBucketConfig;
|
||||
|
||||
pub struct GlobalRateLimiter {
|
||||
data: Vec<RateBucket>,
|
||||
info: Vec<RateBucketInfo>,
|
||||
|
||||
@@ -21,7 +21,7 @@ use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
|
||||
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
|
||||
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
|
||||
use crate::auth::backend::local::StaticAuthRules;
|
||||
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
|
||||
use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
|
||||
use crate::auth::{self, AuthError};
|
||||
use crate::compute_ctl::{
|
||||
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
|
||||
@@ -34,7 +34,7 @@ use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError};
|
||||
use crate::control_plane::locks::ApiLocks;
|
||||
use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::intern::EndpointIdInt;
|
||||
use crate::pglb::connect_compute::ConnectMechanism;
|
||||
use crate::proxy::connect_compute::ConnectMechanism;
|
||||
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
|
||||
@@ -68,17 +68,20 @@ impl PoolingBackend {
|
||||
self.config.authentication_config.is_vpc_acccess_proxy,
|
||||
)?;
|
||||
|
||||
let ep = EndpointIdInt::from(&user_info.endpoint);
|
||||
let rate_limit_config = None;
|
||||
if !self.endpoint_rate_limiter.check(ep, rate_limit_config, 1) {
|
||||
return Err(AuthError::too_many_connections());
|
||||
}
|
||||
access_control.connection_attempt_rate_limit(
|
||||
ctx,
|
||||
&user_info.endpoint,
|
||||
&self.endpoint_rate_limiter,
|
||||
)?;
|
||||
|
||||
let role_access = backend.get_role_secret(ctx).await?;
|
||||
let Some(secret) = role_access.secret else {
|
||||
// If we don't have an authentication secret, for the http flow we can just return an error.
|
||||
info!("authentication info not found");
|
||||
return Err(AuthError::password_failed(&*user_info.user));
|
||||
};
|
||||
|
||||
let ep = EndpointIdInt::from(&user_info.endpoint);
|
||||
let auth_outcome = crate::auth::validate_password_and_exchange(
|
||||
&self.config.authentication_config.thread_pool,
|
||||
ep,
|
||||
@@ -180,14 +183,15 @@ impl PoolingBackend {
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
let backend = self.auth_backend.as_ref().map(|()| keys);
|
||||
crate::pglb::connect_compute::connect_to_compute(
|
||||
let backend = self.auth_backend.as_ref().map(|()| keys.info);
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&TokioMechanism {
|
||||
conn_id,
|
||||
conn_info,
|
||||
pool: self.pool.clone(),
|
||||
locks: &self.config.connect_compute_locks,
|
||||
keys: keys.keys,
|
||||
},
|
||||
&backend,
|
||||
self.config.wake_compute_retry_config,
|
||||
@@ -214,18 +218,15 @@ impl PoolingBackend {
|
||||
let conn_id = uuid::Uuid::new_v4();
|
||||
tracing::Span::current().record("conn_id", display(conn_id));
|
||||
debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
|
||||
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
|
||||
info: ComputeUserInfo {
|
||||
user: conn_info.user_info.user.clone(),
|
||||
endpoint: EndpointId::from(format!(
|
||||
"{}{LOCAL_PROXY_SUFFIX}",
|
||||
conn_info.user_info.endpoint.normalize()
|
||||
)),
|
||||
options: conn_info.user_info.options.clone(),
|
||||
},
|
||||
keys: crate::auth::backend::ComputeCredentialKeys::None,
|
||||
let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
|
||||
user: conn_info.user_info.user.clone(),
|
||||
endpoint: EndpointId::from(format!(
|
||||
"{}{LOCAL_PROXY_SUFFIX}",
|
||||
conn_info.user_info.endpoint.normalize()
|
||||
)),
|
||||
options: conn_info.user_info.options.clone(),
|
||||
});
|
||||
crate::pglb::connect_compute::connect_to_compute(
|
||||
crate::proxy::connect_compute::connect_to_compute(
|
||||
ctx,
|
||||
&HyperMechanism {
|
||||
conn_id,
|
||||
@@ -495,6 +496,7 @@ struct TokioMechanism {
|
||||
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
|
||||
conn_info: ConnInfo,
|
||||
conn_id: uuid::Uuid,
|
||||
keys: ComputeCredentialKeys,
|
||||
|
||||
/// connect_to_compute concurrency lock
|
||||
locks: &'static ApiLocks<Host>,
|
||||
@@ -520,6 +522,10 @@ impl ConnectMechanism for TokioMechanism {
|
||||
.dbname(&self.conn_info.dbname)
|
||||
.connect_timeout(compute_config.timeout);
|
||||
|
||||
if let ComputeCredentialKeys::AuthKeys(auth_keys) = self.keys {
|
||||
config.auth_keys(auth_keys);
|
||||
}
|
||||
|
||||
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
|
||||
let res = config.connect(compute_config).await;
|
||||
drop(pause);
|
||||
|
||||
@@ -50,10 +50,10 @@ use crate::context::RequestContext;
|
||||
use crate::ext::TaskExt;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
use crate::serverless::http_util::{api_error_into_response, json_response};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api";
|
||||
pub(crate) const AUTH_BROKER_SNI: &str = "apiauth";
|
||||
|
||||
@@ -41,10 +41,11 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
|
||||
use crate::http::{ReadBodyError, read_body_with_limit};
|
||||
use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind};
|
||||
use crate::pqproto::StartupMessageParams;
|
||||
use crate::proxy::{NeonOptions, run_until_cancelled};
|
||||
use crate::proxy::NeonOptions;
|
||||
use crate::serverless::backend::HttpConnError;
|
||||
use crate::types::{DbName, RoleName};
|
||||
use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
|
||||
use crate::util::run_until_cancelled;
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
|
||||
14
proxy/src/util.rs
Normal file
14
proxy/src/util.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
use std::pin::pin;
|
||||
|
||||
use futures::future::{Either, select};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub async fn run_until_cancelled<F: Future>(
|
||||
f: F,
|
||||
cancellation_token: &CancellationToken,
|
||||
) -> Option<F::Output> {
|
||||
match select(pin!(f), pin!(cancellation_token.cancelled())).await {
|
||||
Either::Left((f, _)) => Some(f),
|
||||
Either::Right(((), _)) => None,
|
||||
}
|
||||
}
|
||||
@@ -69,8 +69,10 @@ class EndpointHttpClient(requests.Session):
|
||||
json: dict[str, str] = res.json()
|
||||
return json
|
||||
|
||||
def prewarm_lfc(self):
|
||||
self.post(f"http://localhost:{self.external_port}/lfc/prewarm").raise_for_status()
|
||||
def prewarm_lfc(self, from_endpoint_id: str | None = None):
|
||||
url: str = f"http://localhost:{self.external_port}/lfc/prewarm"
|
||||
params = {"from_endpoint": from_endpoint_id} if from_endpoint_id else dict()
|
||||
self.post(url, params=params).raise_for_status()
|
||||
|
||||
def prewarmed():
|
||||
json = self.prewarm_lfc_status()
|
||||
|
||||
@@ -129,6 +129,18 @@ class NeonAPI:
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def get_project_limits(self, project_id: str) -> dict[str, Any]:
|
||||
resp = self.__request(
|
||||
"GET",
|
||||
f"/projects/{project_id}/limits",
|
||||
headers={
|
||||
"Accept": "application/json",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
|
||||
return cast("dict[str, Any]", resp.json())
|
||||
|
||||
def delete_project(
|
||||
self,
|
||||
project_id: str,
|
||||
|
||||
@@ -4046,6 +4046,16 @@ def static_proxy(
|
||||
"CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
|
||||
)
|
||||
|
||||
vanilla_pg.stop()
|
||||
vanilla_pg.edit_hba(
|
||||
[
|
||||
"local all all trust",
|
||||
"host all all 127.0.0.1/32 scram-sha-256",
|
||||
"host all all ::1/128 scram-sha-256",
|
||||
]
|
||||
)
|
||||
vanilla_pg.start()
|
||||
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
|
||||
@@ -45,6 +45,8 @@ class NeonEndpoint:
|
||||
if self.branch.connect_env:
|
||||
self.connect_env = self.branch.connect_env.copy()
|
||||
self.connect_env["PGHOST"] = self.host
|
||||
if self.type == "read_only":
|
||||
self.project.read_only_endpoints_total += 1
|
||||
|
||||
def delete(self):
|
||||
self.project.delete_endpoint(self.id)
|
||||
@@ -228,8 +230,13 @@ class NeonProject:
|
||||
self.benchmarks: dict[str, subprocess.Popen[Any]] = {}
|
||||
self.restore_num: int = 0
|
||||
self.restart_pgbench_on_console_errors: bool = False
|
||||
self.limits: dict[str, Any] = self.get_limits()["limits"]
|
||||
self.read_only_endpoints_total: int = 0
|
||||
|
||||
def delete(self):
|
||||
def get_limits(self) -> dict[str, Any]:
|
||||
return self.neon_api.get_project_limits(self.id)
|
||||
|
||||
def delete(self) -> None:
|
||||
self.neon_api.delete_project(self.id)
|
||||
|
||||
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
|
||||
@@ -282,6 +289,7 @@ class NeonProject:
|
||||
self.neon_api.delete_endpoint(self.id, endpoint_id)
|
||||
self.endpoints[endpoint_id].branch.endpoints.pop(endpoint_id)
|
||||
self.endpoints.pop(endpoint_id)
|
||||
self.read_only_endpoints_total -= 1
|
||||
self.wait()
|
||||
|
||||
def start_benchmark(self, target: str, clients: int = 10) -> subprocess.Popen[Any]:
|
||||
@@ -369,49 +377,64 @@ def setup_class(
|
||||
print(f"::warning::Retried on 524 error {neon_api.retries524} times")
|
||||
if neon_api.retries4xx > 0:
|
||||
print(f"::warning::Retried on 4xx error {neon_api.retries4xx} times")
|
||||
log.info("Removing the project")
|
||||
log.info("Removing the project %s", project.id)
|
||||
project.delete()
|
||||
|
||||
|
||||
def do_action(project: NeonProject, action: str) -> None:
|
||||
def do_action(project: NeonProject, action: str) -> bool:
|
||||
"""
|
||||
Runs the action
|
||||
"""
|
||||
log.info("Action: %s", action)
|
||||
if action == "new_branch":
|
||||
log.info("Trying to create a new branch")
|
||||
if 0 <= project.limits["max_branches"] <= len(project.branches):
|
||||
log.info(
|
||||
"Maximum branch limit exceeded (%s of %s)",
|
||||
len(project.branches),
|
||||
project.limits["max_branches"],
|
||||
)
|
||||
return False
|
||||
parent = project.branches[
|
||||
random.choice(list(set(project.branches.keys()) - project.reset_branches))
|
||||
]
|
||||
log.info("Parent: %s", parent)
|
||||
child = parent.create_child_branch()
|
||||
if child is None:
|
||||
return
|
||||
return False
|
||||
log.info("Created branch %s", child)
|
||||
child.start_benchmark()
|
||||
elif action == "delete_branch":
|
||||
if project.leaf_branches:
|
||||
target = random.choice(list(project.leaf_branches.values()))
|
||||
target: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
log.info("Trying to delete branch %s", target)
|
||||
target.delete()
|
||||
else:
|
||||
log.info("Leaf branches not found, skipping")
|
||||
return False
|
||||
elif action == "new_ro_endpoint":
|
||||
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
|
||||
log.info(
|
||||
"Maximum read only endpoint limit exceeded (%s of %s)",
|
||||
project.read_only_endpoints_total,
|
||||
project.limits["max_read_only_endpoints"],
|
||||
)
|
||||
return False
|
||||
ep = random.choice(
|
||||
[br for br in project.branches.values() if br.id not in project.reset_branches]
|
||||
).create_ro_endpoint()
|
||||
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
|
||||
ep.start_benchmark()
|
||||
elif action == "delete_ro_endpoint":
|
||||
if project.read_only_endpoints_total == 0:
|
||||
log.info("no read_only endpoints present, skipping")
|
||||
return False
|
||||
ro_endpoints: list[NeonEndpoint] = [
|
||||
endpoint for endpoint in project.endpoints.values() if endpoint.type == "read_only"
|
||||
]
|
||||
if ro_endpoints:
|
||||
target_ep: NeonEndpoint = random.choice(ro_endpoints)
|
||||
target_ep.delete()
|
||||
log.info("endpoint %s deleted", target_ep.id)
|
||||
else:
|
||||
log.info("no read_only endpoints present, skipping")
|
||||
target_ep: NeonEndpoint = random.choice(ro_endpoints)
|
||||
target_ep.delete()
|
||||
log.info("endpoint %s deleted", target_ep.id)
|
||||
elif action == "restore_random_time":
|
||||
if project.leaf_branches:
|
||||
br: NeonBranch = random.choice(list(project.leaf_branches.values()))
|
||||
@@ -419,8 +442,10 @@ def do_action(project: NeonProject, action: str) -> None:
|
||||
br.restore_random_time()
|
||||
else:
|
||||
log.info("No leaf branches found")
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"The action {action} is unknown")
|
||||
return True
|
||||
|
||||
|
||||
@pytest.mark.timeout(7200)
|
||||
@@ -457,8 +482,9 @@ def test_api_random(
|
||||
pg_bin.run(["pgbench", "-i", "-I", "dtGvp", "-s100"], env=project.main_branch.connect_env)
|
||||
for _ in range(num_operations):
|
||||
log.info("Starting action #%s", _ + 1)
|
||||
do_action(
|
||||
while not do_action(
|
||||
project, random.choices([a[0] for a in ACTIONS], weights=[w[1] for w in ACTIONS])[0]
|
||||
)
|
||||
):
|
||||
log.info("Retrying...")
|
||||
project.check_all_benchmarks()
|
||||
assert True
|
||||
|
||||
@@ -188,7 +188,8 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, query: LfcQueryMet
|
||||
pg_cur.execute("select pg_reload_conf()")
|
||||
|
||||
if query is LfcQueryMethod.COMPUTE_CTL:
|
||||
http_client.prewarm_lfc()
|
||||
# Same thing as prewarm_lfc(), testing other method
|
||||
http_client.prewarm_lfc(endpoint.endpoint_id)
|
||||
else:
|
||||
pg_cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
|
||||
|
||||
|
||||
@@ -19,11 +19,15 @@ TABLE_NAME = "neon_control_plane.endpoints"
|
||||
async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
|
||||
# Shouldn't be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')"
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')",
|
||||
user="proxy",
|
||||
password="password",
|
||||
)
|
||||
# Should be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')"
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')",
|
||||
user="proxy",
|
||||
password="password",
|
||||
)
|
||||
|
||||
def check_cannot_connect(**kwargs):
|
||||
@@ -60,7 +64,9 @@ async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
|
||||
# Shouldn't be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')"
|
||||
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')",
|
||||
user="proxy",
|
||||
password="password",
|
||||
)
|
||||
|
||||
def query(status: int, query: str, *args):
|
||||
@@ -75,6 +81,8 @@ async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
|
||||
query(400, "select 1;") # ip address is not allowed
|
||||
# Should be able to connect to this project
|
||||
vanilla_pg.safe_psql(
|
||||
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'"
|
||||
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'",
|
||||
user="proxy",
|
||||
password="password",
|
||||
)
|
||||
query(200, "select 1;") # should work now
|
||||
|
||||
Reference in New Issue
Block a user