Compare commits

...

7 Commits

Author SHA1 Message Date
Vlad Lazar
6e43ef46de fixup: update pytest printing of gc results 2025-06-19 14:34:09 +02:00
Vlad Lazar
4c67de446d fixup: init offloaded timelines list before gc info init 2025-06-18 15:42:52 +02:00
Vlad Lazar
7bd8394ed5 pageserver: fix initial layer visibility calculation
Problem

GC info is an input to updating layer visibility.
Currently, gc info is updated on timeline activation and visibility is
computed on tenant attach, so we ignore branch points and compute
visibility by taking all layers into account.

Side note: gc info is also updated when timelines are created and
dropped. That doesn't help because we create the timelines in
topological order from the root. Hence the root timeline goes first,
without context of where the branch points are.

The impact of this in prod is that shards need to rehydrate layers
after live migration since the non-visible ones were excluded from
the heatmap.

Summary of Changes

Move the visibility calculation into tenant attachment instead of
activation.
2025-06-11 18:02:16 +02:00
Trung Dinh
02f94edb60 Remove global static TENANTS (#12169)
## Problem
There is this TODO in code:
https://github.com/neondatabase/neon/blob/main/pageserver/src/tenant/mgr.rs#L300-L302
This is an old TODO by @jcsp.

## Summary of changes
This PR addresses the TODO. Specifically, it removes a global static
`TENANTS`. Instead the `TenantManager` now directly manages the tenant
map. Enhancing abstraction.

Essentially, this PR moves all module-level methods to inside the
implementation of `TenantManager`.
2025-06-10 09:26:40 +00:00
Conrad Ludgate
58327ef74d [proxy] fix sql-over-http password setting (#12177)
## Problem

Looks like our sql-over-http tests get to rely on "trust"
authentication, so the path that made sure the authkeys data was set was
never being hit.

## Summary of changes

Slight refactor to WakeComputeBackends, as well as making sure auth keys
are propagated. Fix tests to ensure passwords are tested.
2025-06-10 08:46:29 +00:00
Dmitrii Kovalkov
73be6bb736 fix(compute): use proper safekeeper in VotesCollectedMset (#12175)
## Problem
`VotesCollectedMset` uses the wrong safekeeper to update truncateLsn.
This led to some failed assert later in the code during running
safekeeper migration tests.
- Relates to https://github.com/neondatabase/neon/issues/11823

## Summary of changes
Use proper safekeeper to update truncateLsn in VotesCollectedMset
2025-06-10 07:16:42 +00:00
Alex Chi Z.
40d7583906 feat(pageserver): use hostname as feature flag resolver property (#12141)
## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

Collect pageserver hostname property so that we can use it in the
PostHog UI. Not sure if this is the best way to do that -- open to
suggestions.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-06-10 07:10:41 +00:00
18 changed files with 504 additions and 406 deletions

View File

@@ -1022,6 +1022,7 @@ impl RemoteStorage for S3Bucket {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
// TODO: check the behavior of using the SDK on a non-versioned container
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"

View File

@@ -573,7 +573,8 @@ fn start_pageserver(
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = mgr::init(
conf,
background_purges.clone(),
TenantSharedResources {
@@ -584,10 +585,10 @@ fn start_pageserver(
basebackup_prepare_sender,
feature_resolver,
},
order,
shutdown_pageserver.clone(),
))?;
);
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
@@ -86,7 +87,35 @@ impl FeatureResolver {
}
}
}
// TODO: add pageserver URL.
// TODO: move this to a background task so that we don't block startup in case of slow disk
let metadata_path = conf.metadata_path();
match std::fs::read_to_string(&metadata_path) {
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
Ok(metadata) => {
properties.insert(
"hostname".to_string(),
PostHogFlagFilterPropertyValue::String(metadata.http_host),
);
if let Some(cplane_region) = metadata.other.get("region_id") {
if let Some(cplane_region) = cplane_region.as_str() {
// This region contains the cell number
properties.insert(
"neon_region".to_string(),
PostHogFlagFilterPropertyValue::String(
cplane_region.to_string(),
),
);
}
}
}
Err(e) => {
tracing::warn!("Failed to parse metadata.json: {}", e);
}
},
Err(e) => {
tracing::warn!("Failed to read metadata.json: {}", e);
}
}
Arc::new(properties)
};
let fake_tenants = {

View File

@@ -1858,6 +1858,29 @@ impl TenantShard {
}
}
// At this point we've initialized all timelines and are tracking them.
// Now compute the layer visibility for all (not offloaded) timelines.
let compute_visiblity_for = {
let timelines_accessor = self.timelines.lock().unwrap();
let mut timelines_offloaded_accessor = self.timelines_offloaded.lock().unwrap();
timelines_offloaded_accessor.extend(offloaded_timelines_list.into_iter());
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
timelines_accessor.values().cloned().collect::<Vec<_>>()
};
for tl in compute_visiblity_for {
tl.update_layer_visibility().await.with_context(|| {
format!(
"failed initial timeline visibility computation {} for tenant {}",
tl.timeline_id, self.tenant_shard_id
)
})?;
}
// Walk through deleted timelines, resume deletion
for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
remote_timeline_client
@@ -1877,10 +1900,6 @@ impl TenantShard {
.context("resume_deletion")
.map_err(LoadLocalTimelineError::ResumeDeletion)?;
}
{
let mut offloaded_timelines_accessor = self.timelines_offloaded.lock().unwrap();
offloaded_timelines_accessor.extend(offloaded_timelines_list.into_iter());
}
// Stash the preloaded tenant manifest, and upload a new manifest if changed.
//
@@ -3443,9 +3462,6 @@ impl TenantShard {
.values()
.filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
// Before activation, populate each Timeline's GcInfo with information about its children
self.initialize_gc_info(&timelines_accessor, &timelines_offloaded_accessor, None);
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, background_jobs_can_start);

View File

@@ -12,7 +12,6 @@ use anyhow::Context;
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
use pageserver_api::shard::{
@@ -103,7 +102,7 @@ pub(crate) enum TenantsMap {
/// [`init_tenant_mgr`] is not done yet.
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
/// New tenants can be added using [`TenantManager::tenant_map_acquire_slot`].
Open(BTreeMap<TenantShardId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
/// Existing tenants are still accessible, but no new tenants can be created.
@@ -284,9 +283,6 @@ impl BackgroundPurges {
}
}
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
@@ -297,10 +293,7 @@ static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
/// and attached modes concurrently.
pub struct TenantManager {
conf: &'static PageServerConf,
// TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
// out of that static variable, the TenantManager can own this.
// See https://github.com/neondatabase/neon/issues/5796
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: std::sync::RwLock<TenantsMap>,
resources: TenantSharedResources,
// Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
@@ -479,21 +472,43 @@ pub(crate) enum DeleteTenantError {
Other(#[from] anyhow::Error),
}
/// Initialize repositories with locally available timelines.
/// Initialize repositories at `Initializing` state.
pub fn init(
conf: &'static PageServerConf,
background_purges: BackgroundPurges,
resources: TenantSharedResources,
cancel: CancellationToken,
) -> TenantManager {
TenantManager {
conf,
tenants: std::sync::RwLock::new(TenantsMap::Initializing),
resources,
cancel,
background_purges,
}
}
/// Transition repositories from `Initializing` state to `Open` state with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
background_purges: BackgroundPurges,
resources: TenantSharedResources,
tenant_manager: Arc<TenantManager>,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
) -> anyhow::Result<()> {
debug_assert!(matches!(
*tenant_manager.tenants.read().unwrap(),
TenantsMap::Initializing
));
let mut tenants = BTreeMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
let conf = tenant_manager.conf;
let resources = &tenant_manager.resources;
let cancel = &tenant_manager.cancel;
let background_purges = &tenant_manager.background_purges;
// Initialize dynamic limits that depend on system resources
let system_memory =
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
@@ -512,7 +527,7 @@ pub async fn init_tenant_mgr(
let tenant_configs = init_load_tenant_configs(conf).await;
// Determine which tenants are to be secondary or attached, and in which generation
let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
tracing::info!(
"Attaching {} tenants at startup, warming up {} at a time",
@@ -669,18 +684,10 @@ pub async fn init_tenant_mgr(
info!("Processed {} local tenants at startup", tenants.len());
let mut tenants_map = TENANTS.write().unwrap();
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
let mut tenant_map = tenant_manager.tenants.write().unwrap();
*tenant_map = TenantsMap::Open(tenants);
*tenants_map = TenantsMap::Open(tenants);
Ok(TenantManager {
conf,
tenants: &TENANTS,
resources,
cancel: CancellationToken::new(),
background_purges,
})
Ok(())
}
/// Wrapper for Tenant::spawn that checks invariants before running
@@ -719,142 +726,6 @@ fn tenant_spawn(
)
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut join_set = JoinSet::new();
#[cfg(all(debug_assertions, not(test)))]
{
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
// as it happens implicitly at the end of tests etc.
let m = tenants.read().unwrap();
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
}
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
let (total_in_progress, total_attached) = {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
match v {
TenantSlot::Attached(t) => {
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let res = {
let (_guard, shutdown_progress) = completion::channel();
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
other_progress.wait().await;
}
// we cannot afford per tenant logging here, because if s3 is degraded, we are
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
);
total_attached += 1;
}
TenantSlot::Secondary(state) => {
// We don't need to wait for this individually per-tenant: the
// downloader task will be waited on eventually, this cancel
// is just to encourage it to drop out if it is doing work
// for this tenant right now.
state.cancel.cancel();
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
// wait for their notifications to fire in this function.
join_set.spawn(async move {
notify.wait().await;
});
total_in_progress += 1;
}
}
}
*m = TenantsMap::ShuttingDown(shutdown_state);
(total_in_progress, total_attached)
}
TenantsMap::ShuttingDown(_) => {
error!(
"already shutting down, this function isn't supposed to be called more than once"
);
return;
}
}
};
let started_at = std::time::Instant::now();
info!(
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
total_in_progress, total_attached
);
let total = join_set.len();
let mut panicked = 0;
let mut buffering = true;
const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
while !join_set.is_empty() {
tokio::select! {
Some(joined) = join_set.join_next() => {
match joined {
Ok(()) => {},
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the tasks");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
}
if !buffering {
// buffer so that every 500ms since the first update (or starting) we'll log
// how far away we are; this is because we will get SIGKILL'd at 10s, and we
// are not able to log *then*.
buffering = true;
buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
}
},
_ = &mut buffered, if buffering => {
buffering = false;
info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
}
}
}
if panicked > 0 {
warn!(
panicked,
total, "observed panicks while shutting down tenants"
);
}
// caller will log how long we took
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum UpsertLocationError {
#[error("Bad config request: {0}")]
@@ -1056,7 +927,8 @@ impl TenantManager {
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
let mut slot_guard = self
.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
.map_err(|e| match e {
TenantSlotError::NotFound(_) => {
unreachable!("Called with mode Any")
@@ -1223,6 +1095,75 @@ impl TenantManager {
}
}
fn tenant_map_acquire_slot(
&self,
tenant_shard_id: &TenantShardId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
use TenantSlotAcquireMode::*;
METRICS.tenant_slot_writes.inc();
let mut locked = self.tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
TenantsMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_shard_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
let inserting = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&inserting);
v.insert(inserting);
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
None,
completion,
&self.tenants,
))
}
},
Entry::Occupied(mut o) => {
// Apply mode-driven checks
match (o.get(), mode) {
(TenantSlot::InProgress(_), _) => {
tracing::debug!("Occupied, failing for InProgress");
Err(TenantSlotError::InProgress)
}
_ => {
// Happy case: the slot was not in any state that violated our mode
let (completion, barrier) = utils::completion::channel();
let in_progress = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&in_progress);
let old_value = o.insert(in_progress);
METRICS.slot_removed(&old_value);
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
&self.tenants,
))
}
}
}
}
}
/// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
/// LocationConf that was last used to attach it. Optionally, the local file cache may be
/// dropped before re-attaching.
@@ -1239,7 +1180,8 @@ impl TenantManager {
drop_cache: bool,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let mut slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let Some(old_slot) = slot_guard.get_old_value() else {
anyhow::bail!("Tenant not found when trying to reset");
};
@@ -1388,7 +1330,8 @@ impl TenantManager {
Ok(())
}
let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
match &slot_guard.old_value {
Some(TenantSlot::Attached(tenant)) => {
// Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
@@ -1539,7 +1482,7 @@ impl TenantManager {
// Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
drop(tenant);
let mut parent_slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
let parent = match parent_slot_guard.get_old_value() {
Some(TenantSlot::Attached(t)) => t,
Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
@@ -1843,7 +1786,145 @@ impl TenantManager {
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
shutdown_all_tenants0(self.tenants).await
self.shutdown_all_tenants0().await
}
async fn shutdown_all_tenants0(&self) {
let mut join_set = JoinSet::new();
#[cfg(all(debug_assertions, not(test)))]
{
// Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
// as it happens implicitly at the end of tests etc.
let m = self.tenants.read().unwrap();
debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
}
// Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
let (total_in_progress, total_attached) = {
let mut m = self.tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
match v {
TenantSlot::Attached(t) => {
shutdown_state
.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let res = {
let (_guard, shutdown_progress) = completion::channel();
t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
};
if let Err(other_progress) = res {
// join the another shutdown in progress
other_progress.wait().await;
}
// we cannot afford per tenant logging here, because if s3 is degraded, we are
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
);
total_attached += 1;
}
TenantSlot::Secondary(state) => {
// We don't need to wait for this individually per-tenant: the
// downloader task will be waited on eventually, this cancel
// is just to encourage it to drop out if it is doing work
// for this tenant right now.
state.cancel.cancel();
shutdown_state
.insert(tenant_shard_id, TenantSlot::Secondary(state));
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
// wait for their notifications to fire in this function.
join_set.spawn(async move {
notify.wait().await;
});
total_in_progress += 1;
}
}
}
*m = TenantsMap::ShuttingDown(shutdown_state);
(total_in_progress, total_attached)
}
TenantsMap::ShuttingDown(_) => {
error!(
"already shutting down, this function isn't supposed to be called more than once"
);
return;
}
}
};
let started_at = std::time::Instant::now();
info!(
"Waiting for {} InProgress tenants and {} Attached tenants to shut down",
total_in_progress, total_attached
);
let total = join_set.len();
let mut panicked = 0;
let mut buffering = true;
const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
while !join_set.is_empty() {
tokio::select! {
Some(joined) = join_set.join_next() => {
match joined {
Ok(()) => {},
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the tasks");
}
Err(join_error) if join_error.is_panic() => {
// cannot really do anything, as this panic is likely a bug
panicked += 1;
}
Err(join_error) => {
warn!("unknown kind of JoinError: {join_error}");
}
}
if !buffering {
// buffer so that every 500ms since the first update (or starting) we'll log
// how far away we are; this is because we will get SIGKILL'd at 10s, and we
// are not able to log *then*.
buffering = true;
buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
}
},
_ = &mut buffered, if buffering => {
buffering = false;
info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
}
}
}
if panicked > 0 {
warn!(
panicked,
total, "observed panicks while shutting down tenants"
);
}
// caller will log how long we took
}
/// Detaches a tenant, and removes its local files asynchronously.
@@ -1889,12 +1970,12 @@ impl TenantManager {
.map(Some)
};
let mut removal_result = remove_tenant_from_memory(
self.tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
let mut removal_result = self
.remove_tenant_from_memory(
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// If the tenant was not found, it was likely already removed. Attempt to remove the tenant
// directory on disk anyway. For example, during shard splits, we shut down and remove the
@@ -1948,17 +2029,16 @@ impl TenantManager {
) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
use detach_ancestor::Error;
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
|e| {
use TenantSlotError::*;
let slot_guard = self
.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)
.map_err(|e| {
use TenantSlotError::*;
match e {
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
}
},
)?;
match e {
MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
}
})?;
let tenant = {
let old_slot = slot_guard
@@ -2291,6 +2371,80 @@ impl TenantManager {
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
async fn remove_tenant_from_memory<V, F>(
&self,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
let mut slot_guard =
self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
let attached_tenant = match slot_guard.get_old_value() {
Some(TenantSlot::Attached(tenant)) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let shutdown_mode = ShutdownMode::Hard;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, shutdown_mode).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_shard_id));
}
}
Some(tenant)
}
Some(TenantSlot::Secondary(secondary_state)) => {
tracing::info!("Shutting down in secondary mode");
secondary_state.shutdown().await;
None
}
Some(TenantSlot::InProgress(_)) => {
// Acquiring a slot guarantees its old value was not InProgress
unreachable!();
}
None => None,
};
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
slot_guard
.drop_old_value()
.expect("We just called shutdown");
Ok(hook_value)
}
Err(e) => {
// If we had a Tenant, set it to Broken and put it back in the TenantsMap
if let Some(attached_tenant) = attached_tenant {
attached_tenant.set_broken(e.to_string()).await;
}
// Leave the broken tenant in the map
slot_guard.revert();
Err(TenantStateError::Other(e))
}
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -2455,7 +2609,7 @@ pub(crate) enum TenantMapError {
/// this tenant to retry later, or wait for the InProgress state to end.
///
/// This structure enforces the important invariant that we do not have overlapping
/// tasks that will try use local storage for a the same tenant ID: we enforce that
/// tasks that will try to use local storage for a the same tenant ID: we enforce that
/// the previous contents of a slot have been shut down before the slot can be
/// left empty or used for something else
///
@@ -2468,7 +2622,7 @@ pub(crate) enum TenantMapError {
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
/// `drop_old_value`. It is an error to call this without shutting down
/// the conents of `old_value`.
pub(crate) struct SlotGuard {
pub(crate) struct SlotGuard<'a> {
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -2476,19 +2630,23 @@ pub(crate) struct SlotGuard {
/// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
/// release any waiters as soon as this SlotGuard is dropped.
completion: utils::completion::Completion,
tenants: &'a std::sync::RwLock<TenantsMap>,
}
impl SlotGuard {
impl<'a> SlotGuard<'a> {
fn new(
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
completion: utils::completion::Completion,
tenants: &'a std::sync::RwLock<TenantsMap>,
) -> Self {
Self {
tenant_shard_id,
old_value,
upserted: false,
completion,
tenants,
}
}
@@ -2512,8 +2670,8 @@ impl SlotGuard {
));
}
let replaced = {
let mut locked = TENANTS.write().unwrap();
let replaced: Option<TenantSlot> = {
let mut locked = self.tenants.write().unwrap();
if let TenantSlot::InProgress(_) = new_value {
// It is never expected to try and upsert InProgress via this path: it should
@@ -2621,7 +2779,7 @@ impl SlotGuard {
}
}
impl Drop for SlotGuard {
impl<'a> Drop for SlotGuard<'a> {
fn drop(&mut self) {
if self.upserted {
return;
@@ -2629,7 +2787,7 @@ impl Drop for SlotGuard {
// Our old value is already shutdown, or it never existed: it is safe
// for us to fully release the TenantSlot back into an empty state
let mut locked = TENANTS.write().unwrap();
let mut locked = self.tenants.write().unwrap();
let m = match &mut *locked {
TenantsMap::Initializing => {
@@ -2711,151 +2869,6 @@ enum TenantSlotAcquireMode {
MustExist,
}
fn tenant_map_acquire_slot(
tenant_shard_id: &TenantShardId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
}
fn tenant_map_acquire_slot_impl(
tenant_shard_id: &TenantShardId,
tenants: &std::sync::RwLock<TenantsMap>,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
use TenantSlotAcquireMode::*;
METRICS.tenant_slot_writes.inc();
let mut locked = tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
TenantsMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_shard_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
let inserting = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&inserting);
v.insert(inserting);
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
}
},
Entry::Occupied(mut o) => {
// Apply mode-driven checks
match (o.get(), mode) {
(TenantSlot::InProgress(_), _) => {
tracing::debug!("Occupied, failing for InProgress");
Err(TenantSlotError::InProgress)
}
_ => {
// Happy case: the slot was not in any state that violated our mode
let (completion, barrier) = utils::completion::channel();
let in_progress = TenantSlot::InProgress(barrier);
METRICS.slot_inserted(&in_progress);
let old_value = o.insert(in_progress);
METRICS.slot_removed(&old_value);
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
))
}
}
}
}
}
/// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
/// Allows to remove other tenant resources manually, via `tenant_cleanup`.
/// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
F: std::future::Future<Output = anyhow::Result<V>>,
{
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
// allow pageserver shutdown to await for our completion
let (_guard, progress) = completion::channel();
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
let attached_tenant = match slot_guard.get_old_value() {
Some(TenantSlot::Attached(tenant)) => {
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
let shutdown_mode = ShutdownMode::Hard;
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
// that we can continue safely to cleanup.
match tenant.shutdown(progress, shutdown_mode).await {
Ok(()) => {}
Err(_other) => {
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_shard_id));
}
}
Some(tenant)
}
Some(TenantSlot::Secondary(secondary_state)) => {
tracing::info!("Shutting down in secondary mode");
secondary_state.shutdown().await;
None
}
Some(TenantSlot::InProgress(_)) => {
// Acquiring a slot guarantees its old value was not InProgress
unreachable!();
}
None => None,
};
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
slot_guard
.drop_old_value()
.expect("We just called shutdown");
Ok(hook_value)
}
Err(e) => {
// If we had a Tenant, set it to Broken and put it back in the TenantsMap
if let Some(attached_tenant) = attached_tenant {
attached_tenant.set_broken(e.to_string()).await;
}
// Leave the broken tenant in the map
slot_guard.revert();
Err(TenantStateError::Other(e))
}
}
}
use http_utils::error::ApiError;
use pageserver_api::models::TimelineGcRequest;
@@ -2866,11 +2879,15 @@ mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;
use storage_broker::BrokerClientChannel;
use tracing::Instrument;
use super::super::harness::TenantHarness;
use super::TenantsMap;
use crate::tenant::mgr::TenantSlot;
use crate::tenant::{
TenantSharedResources,
mgr::{BackgroundPurges, TenantManager, TenantSlot},
};
#[tokio::test(start_paused = true)]
async fn shutdown_awaits_in_progress_tenant() {
@@ -2891,23 +2908,47 @@ mod tests {
let _e = span.enter();
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
// permit it to proceed: that will stick the tenant in InProgress
let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::<
crate::basebackup_cache::BasebackupPrepareRequest,
>();
let tenant_manager = TenantManager {
tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
conf: h.conf,
resources: TenantSharedResources {
broker_client: BrokerClientChannel::connect_lazy("foobar.com")
.await
.unwrap(),
remote_storage: h.remote_storage.clone(),
deletion_queue_client: h.deletion_queue.new_client(),
l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
h.conf.l0_flush.clone(),
),
basebackup_prepare_sender,
feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
},
cancel: tokio_util::sync::CancellationToken::new(),
background_purges: BackgroundPurges::default(),
};
let tenant_manager = Arc::new(tenant_manager);
let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
let (until_cleanup_started, cleanup_started) = utils::completion::channel();
let mut remove_tenant_from_memory_task = {
let tenant_manager = tenant_manager.clone();
let jh = tokio::spawn({
let tenants = tenants.clone();
async move {
let cleanup = async move {
drop(until_cleanup_started);
can_complete_cleanup.wait().await;
anyhow::Ok(())
};
super::remove_tenant_from_memory(&tenants, id, cleanup).await
tenant_manager.remove_tenant_from_memory(id, cleanup).await
}
.instrument(h.span())
});
@@ -2920,9 +2961,11 @@ mod tests {
let mut shutdown_task = {
let (until_shutdown_started, shutdown_started) = utils::completion::channel();
let tenant_manager = tenant_manager.clone();
let shutdown_task = tokio::spawn(async move {
drop(until_shutdown_started);
super::shutdown_all_tenants0(&tenants).await;
tenant_manager.shutdown_all_tenants0().await;
});
shutdown_started.wait().await;

View File

@@ -3407,10 +3407,6 @@ impl Timeline {
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
self.update_layer_visibility().await?;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
@@ -5901,7 +5897,7 @@ impl Drop for Timeline {
if let Ok(mut gc_info) = ancestor.gc_info.write() {
if !gc_info.remove_child_not_offloaded(self.timeline_id) {
tracing::error!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id,
"Couldn't remove retain_lsn entry from offloaded timeline's parent: already removed");
"Couldn't remove retain_lsn entry from timeline's parent on drop: already removed");
}
}
}

View File

@@ -1135,7 +1135,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
wp->propTermStartLsn = sk->voteResponse.flushLsn;
wp->donor = sk;
}
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
wp->truncateLsn = Max(sk->voteResponse.truncateLsn, wp->truncateLsn);
if (n_votes > 0)
appendStringInfoString(s, ", ");

View File

@@ -14,7 +14,7 @@ use crate::context::RequestContext;
use crate::control_plane::client::cplane_proxy_v1;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::{ReportableError, UserFacingError};
use crate::pglb::connect_compute::ComputeConnectBackend;
use crate::pglb::connect_compute::WakeComputeBackend;
use crate::pqproto::BeMessage;
use crate::proxy::NeonOptions;
use crate::stream::PqStream;
@@ -109,7 +109,7 @@ impl ConsoleRedirectBackend {
pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo);
#[async_trait]
impl ComputeConnectBackend for ConsoleRedirectNodeInfo {
impl WakeComputeBackend for ConsoleRedirectNodeInfo {
async fn wake_compute(
&self,
_ctx: &RequestContext,

View File

@@ -25,7 +25,7 @@ use crate::control_plane::{
RoleAccessControl,
};
use crate::intern::EndpointIdInt;
use crate::pglb::connect_compute::ComputeConnectBackend;
use crate::pglb::connect_compute::WakeComputeBackend;
use crate::pqproto::BeMessage;
use crate::proxy::NeonOptions;
use crate::rate_limiter::EndpointRateLimiter;
@@ -407,13 +407,13 @@ impl Backend<'_, ComputeUserInfo> {
}
#[async_trait::async_trait]
impl ComputeConnectBackend for Backend<'_, ComputeCredentials> {
impl WakeComputeBackend for Backend<'_, ComputeUserInfo> {
async fn wake_compute(
&self,
ctx: &RequestContext,
) -> Result<CachedNodeInfo, control_plane::errors::WakeComputeError> {
match self {
Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await,
Self::ControlPlane(api, info) => api.wake_compute(ctx, info).await,
Self::Local(local) => Ok(Cached::new_uncached(local.node_info.clone())),
}
}

View File

@@ -136,11 +136,11 @@ impl AuthInfo {
}
}
pub(crate) fn with_auth_keys(keys: &ComputeCredentialKeys) -> Self {
pub(crate) fn with_auth_keys(keys: ComputeCredentialKeys) -> Self {
Self {
auth: match keys {
ComputeCredentialKeys::AuthKeys(AuthKeys::ScramSha256(auth_keys)) => {
Some(Auth::Scram(Box::new(*auth_keys)))
Some(Auth::Scram(Box::new(auth_keys)))
}
ComputeCredentialKeys::JwtPayload(_) | ComputeCredentialKeys::None => None,
},

View File

@@ -50,7 +50,7 @@ pub(crate) trait ConnectMechanism {
}
#[async_trait]
pub(crate) trait ComputeConnectBackend {
pub(crate) trait WakeComputeBackend {
async fn wake_compute(
&self,
ctx: &RequestContext,
@@ -91,7 +91,7 @@ impl ConnectMechanism for TcpMechanism {
/// Try to connect to the compute node, retrying if necessary.
#[tracing::instrument(skip_all)]
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: WakeComputeBackend>(
ctx: &RequestContext,
mechanism: &M,
user_info: &B,

View File

@@ -358,12 +358,12 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
}
};
let creds = match &user_info {
auth::Backend::ControlPlane(_, creds) => creds,
let (cplane, creds) = match user_info {
auth::Backend::ControlPlane(cplane, creds) => (cplane, creds),
auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
};
let params_compat = creds.info.options.get(NeonOptions::PARAMS_COMPAT).is_some();
let mut auth_info = compute::AuthInfo::with_auth_keys(&creds.keys);
let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
auth_info.set_startup_params(&params, params_compat);
let res = connect_to_compute(
@@ -373,7 +373,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
auth: auth_info,
locks: &config.connect_compute_locks,
},
&user_info,
&auth::Backend::ControlPlane(cplane, creds.info),
config.wake_compute_retry_config,
&config.connect_to_compute,
)

View File

@@ -8,7 +8,7 @@ use std::time::Duration;
use anyhow::{Context, bail};
use async_trait::async_trait;
use http::StatusCode;
use postgres_client::config::{AuthKeys, ScramKeys, SslMode};
use postgres_client::config::SslMode;
use postgres_client::tls::{MakeTlsConnect, NoTls};
use retry::{ShouldRetryWakeCompute, retry_after};
use rstest::rstest;
@@ -19,9 +19,7 @@ use tracing_test::traced_test;
use super::retry::CouldRetry;
use super::*;
use crate::auth::backend::{
ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo, MaybeOwned,
};
use crate::auth::backend::{ComputeUserInfo, MaybeOwned};
use crate::config::{ComputeConfig, RetryConfig};
use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient};
use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status};
@@ -575,19 +573,13 @@ fn helper_create_cached_node_info(cache: &'static NodeInfoCache) -> CachedNodeIn
fn helper_create_connect_info(
mechanism: &TestConnectMechanism,
) -> auth::Backend<'static, ComputeCredentials> {
) -> auth::Backend<'static, ComputeUserInfo> {
auth::Backend::ControlPlane(
MaybeOwned::Owned(ControlPlaneClient::Test(Box::new(mechanism.clone()))),
ComputeCredentials {
info: ComputeUserInfo {
endpoint: "endpoint".into(),
user: "user".into(),
options: NeonOptions::parse_options_raw(""),
},
keys: ComputeCredentialKeys::AuthKeys(AuthKeys::ScramSha256(ScramKeys {
client_key: [0; 32],
server_key: [0; 32],
})),
ComputeUserInfo {
endpoint: "endpoint".into(),
user: "user".into(),
options: NeonOptions::parse_options_raw(""),
},
)
}

View File

@@ -8,7 +8,7 @@ use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType,
};
use crate::pglb::connect_compute::ComputeConnectBackend;
use crate::pglb::connect_compute::WakeComputeBackend;
use crate::proxy::retry::{retry_after, should_retry};
// Use macro to retain original callsite.
@@ -23,7 +23,7 @@ macro_rules! log_wake_compute_error {
};
}
pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
pub(crate) async fn wake_compute<B: WakeComputeBackend>(
num_retries: &mut u32,
ctx: &RequestContext,
api: &B,

View File

@@ -21,7 +21,7 @@ use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client};
use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
use crate::auth::backend::{ComputeCredentialKeys, ComputeCredentials, ComputeUserInfo};
use crate::auth::{self, AuthError};
use crate::compute_ctl::{
ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest,
@@ -180,7 +180,7 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| keys);
let backend = self.auth_backend.as_ref().map(|()| keys.info);
crate::pglb::connect_compute::connect_to_compute(
ctx,
&TokioMechanism {
@@ -188,6 +188,7 @@ impl PoolingBackend {
conn_info,
pool: self.pool.clone(),
locks: &self.config.connect_compute_locks,
keys: keys.keys,
},
&backend,
self.config.wake_compute_retry_config,
@@ -214,16 +215,13 @@ impl PoolingBackend {
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
info: ComputeUserInfo {
user: conn_info.user_info.user.clone(),
endpoint: EndpointId::from(format!(
"{}{LOCAL_PROXY_SUFFIX}",
conn_info.user_info.endpoint.normalize()
)),
options: conn_info.user_info.options.clone(),
},
keys: crate::auth::backend::ComputeCredentialKeys::None,
let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo {
user: conn_info.user_info.user.clone(),
endpoint: EndpointId::from(format!(
"{}{LOCAL_PROXY_SUFFIX}",
conn_info.user_info.endpoint.normalize()
)),
options: conn_info.user_info.options.clone(),
});
crate::pglb::connect_compute::connect_to_compute(
ctx,
@@ -495,6 +493,7 @@ struct TokioMechanism {
pool: Arc<GlobalConnPool<postgres_client::Client, EndpointConnPool<postgres_client::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
keys: ComputeCredentialKeys,
/// connect_to_compute concurrency lock
locks: &'static ApiLocks<Host>,
@@ -520,6 +519,10 @@ impl ConnectMechanism for TokioMechanism {
.dbname(&self.conn_info.dbname)
.connect_timeout(compute_config.timeout);
if let ComputeCredentialKeys::AuthKeys(auth_keys) = self.keys {
config.auth_keys(auth_keys);
}
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute);
let res = config.connect(compute_config).await;
drop(pause);

View File

@@ -4046,6 +4046,16 @@ def static_proxy(
"CREATE TABLE neon_control_plane.endpoints (endpoint_id VARCHAR(255) PRIMARY KEY, allowed_ips VARCHAR(255))"
)
vanilla_pg.stop()
vanilla_pg.edit_hba(
[
"local all all trust",
"host all all 127.0.0.1/32 scram-sha-256",
"host all all ::1/128 scram-sha-256",
]
)
vanilla_pg.start()
proxy_port = port_distributor.get_port()
mgmt_port = port_distributor.get_port()
http_port = port_distributor.get_port()

View File

@@ -197,8 +197,7 @@ def print_gc_result(row: dict[str, Any]):
log.info("GC duration {elapsed} ms".format_map(row))
log.info(
(
" total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}"
" needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}"
" eligible: {layers_eligible}, not_updated: {layers_not_updated}, removed: {layers_removed}"
).format_map(row)
)

View File

@@ -19,11 +19,15 @@ TABLE_NAME = "neon_control_plane.endpoints"
async def test_proxy_psql_allowed_ips(static_proxy: NeonProxy, vanilla_pg: VanillaPostgres):
# Shouldn't be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')"
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('private-project', '8.8.8.8')",
user="proxy",
password="password",
)
# Should be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')"
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('generic-project', '::1,127.0.0.1')",
user="proxy",
password="password",
)
def check_cannot_connect(**kwargs):
@@ -60,7 +64,9 @@ async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
# Shouldn't be able to connect to this project
vanilla_pg.safe_psql(
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')"
f"INSERT INTO {TABLE_NAME} (endpoint_id, allowed_ips) VALUES ('proxy', '8.8.8.8')",
user="proxy",
password="password",
)
def query(status: int, query: str, *args):
@@ -75,6 +81,8 @@ async def test_proxy_http_allowed_ips(static_proxy: NeonProxy, vanilla_pg: Vanil
query(400, "select 1;") # ip address is not allowed
# Should be able to connect to this project
vanilla_pg.safe_psql(
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'"
f"UPDATE {TABLE_NAME} SET allowed_ips = '8.8.8.8,127.0.0.1' WHERE endpoint_id = 'proxy'",
user="proxy",
password="password",
)
query(200, "select 1;") # should work now